#![allow(unknown_lints, unexpected_cfgs)]
#![allow(clippy::needless_range_loop)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
#![cfg(not(miri))]

// Tests to run on both current-thread & multi-thread runtime variants.

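// `rt_test!` expands the test body passed to it into one module per scheduler
// flavor (current-thread, multi-thread with 1 and 4 workers, and, behind
// `tokio_unstable`, the alternate multi-thread scheduler). Each module
// provides its own `rt()` constructor and `NUM_WORKERS` constant, so every
// test below runs against every runtime variant.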
macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            $($t)*

            #[cfg(not(target_os="wasi"))]
            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }
    }
}

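// Compile-time check that the `Runtime` handle is `Send + Sync` and can be
// shared across threads.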
#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;
    fn is_send<T: Send + Sync>() {}

    is_send::<Runtime>();
}

rt_test! {
    #[cfg(not(target_os="wasi"))]
    use tokio::net::{TcpListener, TcpStream};
    #[cfg(not(target_os="wasi"))]
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use tokio::runtime::Runtime;
    use tokio::sync::oneshot;
    use tokio::{task, time};

    #[cfg(not(target_os="wasi"))]
    use tokio_test::assert_err;
    use tokio_test::assert_ok;

    use std::future::{poll_fn, Future};
    use std::pin::Pin;

    #[cfg(not(target_os="wasi"))]
    use std::sync::mpsc;

    use std::sync::Arc;
    use std::task::{Context, Poll};

    #[cfg(not(target_os="wasi"))]
    use std::thread;
    use std::time::{Duration, Instant};

    #[test]
    fn block_on_sync() {
        let rt = rt();

        let mut win = false;
        rt.block_on(async {
            win = true;
        });

        assert!(win);
    }

    #[cfg(not(target_os="wasi"))]
    #[test]
    fn block_on_async() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_one_bg() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_one_join() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            let handle = tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            });

            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_two() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            tokio::spawn(async move {
                assert_ok!(tx1.send("ZOMG"));
            });

            tokio::spawn(async move {
                let msg = assert_ok!(rx1.await);
                assert_ok!(tx2.send(msg));
            });

            assert_ok!(rx2.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_block_on() {
        use tokio::sync::mpsc;

        const ITER: usize = 200;

        let rt = rt();

        let out = rt.block_on(async {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let mut txs = (0..ITER)
                .map(|i| {
                    let (tx, rx) = oneshot::channel();
                    let done_tx = done_tx.clone();

                    tokio::spawn(async move {
                        let msg = assert_ok!(rx.await);
                        assert_eq!(i, msg);
                        assert_ok!(done_tx.send(msg));
                    });

                    tx
                })
                .collect::<Vec<_>>();

            drop(done_tx);

            thread::spawn(move || {
                for (i, tx) in txs.drain(..).enumerate() {
                    assert_ok!(tx.send(i));
                }
            });

            let mut out = vec![];
            while let Some(i) = done_rx.recv().await {
                out.push(i);
            }

            out.sort_unstable();
            out
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_task() {
        use tokio::sync::mpsc;

        const ITER: usize = 500;

        let rt = rt();

        let out = rt.block_on(async {
            tokio::spawn(async move {
                let (done_tx, mut done_rx) = mpsc::unbounded_channel();

                let mut txs = (0..ITER)
                    .map(|i| {
                        let (tx, rx) = oneshot::channel();
                        let done_tx = done_tx.clone();

                        tokio::spawn(async move {
                            let msg = assert_ok!(rx.await);
                            assert_eq!(i, msg);
                            assert_ok!(done_tx.send(msg));
                        });

                        tx
                    })
                    .collect::<Vec<_>>();

                drop(done_tx);

                thread::spawn(move || {
                    for (i, tx) in txs.drain(..).enumerate() {
                        assert_ok!(tx.send(i));
                    }
                });

                let mut out = vec![];
                while let Some(i) = done_rx.recv().await {
                    out.push(i);
                }

                out.sort_unstable();
                out
            }).await.unwrap()
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[test]
    fn spawn_one_from_block_on_called_on_handle() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        #[allow(clippy::async_yields_async)]
        let handle = rt.handle().block_on(async {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            })
        });

        let out = rt.block_on(async {
            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_await_chain() {
        let rt = rt();

        let out = rt.block_on(async {
            assert_ok!(tokio::spawn(async {
                assert_ok!(tokio::spawn(async {
                    "hello"
                }).await)
            }).await)
        });

        assert_eq!(out, "hello");
    }

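    // A spawned task that never completes must still be dropped, and its
    // captured state released, when the runtime itself is dropped.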
    #[test]
    fn outstanding_tasks_dropped() {
        let rt = rt();

        let cnt = Arc::new(());

        rt.block_on(async {
            let cnt = cnt.clone();

            tokio::spawn(poll_fn(move |_| {
                assert_eq!(2, Arc::strong_count(&cnt));
                Poll::<()>::Pending
            }));
        });

        assert_eq!(2, Arc::strong_count(&cnt));

        drop(rt);

        assert_eq!(1, Arc::strong_count(&cnt));
    }

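    // Calling `block_on` on a second runtime from inside another runtime's
    // `block_on` is not allowed and must panic.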
    #[test]
    #[should_panic]
    fn nested_rt() {
        let rt1 = rt();
        let rt2 = rt();

        rt1.block_on(async { rt2.block_on(async { "hello" }) });
    }

    #[test]
    fn create_rt_in_block_on() {
        let rt1 = rt();
        let rt2 = rt1.block_on(async { rt() });
        let out = rt2.block_on(async { "ZOMG" });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_block_on_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_task_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx1.send(()));
            });

            tokio::spawn(async move {
                assert_ok!(rx1.await);
                assert_ok!(tx2.send(()));
            });

            assert_ok!(rx2.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_idle() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));

            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_under_load() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            assert_ok!(rx.await);
        });
    }

    #[test]
    fn sleep_at_root() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            time::sleep(dur).await;
        });

        assert!(now.elapsed() >= dur);
    }

    #[test]
    fn sleep_in_spawn() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                time::sleep(dur).await;
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });

        assert!(now.elapsed() >= dur);
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[test]
    fn block_on_socket() {
        let rt = rt();

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            tokio::spawn(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::spawn(async move { "hello" })
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_blocking_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::task::spawn_blocking(|| "hello")
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn sleep_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            assert_ok!(tokio::task::spawn_blocking(|| {
                let now = std::time::Instant::now();
                let dur = Duration::from_millis(1);

                // use the `futures` crate's block_on fn to make sure we
                // aren't setting any Tokio context
                futures::executor::block_on(async {
                    tokio::time::sleep(dur).await;
                });

                assert!(now.elapsed() >= dur);
            }).await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[test]
    fn socket_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let peer = tokio::task::spawn_blocking(move || {
                // use the `futures` crate's block_on fn to make sure we
                // aren't setting any Tokio context
                futures::executor::block_on(async {
                    assert_ok!(TcpStream::connect(addr).await);
                });
            });

            // Wait for the client to connect
            let _ = assert_ok!(listener.accept().await);

            assert_ok!(peer.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn always_active_parker() {
        // This test shows that we will always have an active parker, even
        // if `block_on` is called concurrently from multiple threads.

        let rt = rt();
        let rt2 = rt.clone();

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let jh1 = thread::spawn(move || {
            rt.block_on(async move {
                rx2.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
                tx1.send(()).unwrap();
            });
        });

        let jh2 = thread::spawn(move || {
            rt2.block_on(async move {
                tx2.send(()).unwrap();
                time::sleep(Duration::from_millis(5)).await;
                rx1.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
            });
        });

        jh1.join().unwrap();
        jh2.join().unwrap();
    }

    #[test]
    // IOCP requires setting the "max thread" concurrency value. The sane
    // default is to set this to the number of cores. Threads that poll I/O
    // become associated with the IOCP handle. Once those threads sleep for any
    // reason (mutex), they yield their ownership.
    //
    // This test hits an edge case on Windows where more threads than cores are
    // created, none of those threads ever yield due to being at capacity, so
    // IOCP gets "starved".
    //
    // For now, this is a narrow edge case that is probably not a real
    // production concern, and there isn't a great/obvious solution. The test
    // is therefore disabled on Windows.
    #[cfg(not(windows))]
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
    fn io_driver_called_when_under_load() {
        let rt = rt();

        // Create a lot of constant load. The scheduler will always be busy.
        for _ in 0..100 {
            rt.spawn(async {
                loop {
                    // Don't use Tokio's `yield_now()` to avoid special defer
                    // logic.
                    std::future::poll_fn::<(), _>(|cx| {
                        cx.waker().wake_by_ref();
                        std::task::Poll::Pending
                    }).await;
                }
            });
        }

        // Do some I/O work
        rt.block_on(async {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let srv = tokio::spawn(async move {
                let (mut stream, _) = assert_ok!(listener.accept().await);
                assert_ok!(stream.write_all(b"hello world").await);
            });

            let cli = tokio::spawn(async move {
                let mut stream = assert_ok!(TcpStream::connect(addr).await);
                let mut dst = vec![0; 11];

                assert_ok!(stream.read_exact(&mut dst).await);
                assert_eq!(dst, b"hello world");
            });

            assert_ok!(srv.await);
            assert_ok!(cli.await);
        });
    }

    /// Tests that yielded tasks are not scheduled until **after** resource
    /// drivers are polled.
    ///
    /// The OS does not guarantee when I/O events are delivered, so there may be
    /// more yields than anticipated. This makes the test slightly flaky. To
    /// help avoid flakiness, we run the test 10 times and only fail it after
    /// 10 failures in a row.
    ///
    /// Note that if the test fails by panicking rather than by returning false,
    /// then we fail it immediately. That kind of failure should not happen
    /// spuriously.
    #[test]
    #[cfg(not(target_os="wasi"))]
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    fn yield_defers_until_park() {
        for _ in 0..10 {
            if yield_defers_until_park_inner() {
                // test passed
                return;
            }

            // Wait a bit and run the test again.
            std::thread::sleep(std::time::Duration::from_secs(2));
        }

        panic!("yield_defers_until_park is failing consistently");
    }

    /// Implementation of `yield_defers_until_park` test. Returns `true` if the
    /// test passed.
    #[cfg(not(target_os="wasi"))]
    fn yield_defers_until_park_inner() -> bool {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
        use std::sync::Barrier;

        let rt = rt();

        let flag = Arc::new(AtomicBool::new(false));
        let barrier = Arc::new(Barrier::new(NUM_WORKERS));

        rt.block_on(async {
            // Make sure other workers cannot steal tasks
            #[allow(clippy::reversed_empty_ranges)]
            for _ in 0..(NUM_WORKERS - 1) {
                let flag = flag.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait();

                    while !flag.load(SeqCst) {
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                });
            }

            barrier.wait();

            let (fail_test, fail_test_recv) = oneshot::channel::<()>();
            let flag_clone = flag.clone();
            let jh = tokio::spawn(async move {
                // Create a TCP listener
                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let addr = listener.local_addr().unwrap();

                tokio::join!(
                    async {
                        // Done in a blocking manner intentionally.
                        let _socket = std::net::TcpStream::connect(addr).unwrap();

                        // Yield until connected
                        let mut cnt = 0;
                        while !flag_clone.load(SeqCst) {
                            tokio::task::yield_now().await;
                            cnt += 1;

                            if cnt >= 10 {
                                // yielded too many times; report failure and
                                // sleep forever so that the `fail_test_recv`
                                // await below observes the failure.
                                let _ = fail_test.send(());
                                futures::future::pending::<()>().await;
                                break;
                            }
                        }
                    },
                    async {
                        let _ = listener.accept().await.unwrap();
                        flag_clone.store(true, SeqCst);
                    }
                );
            });

            // Wait until the spawned task completes or fails. If no message is
            // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
            let success = fail_test_recv.await.is_err();

            if success {
                // Setting flag to true ensures that the tasks we spawned at
                // the beginning of the test will exit.
                // If we don't do this, the test will hang since the runtime waits
                // for all spawned tasks to finish when dropping.
                flag.store(true, SeqCst);
                // Check for panics in spawned task.
                jh.abort();
                jh.await.unwrap();
            }

            success
        })
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[test]
    fn client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        rt.block_on(async move { client_server(tx).await });

        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
    #[cfg(panic = "unwind")]
    #[test]
    fn panic_in_task() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        struct Boom(Option<oneshot::Sender<()>>);

        impl Future for Boom {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                panic!();
            }
        }

        impl Drop for Boom {
            fn drop(&mut self) {
                assert!(std::thread::panicking());
                self.0.take().unwrap().send(()).unwrap();
            }
        }

        rt.spawn(Boom(Some(tx)));
        assert_ok!(rt.block_on(rx));
    }

    #[test]
    #[should_panic]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn panic_in_block_on() {
        let rt = rt();
        rt.block_on(async { panic!() });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    async fn yield_once() {
        let mut yielded = false;
        poll_fn(|cx| {
            if yielded {
                Poll::Ready(())
            } else {
                yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        })
        .await
    }

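    // `Runtime::enter` installs the thread-local runtime context, which is
    // what lets `tokio::spawn` be called outside of `block_on`.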
    #[test]
    fn enter_and_spawn() {
        let rt = rt();
        let handle = {
            let _enter = rt.enter();
            tokio::spawn(async {})
        };

        assert_ok!(rt.block_on(handle));
    }

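    // Shutting the runtime down must drop futures that never complete; the
    // `Never` future's destructor reports back over a channel to prove it ran.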
    #[test]
    fn eagerly_drops_futures_on_shutdown() {
        use std::sync::mpsc;

        struct Never {
            drop_tx: mpsc::Sender<()>,
        }

        impl Future for Never {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                Poll::Pending
            }
        }

        impl Drop for Never {
            fn drop(&mut self) {
                self.drop_tx.send(()).unwrap();
            }
        }

        let rt = rt();

        let (drop_tx, drop_rx) = mpsc::channel();
        let (run_tx, run_rx) = oneshot::channel();

        rt.block_on(async move {
            tokio::spawn(async move {
                assert_ok!(run_tx.send(()));

                Never { drop_tx }.await
            });

            assert_ok!(run_rx.await);
        });

        drop(rt);

        assert_ok!(drop_rx.recv());
    }

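    // Each task's destructor wakes the other task while the runtime is being
    // dropped; the test only needs to complete without hanging or panicking.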
    #[test]
    fn wake_while_rt_is_dropping() {
        use tokio::sync::Barrier;

        struct OnDrop<F: FnMut()>(F);

        impl<F: FnMut()> Drop for OnDrop<F> {
            fn drop(&mut self) {
                (self.0)()
            }
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let barrier = Arc::new(Barrier::new(3));
        let barrier1 = barrier.clone();
        let barrier2 = barrier.clone();

        let rt = rt();

        rt.spawn(async move {
            let mut tx2 = Some(tx2);
            let _d = OnDrop(move || {
                let _ = tx2.take().unwrap().send(());
            });

            // Ensure a waker gets stored in oneshot 1.
            let _ = tokio::join!(rx1, barrier1.wait());
        });

        rt.spawn(async move {
            let mut tx1 = Some(tx1);
            let _d = OnDrop(move || {
                let _ = tx1.take().unwrap().send(());
            });

            // Ensure a waker gets stored in oneshot 2.
            let _ = tokio::join!(rx2, barrier2.wait());
        });

        // Wait until every oneshot channel has been polled.
        rt.block_on(barrier.wait());

        // Drop the rt. Regardless of which task is dropped first, its
        // destructor will wake the other task.
        drop(rt);
    }

    #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[test]
    fn io_notify_while_shutting_down() {
        use tokio::net::UdpSocket;
        use std::sync::Arc;

        for _ in 1..10 {
            let runtime = rt();

            runtime.block_on(async {
                let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
                let addr = socket.local_addr().unwrap();
                let send_half = Arc::new(socket);
                let recv_half = send_half.clone();

                tokio::spawn(async move {
                    let mut buf = [0];
                    loop {
                        recv_half.recv_from(&mut buf).await.unwrap();
                        std::thread::sleep(Duration::from_millis(2));
                    }
                });

                tokio::spawn(async move {
                    let buf = [0];
                    loop {
                        send_half.send_to(&buf, &addr).await.unwrap();
                        tokio::time::sleep(Duration::from_millis(1)).await;
                    }
                });

                tokio::time::sleep(Duration::from_millis(5)).await;
            });
        }
    }

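    // `shutdown_timeout` must return once the timeout elapses, even while a
    // blocking task is still sleeping far past it.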
    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout() {
        let (tx, rx) = oneshot::channel();
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                tx.send(()).unwrap();
                thread::sleep(Duration::from_secs(10_000));
            });

            rx.await.unwrap();
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout_0() {
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                thread::sleep(Duration::from_secs(10_000));
            });
        });

        let now = Instant::now();
        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
        assert!(now.elapsed().as_secs() < 1);
    }

    #[test]
    fn shutdown_wakeup_time() {
        let runtime = rt();

        runtime.block_on(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
    }

    // This test is currently ignored on Windows because of a
    // rust-lang issue in thread local storage destructors.
    // See https://github.com/rust-lang/rust/issues/74875
    #[test]
    #[cfg(not(windows))]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
    fn runtime_in_thread_local() {
        use std::cell::RefCell;
        use std::thread;

        thread_local!(
            static R: RefCell<Option<Runtime>> = const { RefCell::new(None) };
        );

        thread::spawn(|| {
            R.with(|cell| {
                let rt = rt();
                let rt = Arc::try_unwrap(rt).unwrap();
                *cell.borrow_mut() = Some(rt);
            });

            let _rt = rt();
        }).join().unwrap();
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    async fn client_server(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        tokio::spawn(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[test]
    fn local_set_block_on_socket() {
        let rt = rt();
        let local = task::LocalSet::new();

        local.block_on(&rt, async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            task::spawn_local(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[cfg_attr(miri, ignore)] // No `socket` in miri.
    #[test]
    fn local_set_client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        let local = task::LocalSet::new();

        local.block_on(&rt, async move { client_server_local(tx).await });

        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    async fn client_server_local(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        task::spawn_local(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

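    // Cooperative scheduling: even though the unbounded channel has 1_000
    // messages ready, polling it must eventually return `Pending` so that
    // other tasks get a chance to run.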
    #[test]
    fn coop() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            poll_fn(|cx| {
                // At least one of the polls should return `Pending`.
                for _ in 0..1_000 {
                    if recv.poll_recv(cx).is_pending() {
                        return Ready(());
                    }
                }

                panic!("did not yield");
            }).await;
        });
    }

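    // `task::unconstrained` opts the wrapped future out of the cooperative
    // budget, so every receive completes without a forced yield.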
    #[test]
    fn coop_unconstrained() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            tokio::task::unconstrained(poll_fn(|cx| {
                // All of the messages should be ready.
                for _ in 0..1_000 {
                    assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
                }

                Ready(())
            })).await;
        });
    }

    #[cfg(tokio_unstable)]
    #[test]
    fn coop_consume_budget() {
        let rt = rt();

        rt.block_on(async {
            poll_fn(|cx| {
                let counter = Arc::new(std::sync::Mutex::new(0));
                let counter_clone = Arc::clone(&counter);
                let mut worker = Box::pin(async move {
                    // Consume the budget until a yield happens
                    for _ in 0..1000 {
                        *counter.lock().unwrap() += 1;
                        task::consume_budget().await
                    }
                });
                // Assert that the worker was forced to yield and didn't manage
                // to finish all of the work (assuming a total budget of 128).
                assert!(Pin::new(&mut worker).poll(cx).is_pending());
                assert!(*counter_clone.lock().unwrap() < 1000);
                std::task::Poll::Ready(())
            }).await;
        });
    }

    // Tests that the "next task" scheduler optimization is not able to starve
    // other tasks.
    #[test]
    fn ping_pong_saturation() {
        use std::sync::atomic::{Ordering, AtomicBool};
        use tokio::sync::mpsc;

        const NUM: usize = 100;

        let rt = rt();

        let running = Arc::new(AtomicBool::new(true));

        rt.block_on(async {
            let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

            let mut tasks = vec![];
            // Spawn a bunch of tasks that ping-pong between each other to
            // saturate the runtime.
            for _ in 0..NUM {
                let (tx1, mut rx1) = mpsc::unbounded_channel();
                let (tx2, mut rx2) = mpsc::unbounded_channel();
                let spawned_tx = spawned_tx.clone();
                let running = running.clone();
                tasks.push(task::spawn(async move {
                    spawned_tx.send(()).unwrap();

                    while running.load(Ordering::Relaxed) {
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                    }

                    // Close the channel and wait for the other task to exit.
                    drop(tx1);
                    assert!(rx2.recv().await.is_none());
                }));

                tasks.push(task::spawn(async move {
                    while rx1.recv().await.is_some() {
                        tx2.send(()).unwrap();
                    }
                }));
            }

            for _ in 0..NUM {
                spawned_rx.recv().await.unwrap();
            }

            // spawn another task and wait for it to complete
            let handle = task::spawn(async {
                for _ in 0..5 {
                    // Yielding forces it back into the local queue.
                    task::yield_now().await;
                }
            });
            handle.await.unwrap();
            running.store(false, Ordering::Relaxed);
            for t in tasks {
                t.await.unwrap();
            }
        });
    }

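    // Stress the shutdown path: drop the runtime while another thread is
    // concurrently completing the primed tasks.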
    #[test]
    #[cfg(not(target_os="wasi"))]
    fn shutdown_concurrent_spawn() {
        const NUM_TASKS: usize = 10_000;
        for _ in 0..5 {
            let (tx, rx) = std::sync::mpsc::channel();
            let rt = rt();

            let mut txs = vec![];

            for _ in 0..NUM_TASKS {
                let (tx, rx) = tokio::sync::oneshot::channel();
                txs.push(tx);
                rt.spawn(async move {
                    rx.await.unwrap();
                });
            }

            // Prime the tasks
            rt.block_on(async { tokio::task::yield_now().await });

            let th = std::thread::spawn(move || {
                tx.send(()).unwrap();
                for tx in txs.drain(..) {
                    let _ = tx.send(());
                }
            });

            rx.recv().unwrap();
            drop(rt);

            th.join().unwrap();
        }
    }

    #[test]
    #[cfg_attr(target_family = "wasm", ignore)]
    fn wake_by_ref_from_thread_local() {
        wake_from_thread_local(true);
    }

    #[test]
    #[cfg_attr(target_family = "wasm", ignore)]
    fn wake_by_val_from_thread_local() {
        wake_from_thread_local(false);
    }

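    // Stores the task's `Waker` in a thread-local and wakes it from the
    // thread-local's destructor while the thread is shutting down; this must
    // not panic, and the `done` message must still be delivered.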
    fn wake_from_thread_local(by_ref: bool) {
        use std::cell::RefCell;
        use std::sync::mpsc::{channel, Sender};
        use std::task::Waker;

        struct TLData {
            by_ref: bool,
            waker: Option<Waker>,
            done: Sender<bool>,
        }

        impl Drop for TLData {
            fn drop(&mut self) {
                if self.by_ref {
                    self.waker.take().unwrap().wake_by_ref();
                } else {
                    self.waker.take().unwrap().wake();
                }
                let _ = self.done.send(true);
            }
        }

        std::thread_local! {
            static TL_DATA: RefCell<Option<TLData>> = const { RefCell::new(None) };
        };

        let (send, recv) = channel();

        std::thread::spawn(move || {
            let rt = rt();
            rt.block_on(rt.spawn(poll_fn(move |cx| {
                let waker = cx.waker().clone();
                let send = send.clone();
                TL_DATA.with(|tl| {
                    tl.replace(Some(TLData {
                        by_ref,
                        waker: Some(waker),
                        done: send,
                    }));
                });
                Poll::Ready(())
            })))
            .unwrap();
        })
        .join()
        .unwrap();

        assert!(recv.recv().unwrap());
    }
}