1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
3 
4 use std::os::unix::io::{AsRawFd, RawFd};
5 use std::sync::{
6     atomic::{AtomicBool, Ordering},
7     Arc,
8 };
9 use std::time::Duration;
10 use std::{
11     future::Future,
12     io::{self, ErrorKind, Read, Write},
13     task::{Context, Waker},
14 };
15 
16 use nix::unistd::{read, write};
17 
18 use futures::poll;
19 
20 use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
21 use tokio::io::Interest;
22 use tokio_test::{assert_err, assert_pending};
23 
/// Test helper pairing a [`Waker`] with the shared state used to observe wake-ups.
struct TestWaker {
    /// Shared wake flag; `TestWaker::new` builds `waker` from a clone of this `Arc`.
    inner: Arc<TestWakerInner>,
    /// Waker exposed to polled futures through `TestWaker::context`.
    waker: Waker,
}
28 
29 #[derive(Default)]
30 struct TestWakerInner {
31     awoken: AtomicBool,
32 }
33 
34 impl futures::task::ArcWake for TestWakerInner {
wake_by_ref(arc_self: &Arc<Self>)35     fn wake_by_ref(arc_self: &Arc<Self>) {
36         arc_self.awoken.store(true, Ordering::SeqCst);
37     }
38 }
39 
40 impl TestWaker {
new() -> Self41     fn new() -> Self {
42         let inner: Arc<TestWakerInner> = Default::default();
43 
44         Self {
45             inner: inner.clone(),
46             waker: futures::task::waker(inner),
47         }
48     }
49 
awoken(&self) -> bool50     fn awoken(&self) -> bool {
51         self.inner.awoken.swap(false, Ordering::SeqCst)
52     }
53 
context(&self) -> Context<'_>54     fn context(&self) -> Context<'_> {
55         Context::from_waker(&self.waker)
56     }
57 }
58 
/// Owned file descriptor used as the I/O source wrapped by `AsyncFd` in these tests.
#[derive(Debug)]
struct FileDescriptor {
    /// The owned descriptor.
    fd: std::os::fd::OwnedFd,
}

impl AsRawFd for FileDescriptor {
    /// Exposes the raw descriptor of the owned `fd` without giving up ownership.
    fn as_raw_fd(&self) -> RawFd {
        let owned = &self.fd;
        owned.as_raw_fd()
    }
}
69 
70 impl Read for &FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>71     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
72         read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
73     }
74 }
75 
76 impl Read for FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>77     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
78         (self as &Self).read(buf)
79     }
80 }
81 
82 impl Write for &FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>83     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
84         write(&self.fd, buf).map_err(io::Error::from)
85     }
86 
flush(&mut self) -> io::Result<()>87     fn flush(&mut self) -> io::Result<()> {
88         Ok(())
89     }
90 }
91 
92 impl Write for FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>93     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
94         (self as &Self).write(buf)
95     }
96 
flush(&mut self) -> io::Result<()>97     fn flush(&mut self) -> io::Result<()> {
98         (self as &Self).flush()
99     }
100 }
101 
set_nonblocking(fd: RawFd)102 fn set_nonblocking(fd: RawFd) {
103     use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
104 
105     let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
106 
107     if flags < 0 {
108         panic!(
109             "bad return value from fcntl(F_GETFL): {} ({:?})",
110             flags,
111             nix::Error::last()
112         );
113     }
114 
115     let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
116 
117     nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
118 }
119 
socketpair() -> (FileDescriptor, FileDescriptor)120 fn socketpair() -> (FileDescriptor, FileDescriptor) {
121     use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
122 
123     let (fd_a, fd_b) = socket::socketpair(
124         AddressFamily::Unix,
125         SockType::Stream,
126         None,
127         SockFlag::empty(),
128     )
129     .expect("socketpair");
130     let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
131 
132     set_nonblocking(fds.0.fd.as_raw_fd());
133     set_nonblocking(fds.1.fd.as_raw_fd());
134 
135     fds
136 }
137 
drain(mut fd: &FileDescriptor, mut amt: usize)138 fn drain(mut fd: &FileDescriptor, mut amt: usize) {
139     let mut buf = [0u8; 512];
140     while amt > 0 {
141         match fd.read(&mut buf[..]) {
142             Err(e) if e.kind() == ErrorKind::WouldBlock => {}
143             Ok(0) => panic!("unexpected EOF"),
144             Err(e) => panic!("unexpected error: {e:?}"),
145             Ok(x) => amt -= x,
146         }
147     }
148 }
149 
150 #[tokio::test]
151 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
initially_writable()152 async fn initially_writable() {
153     let (a, b) = socketpair();
154 
155     let afd_a = AsyncFd::new(a).unwrap();
156     let afd_b = AsyncFd::new(b).unwrap();
157 
158     afd_a.writable().await.unwrap().clear_ready();
159     afd_b.writable().await.unwrap().clear_ready();
160 
161     tokio::select! {
162         biased;
163         _ = tokio::time::sleep(Duration::from_millis(10)) => {},
164         _ = afd_a.readable() => panic!("Unexpected readable state"),
165         _ = afd_b.readable() => panic!("Unexpected readable state"),
166     }
167 }
168 
169 #[tokio::test]
170 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
reset_readable()171 async fn reset_readable() {
172     let (a, mut b) = socketpair();
173 
174     let afd_a = AsyncFd::new(a).unwrap();
175 
176     let readable = afd_a.readable();
177     tokio::pin!(readable);
178 
179     tokio::select! {
180         _ = readable.as_mut() => panic!(),
181         _ = tokio::time::sleep(Duration::from_millis(10)) => {}
182     }
183 
184     b.write_all(b"0").unwrap();
185 
186     let mut guard = readable.await.unwrap();
187 
188     guard
189         .try_io(|_| afd_a.get_ref().read(&mut [0]))
190         .unwrap()
191         .unwrap();
192 
193     // `a` is not readable, but the reactor still thinks it is
194     // (because we have not observed a not-ready error yet)
195     afd_a.readable().await.unwrap().retain_ready();
196 
197     // Explicitly clear the ready state
198     guard.clear_ready();
199 
200     let readable = afd_a.readable();
201     tokio::pin!(readable);
202 
203     tokio::select! {
204         _ = readable.as_mut() => panic!(),
205         _ = tokio::time::sleep(Duration::from_millis(10)) => {}
206     }
207 
208     b.write_all(b"0").unwrap();
209 
210     // We can observe the new readable event
211     afd_a.readable().await.unwrap().clear_ready();
212 }
213 
214 #[tokio::test]
215 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
reset_writable()216 async fn reset_writable() {
217     let (a, b) = socketpair();
218 
219     let afd_a = AsyncFd::new(a).unwrap();
220 
221     let mut guard = afd_a.writable().await.unwrap();
222 
223     // Write until we get a WouldBlock. This also clears the ready state.
224     let mut bytes = 0;
225     while let Ok(Ok(amt)) = guard.try_io(|_| afd_a.get_ref().write(&[0; 512][..])) {
226         bytes += amt;
227     }
228 
229     // Writable state should be cleared now.
230     let writable = afd_a.writable();
231     tokio::pin!(writable);
232 
233     tokio::select! {
234         _ = writable.as_mut() => panic!(),
235         _ = tokio::time::sleep(Duration::from_millis(10)) => {}
236     }
237 
238     // Read from the other side; we should become writable now.
239     drain(&b, bytes);
240 
241     let _ = writable.await.unwrap();
242 }
243 
/// Newtype around a shared (`Arc`) descriptor holder.
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);

impl<T> AsRawFd for ArcFd<T>
where
    T: AsRawFd,
{
    /// Reports the raw descriptor of the shared inner value.
    fn as_raw_fd(&self) -> RawFd {
        let Self(shared) = self;
        shared.as_raw_fd()
    }
}
251 
252 #[tokio::test]
253 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
drop_closes()254 async fn drop_closes() {
255     let (a, mut b) = socketpair();
256 
257     let afd_a = AsyncFd::new(a).unwrap();
258 
259     assert_eq!(
260         ErrorKind::WouldBlock,
261         b.read(&mut [0]).err().unwrap().kind()
262     );
263 
264     std::mem::drop(afd_a);
265 
266     assert_eq!(0, b.read(&mut [0]).unwrap());
267 
268     // into_inner does not close the fd
269 
270     let (a, mut b) = socketpair();
271     let afd_a = AsyncFd::new(a).unwrap();
272     let _a: FileDescriptor = afd_a.into_inner();
273 
274     assert_eq!(
275         ErrorKind::WouldBlock,
276         b.read(&mut [0]).err().unwrap().kind()
277     );
278 
279     // Drop closure behavior is delegated to the inner object
280     let (a, mut b) = socketpair();
281     let arc_fd = Arc::new(a);
282     let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
283     std::mem::drop(afd_a);
284 
285     assert_eq!(
286         ErrorKind::WouldBlock,
287         b.read(&mut [0]).err().unwrap().kind()
288     );
289 
290     std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
291 }
292 
293 #[tokio::test]
294 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
reregister()295 async fn reregister() {
296     let (a, _b) = socketpair();
297 
298     let afd_a = AsyncFd::new(a).unwrap();
299     let a = afd_a.into_inner();
300     AsyncFd::new(a).unwrap();
301 }
302 
303 #[tokio::test]
304 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
guard_try_io()305 async fn guard_try_io() {
306     let (a, mut b) = socketpair();
307 
308     b.write_all(b"0").unwrap();
309 
310     let afd_a = AsyncFd::new(a).unwrap();
311 
312     let mut guard = afd_a.readable().await.unwrap();
313 
314     afd_a.get_ref().read_exact(&mut [0]).unwrap();
315 
316     // Should not clear the readable state
317     let _ = guard.try_io(|_| Ok(()));
318 
319     // Still readable...
320     let _ = afd_a.readable().await.unwrap();
321 
322     // Should clear the readable state
323     let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));
324 
325     // Assert not readable
326     let readable = afd_a.readable();
327     tokio::pin!(readable);
328 
329     tokio::select! {
330         _ = readable.as_mut() => panic!(),
331         _ = tokio::time::sleep(Duration::from_millis(10)) => {}
332     }
333 
334     // Write something down b again and make sure we're reawoken
335     b.write_all(b"0").unwrap();
336     let _ = readable.await.unwrap();
337 }
338 
339 #[tokio::test]
340 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
try_io_readable()341 async fn try_io_readable() {
342     let (a, mut b) = socketpair();
343     let mut afd_a = AsyncFd::new(a).unwrap();
344 
345     // Give the runtime some time to update bookkeeping.
346     tokio::task::yield_now().await;
347 
348     {
349         let mut called = false;
350         let _ = afd_a.try_io_mut(Interest::READABLE, |_| {
351             called = true;
352             Ok(())
353         });
354         assert!(
355             !called,
356             "closure should not have been called, since socket should not be readable"
357         );
358     }
359 
360     // Make `a` readable by writing to `b`.
361     // Give the runtime some time to update bookkeeping.
362     b.write_all(&[0]).unwrap();
363     tokio::task::yield_now().await;
364 
365     {
366         let mut called = false;
367         let _ = afd_a.try_io(Interest::READABLE, |_| {
368             called = true;
369             Ok(())
370         });
371         assert!(
372             called,
373             "closure should have been called, since socket should have data available to read"
374         );
375     }
376 
377     {
378         let mut called = false;
379         let _ = afd_a.try_io(Interest::READABLE, |_| {
380             called = true;
381             io::Result::<()>::Err(ErrorKind::WouldBlock.into())
382         });
383         assert!(
384             called,
385             "closure should have been called, since socket should have data available to read"
386         );
387     }
388 
389     {
390         let mut called = false;
391         let _ = afd_a.try_io(Interest::READABLE, |_| {
392             called = true;
393             Ok(())
394         });
395         assert!(!called, "closure should not have been called, since socket readable state should have been cleared");
396     }
397 }
398 
399 #[tokio::test]
400 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
try_io_writable()401 async fn try_io_writable() {
402     let (a, _b) = socketpair();
403     let afd_a = AsyncFd::new(a).unwrap();
404 
405     // Give the runtime some time to update bookkeeping.
406     tokio::task::yield_now().await;
407 
408     {
409         let mut called = false;
410         let _ = afd_a.try_io(Interest::WRITABLE, |_| {
411             called = true;
412             Ok(())
413         });
414         assert!(
415             called,
416             "closure should have been called, since socket should still be marked as writable"
417         );
418     }
419     {
420         let mut called = false;
421         let _ = afd_a.try_io(Interest::WRITABLE, |_| {
422             called = true;
423             io::Result::<()>::Err(ErrorKind::WouldBlock.into())
424         });
425         assert!(
426             called,
427             "closure should have been called, since socket should still be marked as writable"
428         );
429     }
430 
431     {
432         let mut called = false;
433         let _ = afd_a.try_io(Interest::WRITABLE, |_| {
434             called = true;
435             Ok(())
436         });
437         assert!(!called, "closure should not have been called, since socket writable state should have been cleared");
438     }
439 }
440 
441 #[tokio::test]
442 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
multiple_waiters()443 async fn multiple_waiters() {
444     let (a, mut b) = socketpair();
445     let afd_a = Arc::new(AsyncFd::new(a).unwrap());
446 
447     let barrier = Arc::new(tokio::sync::Barrier::new(11));
448 
449     let mut tasks = Vec::new();
450     for _ in 0..10 {
451         let afd_a = afd_a.clone();
452         let barrier = barrier.clone();
453 
454         let f = async move {
455             let notify_barrier = async {
456                 barrier.wait().await;
457                 futures::future::pending::<()>().await;
458             };
459 
460             tokio::select! {
461                 biased;
462                 guard = afd_a.readable() => {
463                     tokio::task::yield_now().await;
464                     guard.unwrap().clear_ready()
465                 },
466                 _ = notify_barrier => unreachable!(),
467             }
468 
469             std::mem::drop(afd_a);
470         };
471 
472         tasks.push(tokio::spawn(f));
473     }
474 
475     let mut all_tasks = futures::future::try_join_all(tasks);
476 
477     tokio::select! {
478         r = std::pin::Pin::new(&mut all_tasks) => {
479             r.unwrap(); // propagate panic
480             panic!("Tasks exited unexpectedly")
481         },
482         _ = barrier.wait() => {}
483     }
484 
485     b.write_all(b"0").unwrap();
486 
487     all_tasks.await.unwrap();
488 }
489 
490 #[tokio::test]
491 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
poll_fns()492 async fn poll_fns() {
493     let (a, b) = socketpair();
494     let afd_a = Arc::new(AsyncFd::new(a).unwrap());
495     let afd_b = Arc::new(AsyncFd::new(b).unwrap());
496 
497     // Fill up the write side of A
498     let mut bytes = 0;
499     while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) {
500         bytes += amt;
501     }
502 
503     let waker = TestWaker::new();
504 
505     assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
506 
507     let afd_a_2 = afd_a.clone();
508     let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
509     let barrier_clone = r_barrier.clone();
510 
511     let read_fut = tokio::spawn(async move {
512         // Move waker onto this task first
513         assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2
514             .as_ref()
515             .poll_read_ready(cx))));
516         barrier_clone.wait().await;
517 
518         let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
519     });
520 
521     let afd_a_2 = afd_a.clone();
522     let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
523     let barrier_clone = w_barrier.clone();
524 
525     let mut write_fut = tokio::spawn(async move {
526         // Move waker onto this task first
527         assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2
528             .as_ref()
529             .poll_write_ready(cx))));
530         barrier_clone.wait().await;
531 
532         let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
533     });
534 
535     r_barrier.wait().await;
536     w_barrier.wait().await;
537 
538     let readable = afd_a.readable();
539     tokio::pin!(readable);
540 
541     tokio::select! {
542         _ = &mut readable => unreachable!(),
543         _ = tokio::task::yield_now() => {}
544     }
545 
546     // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
547     afd_b.get_ref().write_all(b"0").unwrap();
548 
549     let _ = tokio::join!(readable, read_fut);
550 
551     // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
552     assert!(!waker.awoken());
553 
554     // The writable side should not be awoken
555     tokio::select! {
556         _ = &mut write_fut => unreachable!(),
557         _ = tokio::time::sleep(Duration::from_millis(5)) => {}
558     }
559 
560     // Make it writable now
561     drain(afd_b.get_ref(), bytes);
562 
563     // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
564     let _ = write_fut.await;
565 }
566 
assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>>567 fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
568     let mut pinned = Box::pin(f);
569 
570     assert_pending!(pinned
571         .as_mut()
572         .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
573 
574     pinned
575 }
576 
rt() -> tokio::runtime::Runtime577 fn rt() -> tokio::runtime::Runtime {
578     tokio::runtime::Builder::new_current_thread()
579         .enable_all()
580         .build()
581         .unwrap()
582 }
583 
/// Dropping the runtime makes `readable()` resolve to an error, both for a future
/// that was already pending and for one created afterwards.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_currently_pending() {
    let runtime = rt();

    let (sock, _peer) = socketpair();
    let afd_a = {
        // Register while the runtime context is entered.
        let _enter = runtime.enter();
        AsyncFd::new(sock).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    drop(runtime);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}
605 
/// A `readable()` future created after the runtime was dropped resolves to an error.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_future_pending() {
    let runtime = rt();

    let (sock, _peer) = socketpair();
    let afd_a = {
        // Register while the runtime context is entered.
        let _enter = runtime.enter();
        AsyncFd::new(sock).unwrap()
    };

    drop(runtime);

    let outcome = futures::executor::block_on(afd_a.readable());
    assert_err!(outcome);
}
621 
/// Races the runtime drop on another thread against `readable()`: the first wait
/// must be woken, and any later wait must fail. Repeated 100 times.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let runtime = rt();

        let (sock, _peer) = socketpair();
        let afd_a = {
            let _enter = runtime.enter();
            AsyncFd::new(sock).unwrap()
        };

        // Drop the runtime concurrently with the wait below.
        let _ = std::thread::spawn(move || drop(runtime));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}
644 
poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>645 async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
646     std::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
647 }
648 
poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>649 async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
650     std::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
651 }
652 
/// Poll-based readiness futures that were pending when the runtime was dropped
/// resolve to errors.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_currently_pending_polls() {
    let runtime = rt();

    let (sock, _peer) = socketpair();
    let afd_a = {
        let _enter = runtime.enter();
        AsyncFd::new(sock).unwrap()
    };

    // make not writable
    loop {
        if afd_a.get_ref().write(&[0; 512]).is_err() {
            break;
        }
    }

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    drop(runtime);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}
675 
/// Poll-based readiness futures created after the runtime was dropped resolve to
/// errors.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_poll() {
    let runtime = rt();

    let (sock, _peer) = socketpair();
    let afd_a = {
        let _enter = runtime.enter();
        AsyncFd::new(sock).unwrap()
    };

    drop(runtime);

    let read_outcome = futures::executor::block_on(poll_readable(&afd_a));
    assert_err!(read_outcome);

    let write_outcome = futures::executor::block_on(poll_writable(&afd_a));
    assert_err!(write_outcome);
}
692 
/// Calling `clear_ready` on a guard obtained before the runtime was dropped must
/// not panic.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_then_clear_readiness() {
    let runtime = rt();

    let (sock, _peer) = socketpair();
    let afd_a = {
        let _enter = runtime.enter();
        AsyncFd::new(sock).unwrap()
    };

    // Obtain a write-readiness guard while the runtime is still alive.
    let mut write_ready = runtime.block_on(afd_a.writable()).unwrap();

    drop(runtime);

    write_ready.clear_ready();
}
710 
/// Races the runtime drop on another thread against the poll-based readiness
/// futures, which must both fail. Repeated 100 times.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let runtime = rt();

        let (sock, _peer) = socketpair();
        let afd_a = {
            let _enter = runtime.enter();
            AsyncFd::new(sock).unwrap()
        };

        // make not writable
        loop {
            if afd_a.get_ref().write(&[0; 512]).is_err() {
                break;
            }
        }

        // Drop the runtime concurrently with the waits below.
        let _ = std::thread::spawn(move || drop(runtime));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}
733 
734 #[tokio::test]
735 #[cfg_attr(miri, ignore)] // No socket in miri.
736 #[cfg(any(target_os = "linux", target_os = "android"))]
priority_event_on_oob_data()737 async fn priority_event_on_oob_data() {
738     use std::net::SocketAddr;
739 
740     use tokio::io::Interest;
741 
742     let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
743 
744     let listener = std::net::TcpListener::bind(addr).unwrap();
745     let addr = listener.local_addr().unwrap();
746 
747     let client = std::net::TcpStream::connect(addr).unwrap();
748     let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();
749 
750     let (stream, _) = listener.accept().unwrap();
751 
752     // Sending out of band data should trigger priority event.
753     send_oob_data(&stream, b"hello").unwrap();
754 
755     let _ = client.ready(Interest::PRIORITY).await.unwrap();
756 }
757 
758 #[cfg(any(target_os = "linux", target_os = "android"))]
send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize>759 fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
760     unsafe {
761         let res = libc::send(
762             stream.as_raw_fd(),
763             data.as_ptr().cast(),
764             data.len(),
765             libc::MSG_OOB,
766         );
767         if res == -1 {
768             Err(io::Error::last_os_error())
769         } else {
770             Ok(res as usize)
771         }
772     }
773 }
774 
775 #[tokio::test]
776 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
clear_ready_matching_clears_ready()777 async fn clear_ready_matching_clears_ready() {
778     use tokio::io::{Interest, Ready};
779 
780     let (a, mut b) = socketpair();
781 
782     let afd_a = AsyncFd::new(a).unwrap();
783     b.write_all(b"0").unwrap();
784 
785     let mut guard = afd_a
786         .ready(Interest::READABLE | Interest::WRITABLE)
787         .await
788         .unwrap();
789 
790     assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);
791 
792     guard.clear_ready_matching(Ready::READABLE);
793     assert_eq!(guard.ready(), Ready::WRITABLE);
794 
795     guard.clear_ready_matching(Ready::WRITABLE);
796     assert_eq!(guard.ready(), Ready::EMPTY);
797 }
798 
799 #[tokio::test]
800 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
clear_ready_matching_clears_ready_mut()801 async fn clear_ready_matching_clears_ready_mut() {
802     use tokio::io::{Interest, Ready};
803 
804     let (a, mut b) = socketpair();
805 
806     let mut afd_a = AsyncFd::new(a).unwrap();
807     b.write_all(b"0").unwrap();
808 
809     let mut guard = afd_a
810         .ready_mut(Interest::READABLE | Interest::WRITABLE)
811         .await
812         .unwrap();
813 
814     assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);
815 
816     guard.clear_ready_matching(Ready::READABLE);
817     assert_eq!(guard.ready(), Ready::WRITABLE);
818 
819     guard.clear_ready_matching(Ready::WRITABLE);
820     assert_eq!(guard.ready(), Ready::EMPTY);
821 }
822 
823 #[tokio::test]
824 #[cfg_attr(miri, ignore)] // No socket in miri.
825 #[cfg(target_os = "linux")]
await_error_readiness_timestamping()826 async fn await_error_readiness_timestamping() {
827     use std::net::{Ipv4Addr, SocketAddr};
828 
829     use tokio::io::{Interest, Ready};
830 
831     let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
832     let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
833 
834     let socket = std::net::UdpSocket::bind(address_a).unwrap();
835 
836     socket.set_nonblocking(true).unwrap();
837 
838     // configure send timestamps
839     configure_timestamping_socket(&socket).unwrap();
840 
841     socket.connect(address_b).unwrap();
842 
843     let fd = AsyncFd::new(socket).unwrap();
844 
845     tokio::select! {
846         _ = fd.ready(Interest::ERROR) => panic!(),
847         _ = tokio::time::sleep(Duration::from_millis(10)) => {}
848     }
849 
850     let buf = b"hello there";
851     fd.get_ref().send(buf).unwrap();
852 
853     // the send timestamp should now be in the error queue
854     let guard = fd.ready(Interest::ERROR).await.unwrap();
855     assert_eq!(guard.ready(), Ready::ERROR);
856 }
857 
858 #[cfg(target_os = "linux")]
configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int>859 fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
860     // enable software timestamping, and specifically software send timestamping
861     let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;
862 
863     let res = unsafe {
864         libc::setsockopt(
865             udp_socket.as_raw_fd(),
866             libc::SOL_SOCKET,
867             libc::SO_TIMESTAMP,
868             &options as *const _ as *const libc::c_void,
869             std::mem::size_of_val(&options) as libc::socklen_t,
870         )
871     };
872 
873     if res == -1 {
874         Err(std::io::Error::last_os_error())
875     } else {
876         Ok(res)
877     }
878 }
879 
880 #[tokio::test]
881 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
882 #[cfg(target_os = "linux")]
await_error_readiness_invalid_address()883 async fn await_error_readiness_invalid_address() {
884     use std::net::{Ipv4Addr, SocketAddr};
885     use tokio::io::{Interest, Ready};
886 
887     let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
888     let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
889     let socket_fd = socket.as_raw_fd();
890 
891     // Enable IP_RECVERR option to receive error messages
892     // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
893     let recv_err: libc::c_int = 1;
894     unsafe {
895         let res = libc::setsockopt(
896             socket.as_raw_fd(),
897             libc::SOL_IP,
898             libc::IP_RECVERR,
899             &recv_err as *const _ as *const libc::c_void,
900             std::mem::size_of_val(&recv_err) as libc::socklen_t,
901         );
902         if res == -1 {
903             panic!("{:?}", std::io::Error::last_os_error());
904         }
905     }
906 
907     // Spawn a separate thread for sending messages
908     tokio::spawn(async move {
909         // Set the destination address. This address is invalid in this context. the OS will notice
910         // that nobody is listening on port this port. Normally this is ignored (UDP is "fire and forget"),
911         // but because IP_RECVERR is enabled, the error will actually be reported to the sending socket
912         let mut dest_addr =
913             unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
914         dest_addr.sin_family = libc::AF_INET as _;
915         // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
916         // below 1024 to guarantee that other tests don't select this port by accident when they
917         // use port 0 to select an ephemeral port.
918         dest_addr.sin_port = 512u16.to_be(); // Destination port
919 
920         // Prepare the message data
921         let message = "Hello, Socket!";
922 
923         // Prepare the message structure for sendmsg
924         let mut iov = libc::iovec {
925             iov_base: message.as_ptr() as *mut libc::c_void,
926             iov_len: message.len(),
927         };
928 
929         // Prepare the destination address for the sendmsg call
930         let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
931         let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;
932 
933         let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
934         msg.msg_name = dest_sockaddr as *mut libc::c_void;
935         msg.msg_namelen = dest_addrlen;
936         msg.msg_iov = &mut iov;
937         msg.msg_iovlen = 1;
938 
939         if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
940             panic!("{:?}", std::io::Error::last_os_error())
941         }
942     });
943 
944     let fd = AsyncFd::new(socket).unwrap();
945 
946     let guard = fd.ready(Interest::ERROR).await.unwrap();
947     assert_eq!(guard.ready(), Ready::ERROR);
948 }
949 
/// I/O source whose raw descriptor is always `-1`.
#[derive(Debug, PartialEq, Eq)]
struct InvalidSource;

impl AsRawFd for InvalidSource {
    /// Always reports `-1`.
    fn as_raw_fd(&self) -> RawFd {
        let invalid: RawFd = -1;
        invalid
    }
}
958 
959 #[tokio::test]
try_new()960 async fn try_new() {
961     let original = Arc::new(InvalidSource);
962 
963     let error = AsyncFd::try_new(original.clone()).unwrap_err();
964     let (returned, _cause) = error.into_parts();
965 
966     assert!(Arc::ptr_eq(&original, &returned));
967 }
968 
969 #[tokio::test]
try_with_interest()970 async fn try_with_interest() {
971     let original = Arc::new(InvalidSource);
972 
973     let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
974     let (returned, _cause) = error.into_parts();
975 
976     assert!(Arc::ptr_eq(&original, &returned));
977 }
978