1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
3
4 use std::os::unix::io::{AsRawFd, RawFd};
5 use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 };
9 use std::time::Duration;
10 use std::{
11 future::Future,
12 io::{self, ErrorKind, Read, Write},
13 task::{Context, Waker},
14 };
15
16 use nix::unistd::{read, write};
17
18 use futures::poll;
19
20 use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
21 use tokio::io::Interest;
22 use tokio_test::{assert_err, assert_pending};
23
/// Test helper bundling a `Waker` with the shared state that the waker
/// updates when it is woken.
struct TestWaker {
    // Shared flag holder; also the object the waker below is built from.
    inner: Arc<TestWakerInner>,
    // Waker created from `inner` via `futures::task::waker`.
    waker: Waker,
}
28
/// State shared between a `TestWaker` and the `Waker` built from it.
#[derive(Default)]
struct TestWakerInner {
    // Set to `true` by `wake_by_ref`; read and reset by `TestWaker::awoken`.
    awoken: AtomicBool,
}
33
34 impl futures::task::ArcWake for TestWakerInner {
wake_by_ref(arc_self: &Arc<Self>)35 fn wake_by_ref(arc_self: &Arc<Self>) {
36 arc_self.awoken.store(true, Ordering::SeqCst);
37 }
38 }
39
40 impl TestWaker {
new() -> Self41 fn new() -> Self {
42 let inner: Arc<TestWakerInner> = Default::default();
43
44 Self {
45 inner: inner.clone(),
46 waker: futures::task::waker(inner),
47 }
48 }
49
awoken(&self) -> bool50 fn awoken(&self) -> bool {
51 self.inner.awoken.swap(false, Ordering::SeqCst)
52 }
53
context(&self) -> Context<'_>54 fn context(&self) -> Context<'_> {
55 Context::from_waker(&self.waker)
56 }
57 }
58
/// Thin wrapper around an owned file descriptor used as the inner value of
/// `AsyncFd` in these tests.
#[derive(Debug)]
struct FileDescriptor {
    fd: std::os::fd::OwnedFd,
}

impl AsRawFd for FileDescriptor {
    /// Exposes the raw descriptor number of the wrapped `OwnedFd`.
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.fd)
    }
}
69
70 impl Read for &FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>71 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
72 read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
73 }
74 }
75
76 impl Read for FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>77 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
78 (self as &Self).read(buf)
79 }
80 }
81
82 impl Write for &FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>83 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
84 write(&self.fd, buf).map_err(io::Error::from)
85 }
86
flush(&mut self) -> io::Result<()>87 fn flush(&mut self) -> io::Result<()> {
88 Ok(())
89 }
90 }
91
92 impl Write for FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>93 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
94 (self as &Self).write(buf)
95 }
96
flush(&mut self) -> io::Result<()>97 fn flush(&mut self) -> io::Result<()> {
98 (self as &Self).flush()
99 }
100 }
101
set_nonblocking(fd: RawFd)102 fn set_nonblocking(fd: RawFd) {
103 use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
104
105 let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
106
107 if flags < 0 {
108 panic!(
109 "bad return value from fcntl(F_GETFL): {} ({:?})",
110 flags,
111 nix::Error::last()
112 );
113 }
114
115 let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
116
117 nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
118 }
119
socketpair() -> (FileDescriptor, FileDescriptor)120 fn socketpair() -> (FileDescriptor, FileDescriptor) {
121 use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
122
123 let (fd_a, fd_b) = socket::socketpair(
124 AddressFamily::Unix,
125 SockType::Stream,
126 None,
127 SockFlag::empty(),
128 )
129 .expect("socketpair");
130 let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
131
132 set_nonblocking(fds.0.fd.as_raw_fd());
133 set_nonblocking(fds.1.fd.as_raw_fd());
134
135 fds
136 }
137
drain(mut fd: &FileDescriptor, mut amt: usize)138 fn drain(mut fd: &FileDescriptor, mut amt: usize) {
139 let mut buf = [0u8; 512];
140 while amt > 0 {
141 match fd.read(&mut buf[..]) {
142 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
143 Ok(0) => panic!("unexpected EOF"),
144 Err(e) => panic!("unexpected error: {e:?}"),
145 Ok(x) => amt -= x,
146 }
147 }
148 }
149
150 #[tokio::test]
151 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
initially_writable()152 async fn initially_writable() {
153 let (a, b) = socketpair();
154
155 let afd_a = AsyncFd::new(a).unwrap();
156 let afd_b = AsyncFd::new(b).unwrap();
157
158 afd_a.writable().await.unwrap().clear_ready();
159 afd_b.writable().await.unwrap().clear_ready();
160
161 tokio::select! {
162 biased;
163 _ = tokio::time::sleep(Duration::from_millis(10)) => {},
164 _ = afd_a.readable() => panic!("Unexpected readable state"),
165 _ = afd_b.readable() => panic!("Unexpected readable state"),
166 }
167 }
168
169 #[tokio::test]
170 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
reset_readable()171 async fn reset_readable() {
172 let (a, mut b) = socketpair();
173
174 let afd_a = AsyncFd::new(a).unwrap();
175
176 let readable = afd_a.readable();
177 tokio::pin!(readable);
178
179 tokio::select! {
180 _ = readable.as_mut() => panic!(),
181 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
182 }
183
184 b.write_all(b"0").unwrap();
185
186 let mut guard = readable.await.unwrap();
187
188 guard
189 .try_io(|_| afd_a.get_ref().read(&mut [0]))
190 .unwrap()
191 .unwrap();
192
193 // `a` is not readable, but the reactor still thinks it is
194 // (because we have not observed a not-ready error yet)
195 afd_a.readable().await.unwrap().retain_ready();
196
197 // Explicitly clear the ready state
198 guard.clear_ready();
199
200 let readable = afd_a.readable();
201 tokio::pin!(readable);
202
203 tokio::select! {
204 _ = readable.as_mut() => panic!(),
205 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
206 }
207
208 b.write_all(b"0").unwrap();
209
210 // We can observe the new readable event
211 afd_a.readable().await.unwrap().clear_ready();
212 }
213
214 #[tokio::test]
215 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
reset_writable()216 async fn reset_writable() {
217 let (a, b) = socketpair();
218
219 let afd_a = AsyncFd::new(a).unwrap();
220
221 let mut guard = afd_a.writable().await.unwrap();
222
223 // Write until we get a WouldBlock. This also clears the ready state.
224 let mut bytes = 0;
225 while let Ok(Ok(amt)) = guard.try_io(|_| afd_a.get_ref().write(&[0; 512][..])) {
226 bytes += amt;
227 }
228
229 // Writable state should be cleared now.
230 let writable = afd_a.writable();
231 tokio::pin!(writable);
232
233 tokio::select! {
234 _ = writable.as_mut() => panic!(),
235 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
236 }
237
238 // Read from the other side; we should become writable now.
239 drain(&b, bytes);
240
241 let _ = writable.await.unwrap();
242 }
243
/// Wrapper that shares an inner descriptor-like value through an `Arc`.
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);

impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    /// Reports the raw descriptor of the shared inner value.
    fn as_raw_fd(&self) -> RawFd {
        let ArcFd(shared) = self;
        T::as_raw_fd(shared)
    }
}
251
252 #[tokio::test]
253 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
drop_closes()254 async fn drop_closes() {
255 let (a, mut b) = socketpair();
256
257 let afd_a = AsyncFd::new(a).unwrap();
258
259 assert_eq!(
260 ErrorKind::WouldBlock,
261 b.read(&mut [0]).err().unwrap().kind()
262 );
263
264 std::mem::drop(afd_a);
265
266 assert_eq!(0, b.read(&mut [0]).unwrap());
267
268 // into_inner does not close the fd
269
270 let (a, mut b) = socketpair();
271 let afd_a = AsyncFd::new(a).unwrap();
272 let _a: FileDescriptor = afd_a.into_inner();
273
274 assert_eq!(
275 ErrorKind::WouldBlock,
276 b.read(&mut [0]).err().unwrap().kind()
277 );
278
279 // Drop closure behavior is delegated to the inner object
280 let (a, mut b) = socketpair();
281 let arc_fd = Arc::new(a);
282 let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
283 std::mem::drop(afd_a);
284
285 assert_eq!(
286 ErrorKind::WouldBlock,
287 b.read(&mut [0]).err().unwrap().kind()
288 );
289
290 std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
291 }
292
293 #[tokio::test]
294 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
reregister()295 async fn reregister() {
296 let (a, _b) = socketpair();
297
298 let afd_a = AsyncFd::new(a).unwrap();
299 let a = afd_a.into_inner();
300 AsyncFd::new(a).unwrap();
301 }
302
303 #[tokio::test]
304 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
guard_try_io()305 async fn guard_try_io() {
306 let (a, mut b) = socketpair();
307
308 b.write_all(b"0").unwrap();
309
310 let afd_a = AsyncFd::new(a).unwrap();
311
312 let mut guard = afd_a.readable().await.unwrap();
313
314 afd_a.get_ref().read_exact(&mut [0]).unwrap();
315
316 // Should not clear the readable state
317 let _ = guard.try_io(|_| Ok(()));
318
319 // Still readable...
320 let _ = afd_a.readable().await.unwrap();
321
322 // Should clear the readable state
323 let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));
324
325 // Assert not readable
326 let readable = afd_a.readable();
327 tokio::pin!(readable);
328
329 tokio::select! {
330 _ = readable.as_mut() => panic!(),
331 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
332 }
333
334 // Write something down b again and make sure we're reawoken
335 b.write_all(b"0").unwrap();
336 let _ = readable.await.unwrap();
337 }
338
339 #[tokio::test]
340 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
try_io_readable()341 async fn try_io_readable() {
342 let (a, mut b) = socketpair();
343 let mut afd_a = AsyncFd::new(a).unwrap();
344
345 // Give the runtime some time to update bookkeeping.
346 tokio::task::yield_now().await;
347
348 {
349 let mut called = false;
350 let _ = afd_a.try_io_mut(Interest::READABLE, |_| {
351 called = true;
352 Ok(())
353 });
354 assert!(
355 !called,
356 "closure should not have been called, since socket should not be readable"
357 );
358 }
359
360 // Make `a` readable by writing to `b`.
361 // Give the runtime some time to update bookkeeping.
362 b.write_all(&[0]).unwrap();
363 tokio::task::yield_now().await;
364
365 {
366 let mut called = false;
367 let _ = afd_a.try_io(Interest::READABLE, |_| {
368 called = true;
369 Ok(())
370 });
371 assert!(
372 called,
373 "closure should have been called, since socket should have data available to read"
374 );
375 }
376
377 {
378 let mut called = false;
379 let _ = afd_a.try_io(Interest::READABLE, |_| {
380 called = true;
381 io::Result::<()>::Err(ErrorKind::WouldBlock.into())
382 });
383 assert!(
384 called,
385 "closure should have been called, since socket should have data available to read"
386 );
387 }
388
389 {
390 let mut called = false;
391 let _ = afd_a.try_io(Interest::READABLE, |_| {
392 called = true;
393 Ok(())
394 });
395 assert!(!called, "closure should not have been called, since socket readable state should have been cleared");
396 }
397 }
398
399 #[tokio::test]
400 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
try_io_writable()401 async fn try_io_writable() {
402 let (a, _b) = socketpair();
403 let afd_a = AsyncFd::new(a).unwrap();
404
405 // Give the runtime some time to update bookkeeping.
406 tokio::task::yield_now().await;
407
408 {
409 let mut called = false;
410 let _ = afd_a.try_io(Interest::WRITABLE, |_| {
411 called = true;
412 Ok(())
413 });
414 assert!(
415 called,
416 "closure should have been called, since socket should still be marked as writable"
417 );
418 }
419 {
420 let mut called = false;
421 let _ = afd_a.try_io(Interest::WRITABLE, |_| {
422 called = true;
423 io::Result::<()>::Err(ErrorKind::WouldBlock.into())
424 });
425 assert!(
426 called,
427 "closure should have been called, since socket should still be marked as writable"
428 );
429 }
430
431 {
432 let mut called = false;
433 let _ = afd_a.try_io(Interest::WRITABLE, |_| {
434 called = true;
435 Ok(())
436 });
437 assert!(!called, "closure should not have been called, since socket writable state should have been cleared");
438 }
439 }
440
441 #[tokio::test]
442 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
multiple_waiters()443 async fn multiple_waiters() {
444 let (a, mut b) = socketpair();
445 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
446
447 let barrier = Arc::new(tokio::sync::Barrier::new(11));
448
449 let mut tasks = Vec::new();
450 for _ in 0..10 {
451 let afd_a = afd_a.clone();
452 let barrier = barrier.clone();
453
454 let f = async move {
455 let notify_barrier = async {
456 barrier.wait().await;
457 futures::future::pending::<()>().await;
458 };
459
460 tokio::select! {
461 biased;
462 guard = afd_a.readable() => {
463 tokio::task::yield_now().await;
464 guard.unwrap().clear_ready()
465 },
466 _ = notify_barrier => unreachable!(),
467 }
468
469 std::mem::drop(afd_a);
470 };
471
472 tasks.push(tokio::spawn(f));
473 }
474
475 let mut all_tasks = futures::future::try_join_all(tasks);
476
477 tokio::select! {
478 r = std::pin::Pin::new(&mut all_tasks) => {
479 r.unwrap(); // propagate panic
480 panic!("Tasks exited unexpectedly")
481 },
482 _ = barrier.wait() => {}
483 }
484
485 b.write_all(b"0").unwrap();
486
487 all_tasks.await.unwrap();
488 }
489
/// Exercises `poll_read_ready` / `poll_write_ready`: the read and write
/// directions are polled from separate tasks, and the test checks which
/// wakers fire when the descriptor becomes readable and later writable.
#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    let mut bytes = 0;
    while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) {
        bytes += amt;
    }

    let waker = TestWaker::new();

    // Poll for read readiness once with the test waker; nothing is readable yet.
    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        // Tell the test body that the first poll on this task has happened.
        barrier_clone.wait().await;

        let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        // Tell the test body that the first poll on this task has happened.
        barrier_clone.wait().await;

        let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    // Wait until both spawned tasks have polled once.
    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // A must not be readable yet: the yield has to complete first.
    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref(), bytes);

    // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}
566
assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>>567 fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
568 let mut pinned = Box::pin(f);
569
570 assert_pending!(pinned
571 .as_mut()
572 .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
573
574 pinned
575 }
576
rt() -> tokio::runtime::Runtime577 fn rt() -> tokio::runtime::Runtime {
578 tokio::runtime::Builder::new_current_thread()
579 .enable_all()
580 .build()
581 .unwrap()
582 }
583
/// Dropping the runtime makes a pending `readable()` future, and any created
/// afterwards, resolve to an error.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_currently_pending() {
    let runtime = rt();

    let (sock_a, _sock_b) = socketpair();
    let async_a = {
        // Registration needs the runtime context to be entered.
        let _guard = runtime.enter();
        AsyncFd::new(sock_a).unwrap()
    };

    let parked = assert_pending(async_a.readable());

    drop(runtime);

    // This future was polled **before** the runtime was dropped.
    assert_err!(futures::executor::block_on(parked));

    // This future is created **after** the runtime was dropped.
    let fresh = async_a.readable();
    assert_err!(futures::executor::block_on(fresh));
}
605
/// A `readable()` future created after the runtime was dropped resolves to
/// an error.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_future_pending() {
    let runtime = rt();

    let (sock_a, _sock_b) = socketpair();
    let async_a = {
        let _guard = runtime.enter();
        AsyncFd::new(sock_a).unwrap()
    };

    drop(runtime);

    let late = async_a.readable();
    assert_err!(futures::executor::block_on(late));
}
621
/// Races dropping the runtime on another thread against waiting for
/// readability; repeated 100 times.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _round in 0..100 {
        let runtime = rt();

        let (sock_a, _sock_b) = socketpair();
        let async_a = {
            let _guard = runtime.enter();
            AsyncFd::new(sock_a).unwrap()
        };

        // Drop the runtime concurrently; the thread is left detached.
        let _ = std::thread::spawn(move || drop(runtime));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(async_a.readable());

        // However retrying will always return an error
        let retry = async_a.readable();
        assert_err!(futures::executor::block_on(retry));
    }
}
644
poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>645 async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
646 std::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
647 }
648
poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>649 async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
650 std::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
651 }
652
/// Poll-based readiness futures that were pending when the runtime was
/// dropped resolve to an error.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_currently_pending_polls() {
    let runtime = rt();

    let (sock_a, _sock_b) = socketpair();
    let async_a = {
        let _guard = runtime.enter();
        AsyncFd::new(sock_a).unwrap()
    };

    // Write until a write fails, so the descriptor is no longer writable.
    let filler = [0u8; 512];
    loop {
        if async_a.get_ref().write(&filler).is_err() {
            break;
        }
    }

    let parked_read = assert_pending(poll_readable(&async_a));
    let parked_write = assert_pending(poll_writable(&async_a));

    drop(runtime);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(parked_read));
    assert_err!(futures::executor::block_on(parked_write));
}
675
/// Poll-based readiness futures created after the runtime was dropped
/// resolve to an error.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_poll() {
    let runtime = rt();

    let (sock_a, _sock_b) = socketpair();
    let async_a = {
        let _guard = runtime.enter();
        AsyncFd::new(sock_a).unwrap()
    };

    drop(runtime);

    let read_attempt = poll_readable(&async_a);
    assert_err!(futures::executor::block_on(read_attempt));

    let write_attempt = poll_writable(&async_a);
    assert_err!(futures::executor::block_on(write_attempt));
}
692
/// Calling `clear_ready` on a guard obtained before the runtime was dropped
/// must complete without panicking.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_then_clear_readiness() {
    let runtime = rt();

    let (sock_a, _sock_b) = socketpair();
    let async_a = {
        let _guard = runtime.enter();
        AsyncFd::new(sock_a).unwrap()
    };

    let writable = runtime.block_on(async_a.writable());
    let mut write_guard = writable.unwrap();

    drop(runtime);

    write_guard.clear_ready();
}
710
/// Races dropping the runtime on another thread against the poll-based
/// readiness helpers; repeated 100 times.
#[test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _round in 0..100 {
        let runtime = rt();

        let (sock_a, _sock_b) = socketpair();
        let async_a = {
            let _guard = runtime.enter();
            AsyncFd::new(sock_a).unwrap()
        };

        // Write until a write fails, so the descriptor is no longer writable.
        let filler = [0u8; 512];
        loop {
            if async_a.get_ref().write(&filler).is_err() {
                break;
            }
        }

        // Drop the runtime concurrently; the thread is left detached.
        let _ = std::thread::spawn(move || drop(runtime));

        // The poll variants will always return an error in this case
        let read_attempt = poll_readable(&async_a);
        assert_err!(futures::executor::block_on(read_attempt));

        let write_attempt = poll_writable(&async_a);
        assert_err!(futures::executor::block_on(write_attempt));
    }
}
733
734 #[tokio::test]
735 #[cfg_attr(miri, ignore)] // No socket in miri.
736 #[cfg(any(target_os = "linux", target_os = "android"))]
priority_event_on_oob_data()737 async fn priority_event_on_oob_data() {
738 use std::net::SocketAddr;
739
740 use tokio::io::Interest;
741
742 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
743
744 let listener = std::net::TcpListener::bind(addr).unwrap();
745 let addr = listener.local_addr().unwrap();
746
747 let client = std::net::TcpStream::connect(addr).unwrap();
748 let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();
749
750 let (stream, _) = listener.accept().unwrap();
751
752 // Sending out of band data should trigger priority event.
753 send_oob_data(&stream, b"hello").unwrap();
754
755 let _ = client.ready(Interest::PRIORITY).await.unwrap();
756 }
757
758 #[cfg(any(target_os = "linux", target_os = "android"))]
send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize>759 fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
760 unsafe {
761 let res = libc::send(
762 stream.as_raw_fd(),
763 data.as_ptr().cast(),
764 data.len(),
765 libc::MSG_OOB,
766 );
767 if res == -1 {
768 Err(io::Error::last_os_error())
769 } else {
770 Ok(res as usize)
771 }
772 }
773 }
774
775 #[tokio::test]
776 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
clear_ready_matching_clears_ready()777 async fn clear_ready_matching_clears_ready() {
778 use tokio::io::{Interest, Ready};
779
780 let (a, mut b) = socketpair();
781
782 let afd_a = AsyncFd::new(a).unwrap();
783 b.write_all(b"0").unwrap();
784
785 let mut guard = afd_a
786 .ready(Interest::READABLE | Interest::WRITABLE)
787 .await
788 .unwrap();
789
790 assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);
791
792 guard.clear_ready_matching(Ready::READABLE);
793 assert_eq!(guard.ready(), Ready::WRITABLE);
794
795 guard.clear_ready_matching(Ready::WRITABLE);
796 assert_eq!(guard.ready(), Ready::EMPTY);
797 }
798
799 #[tokio::test]
800 #[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
clear_ready_matching_clears_ready_mut()801 async fn clear_ready_matching_clears_ready_mut() {
802 use tokio::io::{Interest, Ready};
803
804 let (a, mut b) = socketpair();
805
806 let mut afd_a = AsyncFd::new(a).unwrap();
807 b.write_all(b"0").unwrap();
808
809 let mut guard = afd_a
810 .ready_mut(Interest::READABLE | Interest::WRITABLE)
811 .await
812 .unwrap();
813
814 assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);
815
816 guard.clear_ready_matching(Ready::READABLE);
817 assert_eq!(guard.ready(), Ready::WRITABLE);
818
819 guard.clear_ready_matching(Ready::WRITABLE);
820 assert_eq!(guard.ready(), Ready::EMPTY);
821 }
822
823 #[tokio::test]
824 #[cfg_attr(miri, ignore)] // No socket in miri.
825 #[cfg(target_os = "linux")]
await_error_readiness_timestamping()826 async fn await_error_readiness_timestamping() {
827 use std::net::{Ipv4Addr, SocketAddr};
828
829 use tokio::io::{Interest, Ready};
830
831 let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
832 let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
833
834 let socket = std::net::UdpSocket::bind(address_a).unwrap();
835
836 socket.set_nonblocking(true).unwrap();
837
838 // configure send timestamps
839 configure_timestamping_socket(&socket).unwrap();
840
841 socket.connect(address_b).unwrap();
842
843 let fd = AsyncFd::new(socket).unwrap();
844
845 tokio::select! {
846 _ = fd.ready(Interest::ERROR) => panic!(),
847 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
848 }
849
850 let buf = b"hello there";
851 fd.get_ref().send(buf).unwrap();
852
853 // the send timestamp should now be in the error queue
854 let guard = fd.ready(Interest::ERROR).await.unwrap();
855 assert_eq!(guard.ready(), Ready::ERROR);
856 }
857
858 #[cfg(target_os = "linux")]
configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int>859 fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
860 // enable software timestamping, and specifically software send timestamping
861 let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;
862
863 let res = unsafe {
864 libc::setsockopt(
865 udp_socket.as_raw_fd(),
866 libc::SOL_SOCKET,
867 libc::SO_TIMESTAMP,
868 &options as *const _ as *const libc::c_void,
869 std::mem::size_of_val(&options) as libc::socklen_t,
870 )
871 };
872
873 if res == -1 {
874 Err(std::io::Error::last_os_error())
875 } else {
876 Ok(res)
877 }
878 }
879
/// With `IP_RECVERR` enabled, sending a UDP datagram to a port where nobody
/// is expected to listen should surface as `Ready::ERROR` on the sender.
#[tokio::test]
#[cfg_attr(miri, ignore)] // No F_GETFL for fcntl in miri.
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    // Raw descriptor captured for the spawned task below; `socket` itself is
    // moved into the `AsyncFd` further down.
    let socket_fd = socket.as_raw_fd();

    // Enable IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate task for sending the message. The join handle is
    // dropped, so the task runs detached.
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context: the OS will
        // notice that nobody is listening on this port. Normally this is ignored (UDP is "fire
        // and forget"), but because IP_RECVERR is enabled, the error will actually be reported
        // to the sending socket.
        // Only `sin_family` and `sin_port` are filled in; every other field stays zeroed.
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        // Start from an all-zero header and fill in only the name and iovec fields.
        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            panic!("{:?}", std::io::Error::last_os_error())
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    // Wait for the error readiness caused by the undeliverable datagram.
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}
949
/// Source whose raw descriptor is always `-1`, used to make `AsyncFd`
/// construction fail.
#[derive(Debug, PartialEq, Eq)]
struct InvalidSource;

impl AsRawFd for InvalidSource {
    /// Always reports `-1`.
    fn as_raw_fd(&self) -> RawFd {
        const INVALID_FD: RawFd = -1;
        INVALID_FD
    }
}
958
959 #[tokio::test]
try_new()960 async fn try_new() {
961 let original = Arc::new(InvalidSource);
962
963 let error = AsyncFd::try_new(original.clone()).unwrap_err();
964 let (returned, _cause) = error.into_parts();
965
966 assert!(Arc::ptr_eq(&original, &returned));
967 }
968
969 #[tokio::test]
try_with_interest()970 async fn try_with_interest() {
971 let original = Arc::new(InvalidSource);
972
973 let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
974 let (returned, _cause) = error.into_parts();
975
976 assert!(Arc::ptr_eq(&original, &returned));
977 }
978