1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![allow(clippy::needless_range_loop)]
3 #![warn(rust_2018_idioms)]
4 #![cfg(feature = "full")]
5 #![cfg(not(miri))]
6 
7 // Tests to run on both current-thread & multi-thread runtime variants.
8 
9 macro_rules! rt_test {
10     ($($t:tt)*) => {
11         mod current_thread_scheduler {
12             $($t)*
13 
14             #[cfg(not(target_os="wasi"))]
15             const NUM_WORKERS: usize = 1;
16 
17             fn rt() -> Arc<Runtime> {
18                 tokio::runtime::Builder::new_current_thread()
19                     .enable_all()
20                     .build()
21                     .unwrap()
22                     .into()
23             }
24         }
25 
26         #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
27         mod threaded_scheduler_4_threads {
28             $($t)*
29 
30             const NUM_WORKERS: usize = 4;
31 
32             fn rt() -> Arc<Runtime> {
33                 tokio::runtime::Builder::new_multi_thread()
34                     .worker_threads(4)
35                     .enable_all()
36                     .build()
37                     .unwrap()
38                     .into()
39             }
40         }
41 
42         #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
43         mod threaded_scheduler_1_thread {
44             $($t)*
45 
46             const NUM_WORKERS: usize = 1;
47 
48             fn rt() -> Arc<Runtime> {
49                 tokio::runtime::Builder::new_multi_thread()
50                     .worker_threads(1)
51                     .enable_all()
52                     .build()
53                     .unwrap()
54                     .into()
55             }
56         }
57 
58         #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
59         #[cfg(tokio_unstable)]
60         mod alt_threaded_scheduler_4_threads {
61             $($t)*
62 
63             const NUM_WORKERS: usize = 4;
64 
65             fn rt() -> Arc<Runtime> {
66                 tokio::runtime::Builder::new_multi_thread()
67                     .worker_threads(4)
68                     .enable_all()
69                     .build()
70                     .unwrap()
71                     .into()
72             }
73         }
74 
75         #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
76         #[cfg(tokio_unstable)]
77         mod alt_threaded_scheduler_1_thread {
78             $($t)*
79 
80             const NUM_WORKERS: usize = 1;
81 
82             fn rt() -> Arc<Runtime> {
83                 tokio::runtime::Builder::new_multi_thread()
84                     .worker_threads(1)
85                     .enable_all()
86                     .build()
87                     .unwrap()
88                     .into()
89             }
90         }
91     }
92 }
93 
94 #[test]
send_sync_bound()95 fn send_sync_bound() {
96     use tokio::runtime::Runtime;
97     fn is_send<T: Send + Sync>() {}
98 
99     is_send::<Runtime>();
100 }
101 
102 rt_test! {
103     #[cfg(not(target_os="wasi"))]
104     use tokio::net::{TcpListener, TcpStream};
105     #[cfg(not(target_os="wasi"))]
106     use tokio::io::{AsyncReadExt, AsyncWriteExt};
107 
108     use tokio::runtime::Runtime;
109     use tokio::sync::oneshot;
110     use tokio::{task, time};
111 
112     #[cfg(not(target_os="wasi"))]
113     use tokio_test::assert_err;
114     use tokio_test::assert_ok;
115 
116     use std::future::{poll_fn, Future};
117     use std::pin::Pin;
118 
119     #[cfg(not(target_os="wasi"))]
120     use std::sync::mpsc;
121 
122     use std::sync::Arc;
123     use std::task::{Context, Poll};
124 
125     #[cfg(not(target_os="wasi"))]
126     use std::thread;
127     use std::time::{Duration, Instant};
128 
129     #[test]
130     fn block_on_sync() {
131         let rt = rt();
132 
133         let mut win = false;
134         rt.block_on(async {
135             win = true;
136         });
137 
138         assert!(win);
139     }
140 
141 
142     #[cfg(not(target_os="wasi"))]
143     #[test]
144     fn block_on_async() {
145         let rt = rt();
146 
147         let out = rt.block_on(async {
148             let (tx, rx) = oneshot::channel();
149 
150             thread::spawn(move || {
151                 thread::sleep(Duration::from_millis(50));
152                 tx.send("ZOMG").unwrap();
153             });
154 
155             assert_ok!(rx.await)
156         });
157 
158         assert_eq!(out, "ZOMG");
159     }
160 
161     #[test]
162     fn spawn_one_bg() {
163         let rt = rt();
164 
165         let out = rt.block_on(async {
166             let (tx, rx) = oneshot::channel();
167 
168             tokio::spawn(async move {
169                 tx.send("ZOMG").unwrap();
170             });
171 
172             assert_ok!(rx.await)
173         });
174 
175         assert_eq!(out, "ZOMG");
176     }
177 
178     #[test]
179     fn spawn_one_join() {
180         let rt = rt();
181 
182         let out = rt.block_on(async {
183             let (tx, rx) = oneshot::channel();
184 
185             let handle = tokio::spawn(async move {
186                 tx.send("ZOMG").unwrap();
187                 "DONE"
188             });
189 
190             let msg = assert_ok!(rx.await);
191 
192             let out = assert_ok!(handle.await);
193             assert_eq!(out, "DONE");
194 
195             msg
196         });
197 
198         assert_eq!(out, "ZOMG");
199     }
200 
201     #[test]
202     fn spawn_two() {
203         let rt = rt();
204 
205         let out = rt.block_on(async {
206             let (tx1, rx1) = oneshot::channel();
207             let (tx2, rx2) = oneshot::channel();
208 
209             tokio::spawn(async move {
210                 assert_ok!(tx1.send("ZOMG"));
211             });
212 
213             tokio::spawn(async move {
214                 let msg = assert_ok!(rx1.await);
215                 assert_ok!(tx2.send(msg));
216             });
217 
218             assert_ok!(rx2.await)
219         });
220 
221         assert_eq!(out, "ZOMG");
222     }
223 
224     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
225     #[test]
226     fn spawn_many_from_block_on() {
227         use tokio::sync::mpsc;
228 
229         const ITER: usize = 200;
230 
231         let rt = rt();
232 
233         let out = rt.block_on(async {
234             let (done_tx, mut done_rx) = mpsc::unbounded_channel();
235 
236             let mut txs = (0..ITER)
237                 .map(|i| {
238                     let (tx, rx) = oneshot::channel();
239                     let done_tx = done_tx.clone();
240 
241                     tokio::spawn(async move {
242                         let msg = assert_ok!(rx.await);
243                         assert_eq!(i, msg);
244                         assert_ok!(done_tx.send(msg));
245                     });
246 
247                     tx
248                 })
249                 .collect::<Vec<_>>();
250 
251             drop(done_tx);
252 
253             thread::spawn(move || {
254                 for (i, tx) in txs.drain(..).enumerate() {
255                     assert_ok!(tx.send(i));
256                 }
257             });
258 
259             let mut out = vec![];
260             while let Some(i) = done_rx.recv().await {
261                 out.push(i);
262             }
263 
264             out.sort_unstable();
265             out
266         });
267 
268         assert_eq!(ITER, out.len());
269 
270         for i in 0..ITER {
271             assert_eq!(i, out[i]);
272         }
273     }
274 
275     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
276     #[test]
277     fn spawn_many_from_task() {
278         use tokio::sync::mpsc;
279 
280         const ITER: usize = 500;
281 
282         let rt = rt();
283 
284         let out = rt.block_on(async {
285             tokio::spawn(async move {
286                 let (done_tx, mut done_rx) = mpsc::unbounded_channel();
287 
288                 let mut txs = (0..ITER)
289                     .map(|i| {
290                         let (tx, rx) = oneshot::channel();
291                         let done_tx = done_tx.clone();
292 
293                         tokio::spawn(async move {
294                             let msg = assert_ok!(rx.await);
295                             assert_eq!(i, msg);
296                             assert_ok!(done_tx.send(msg));
297                         });
298 
299                         tx
300                     })
301                     .collect::<Vec<_>>();
302 
303                 drop(done_tx);
304 
305                 thread::spawn(move || {
306                     for (i, tx) in txs.drain(..).enumerate() {
307                         assert_ok!(tx.send(i));
308                     }
309                 });
310 
311                 let mut out = vec![];
312                 while let Some(i) = done_rx.recv().await {
313                     out.push(i);
314                 }
315 
316                 out.sort_unstable();
317                 out
318             }).await.unwrap()
319         });
320 
321         assert_eq!(ITER, out.len());
322 
323         for i in 0..ITER {
324             assert_eq!(i, out[i]);
325         }
326     }
327 
328     #[test]
329     fn spawn_one_from_block_on_called_on_handle() {
330         let rt = rt();
331         let (tx, rx) = oneshot::channel();
332 
333         #[allow(clippy::async_yields_async)]
334         let handle = rt.handle().block_on(async {
335             tokio::spawn(async move {
336                 tx.send("ZOMG").unwrap();
337                 "DONE"
338             })
339         });
340 
341         let out = rt.block_on(async {
342             let msg = assert_ok!(rx.await);
343 
344             let out = assert_ok!(handle.await);
345             assert_eq!(out, "DONE");
346 
347             msg
348         });
349 
350         assert_eq!(out, "ZOMG");
351     }
352 
353     #[test]
354     fn spawn_await_chain() {
355         let rt = rt();
356 
357         let out = rt.block_on(async {
358             assert_ok!(tokio::spawn(async {
359                 assert_ok!(tokio::spawn(async {
360                     "hello"
361                 }).await)
362             }).await)
363         });
364 
365         assert_eq!(out, "hello");
366     }
367 
368     #[test]
369     fn outstanding_tasks_dropped() {
370         let rt = rt();
371 
372         let cnt = Arc::new(());
373 
374         rt.block_on(async {
375             let cnt = cnt.clone();
376 
377             tokio::spawn(poll_fn(move |_| {
378                 assert_eq!(2, Arc::strong_count(&cnt));
379                 Poll::<()>::Pending
380             }));
381         });
382 
383         assert_eq!(2, Arc::strong_count(&cnt));
384 
385         drop(rt);
386 
387         assert_eq!(1, Arc::strong_count(&cnt));
388     }
389 
390     #[test]
391     #[should_panic]
392     fn nested_rt() {
393         let rt1 = rt();
394         let rt2 = rt();
395 
396         rt1.block_on(async { rt2.block_on(async { "hello" }) });
397     }
398 
399     #[test]
400     fn create_rt_in_block_on() {
401         let rt1 = rt();
402         let rt2 = rt1.block_on(async { rt() });
403         let out = rt2.block_on(async { "ZOMG" });
404 
405         assert_eq!(out, "ZOMG");
406     }
407 
408     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
409     #[test]
410     fn complete_block_on_under_load() {
411         let rt = rt();
412 
413         rt.block_on(async {
414             let (tx, rx) = oneshot::channel();
415 
416             // Spin hard
417             tokio::spawn(async {
418                 loop {
419                     yield_once().await;
420                 }
421             });
422 
423             thread::spawn(move || {
424                 thread::sleep(Duration::from_millis(50));
425                 assert_ok!(tx.send(()));
426             });
427 
428             assert_ok!(rx.await);
429         });
430     }
431 
432     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
433     #[test]
434     fn complete_task_under_load() {
435         let rt = rt();
436 
437         rt.block_on(async {
438             let (tx1, rx1) = oneshot::channel();
439             let (tx2, rx2) = oneshot::channel();
440 
441             // Spin hard
442             tokio::spawn(async {
443                 loop {
444                     yield_once().await;
445                 }
446             });
447 
448             thread::spawn(move || {
449                 thread::sleep(Duration::from_millis(50));
450                 assert_ok!(tx1.send(()));
451             });
452 
453             tokio::spawn(async move {
454                 assert_ok!(rx1.await);
455                 assert_ok!(tx2.send(()));
456             });
457 
458             assert_ok!(rx2.await);
459         });
460     }
461 
462     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
463     #[test]
464     fn spawn_from_other_thread_idle() {
465         let rt = rt();
466         let handle = rt.clone();
467 
468         let (tx, rx) = oneshot::channel();
469 
470         thread::spawn(move || {
471             thread::sleep(Duration::from_millis(50));
472 
473             handle.spawn(async move {
474                 assert_ok!(tx.send(()));
475             });
476         });
477 
478         rt.block_on(async move {
479             assert_ok!(rx.await);
480         });
481     }
482 
483     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
484     #[test]
485     fn spawn_from_other_thread_under_load() {
486         let rt = rt();
487         let handle = rt.clone();
488 
489         let (tx, rx) = oneshot::channel();
490 
491         thread::spawn(move || {
492             handle.spawn(async move {
493                 assert_ok!(tx.send(()));
494             });
495         });
496 
497         rt.block_on(async move {
498             // Spin hard
499             tokio::spawn(async {
500                 loop {
501                     yield_once().await;
502                 }
503             });
504 
505             assert_ok!(rx.await);
506         });
507     }
508 
509     #[test]
510     fn sleep_at_root() {
511         let rt = rt();
512 
513         let now = Instant::now();
514         let dur = Duration::from_millis(50);
515 
516         rt.block_on(async move {
517             time::sleep(dur).await;
518         });
519 
520         assert!(now.elapsed() >= dur);
521     }
522 
523     #[test]
524     fn sleep_in_spawn() {
525         let rt = rt();
526 
527         let now = Instant::now();
528         let dur = Duration::from_millis(50);
529 
530         rt.block_on(async move {
531             let (tx, rx) = oneshot::channel();
532 
533             tokio::spawn(async move {
534                 time::sleep(dur).await;
535                 assert_ok!(tx.send(()));
536             });
537 
538             assert_ok!(rx.await);
539         });
540 
541         assert!(now.elapsed() >= dur);
542     }
543 
544     #[cfg(not(target_os="wasi"))] // Wasi does not support bind
545     #[cfg_attr(miri, ignore)] // No `socket` in miri.
546     #[test]
547     fn block_on_socket() {
548         let rt = rt();
549 
550         rt.block_on(async move {
551             let (tx, rx) = oneshot::channel();
552 
553             let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
554             let addr = listener.local_addr().unwrap();
555 
556             tokio::spawn(async move {
557                 let _ = listener.accept().await;
558                 tx.send(()).unwrap();
559             });
560 
561             TcpStream::connect(&addr).await.unwrap();
562             rx.await.unwrap();
563         });
564     }
565 
566     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
567     #[test]
568     fn spawn_from_blocking() {
569         let rt = rt();
570 
571         let out = rt.block_on(async move {
572             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
573                 tokio::spawn(async move { "hello" })
574             }).await);
575 
576             assert_ok!(inner.await)
577         });
578 
579         assert_eq!(out, "hello")
580     }
581 
582     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
583     #[test]
584     fn spawn_blocking_from_blocking() {
585         let rt = rt();
586 
587         let out = rt.block_on(async move {
588             let inner = assert_ok!(tokio::task::spawn_blocking(|| {
589                 tokio::task::spawn_blocking(|| "hello")
590             }).await);
591 
592             assert_ok!(inner.await)
593         });
594 
595         assert_eq!(out, "hello")
596     }
597 
598     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
599     #[test]
600     fn sleep_from_blocking() {
601         let rt = rt();
602 
603         rt.block_on(async move {
604             assert_ok!(tokio::task::spawn_blocking(|| {
605                 let now = std::time::Instant::now();
606                 let dur = Duration::from_millis(1);
607 
608                 // use the futures' block_on fn to make sure we aren't setting
609                 // any Tokio context
610                 futures::executor::block_on(async {
611                     tokio::time::sleep(dur).await;
612                 });
613 
614                 assert!(now.elapsed() >= dur);
615             }).await);
616         });
617     }
618 
619     #[cfg(not(target_os="wasi"))] // Wasi does not support bind
620     #[cfg_attr(miri, ignore)] // No `socket` in miri.
621     #[test]
622     fn socket_from_blocking() {
623         let rt = rt();
624 
625         rt.block_on(async move {
626             let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
627             let addr = assert_ok!(listener.local_addr());
628 
629             let peer = tokio::task::spawn_blocking(move || {
630                 // use the futures' block_on fn to make sure we aren't setting
631                 // any Tokio context
632                 futures::executor::block_on(async {
633                     assert_ok!(TcpStream::connect(addr).await);
634                 });
635             });
636 
637             // Wait for the client to connect
638             let _ = assert_ok!(listener.accept().await);
639 
640             assert_ok!(peer.await);
641         });
642     }
643 
644     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
645     #[test]
646     fn always_active_parker() {
647         // This test it to show that we will always have
648         // an active parker even if we call block_on concurrently
649 
650         let rt = rt();
651         let rt2 = rt.clone();
652 
653         let (tx1, rx1) = oneshot::channel();
654         let (tx2, rx2) = oneshot::channel();
655 
656         let jh1 = thread::spawn(move || {
657                 rt.block_on(async move {
658                     rx2.await.unwrap();
659                     time::sleep(Duration::from_millis(5)).await;
660                     tx1.send(()).unwrap();
661                 });
662         });
663 
664         let jh2 = thread::spawn(move || {
665             rt2.block_on(async move {
666                 tx2.send(()).unwrap();
667                 time::sleep(Duration::from_millis(5)).await;
668                 rx1.await.unwrap();
669                 time::sleep(Duration::from_millis(5)).await;
670             });
671         });
672 
673         jh1.join().unwrap();
674         jh2.join().unwrap();
675     }
676 
677     #[test]
678     // IOCP requires setting the "max thread" concurrency value. The sane,
679     // default, is to set this to the number of cores. Threads that poll I/O
680     // become associated with the IOCP handle. Once those threads sleep for any
681     // reason (mutex), they yield their ownership.
682     //
683     // This test hits an edge case on windows where more threads than cores are
684     // created, none of those threads ever yield due to being at capacity, so
685     // IOCP gets "starved".
686     //
687     // For now, this is a very edge case that is probably not a real production
688     // concern. There also isn't a great/obvious solution to take. For now, the
689     // test is disabled.
690     #[cfg(not(windows))]
691     #[cfg_attr(miri, ignore)] // No `socket` in miri.
692     #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
693     fn io_driver_called_when_under_load() {
694         let rt = rt();
695 
696         // Create a lot of constant load. The scheduler will always be busy.
697         for _ in 0..100 {
698             rt.spawn(async {
699                 loop {
700                     // Don't use Tokio's `yield_now()` to avoid special defer
701                     // logic.
702                     std::future::poll_fn::<(), _>(|cx| {
703                         cx.waker().wake_by_ref();
704                         std::task::Poll::Pending
705                     }).await;
706                 }
707             });
708         }
709 
710         // Do some I/O work
711         rt.block_on(async {
712             let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
713             let addr = assert_ok!(listener.local_addr());
714 
715             let srv = tokio::spawn(async move {
716                 let (mut stream, _) = assert_ok!(listener.accept().await);
717                 assert_ok!(stream.write_all(b"hello world").await);
718             });
719 
720             let cli = tokio::spawn(async move {
721                 let mut stream = assert_ok!(TcpStream::connect(addr).await);
722                 let mut dst = vec![0; 11];
723 
724                 assert_ok!(stream.read_exact(&mut dst).await);
725                 assert_eq!(dst, b"hello world");
726             });
727 
728             assert_ok!(srv.await);
729             assert_ok!(cli.await);
730         });
731     }
732 
733     /// Tests that yielded tasks are not scheduled until **after** resource
734     /// drivers are polled.
735     ///
736     /// The OS does not guarantee when I/O events are delivered, so there may be
737     /// more yields than anticipated. This makes the test slightly flaky. To
738     /// help avoid flakiness, we run the test 10 times and only fail it after
739     /// 10 failures in a row.
740     ///
741     /// Note that if the test fails by panicking rather than by returning false,
742     /// then we fail it immediately. That kind of failure should not happen
743     /// spuriously.
744     #[test]
745     #[cfg(not(target_os="wasi"))]
746     #[cfg_attr(miri, ignore)] // No `socket` in miri.
747     fn yield_defers_until_park() {
748         for _ in 0..10 {
749             if yield_defers_until_park_inner() {
750                 // test passed
751                 return;
752             }
753 
754             // Wait a bit and run the test again.
755             std::thread::sleep(std::time::Duration::from_secs(2));
756         }
757 
758         panic!("yield_defers_until_park is failing consistently");
759     }
760 
761     /// Implementation of `yield_defers_until_park` test. Returns `true` if the
762     /// test passed.
763     #[cfg(not(target_os="wasi"))]
764     fn yield_defers_until_park_inner() -> bool {
765         use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
766         use std::sync::Barrier;
767 
768         let rt = rt();
769 
770         let flag = Arc::new(AtomicBool::new(false));
771         let barrier = Arc::new(Barrier::new(NUM_WORKERS));
772 
773         rt.block_on(async {
774             // Make sure other workers cannot steal tasks
775             #[allow(clippy::reversed_empty_ranges)]
776             for _ in 0..(NUM_WORKERS-1) {
777                 let flag = flag.clone();
778                 let barrier = barrier.clone();
779 
780                 tokio::spawn(async move {
781                     barrier.wait();
782 
783                     while !flag.load(SeqCst) {
784                         std::thread::sleep(std::time::Duration::from_millis(1));
785                     }
786                 });
787             }
788 
789             barrier.wait();
790 
791             let (fail_test, fail_test_recv) = oneshot::channel::<()>();
792             let flag_clone = flag.clone();
793             let jh = tokio::spawn(async move {
794                 // Create a TCP listener
795                 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
796                 let addr = listener.local_addr().unwrap();
797 
798                 tokio::join!(
799                     async {
800                         // Done in a blocking manner intentionally.
801                         let _socket = std::net::TcpStream::connect(addr).unwrap();
802 
803                         // Yield until connected
804                         let mut cnt = 0;
805                         while !flag_clone.load(SeqCst){
806                             tokio::task::yield_now().await;
807                             cnt += 1;
808 
809                             if cnt >= 10 {
810                                 // yielded too many times; report failure and
811                                 // sleep forever so that the `fail_test` branch
812                                 // of the `select!` below triggers.
813                                 let _ = fail_test.send(());
814                                 futures::future::pending::<()>().await;
815                                 break;
816                             }
817                         }
818                     },
819                     async {
820                         let _ = listener.accept().await.unwrap();
821                         flag_clone.store(true, SeqCst);
822                     }
823                 );
824             });
825 
826             // Wait until the spawned task completes or fails. If no message is
827             // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
828             let success = fail_test_recv.await.is_err();
829 
830             if success {
831                 // Setting flag to true ensures that the tasks we spawned at
832                 // the beginning of the test will exit.
833                 // If we don't do this, the test will hang since the runtime waits
834                 // for all spawned tasks to finish when dropping.
835                 flag.store(true, SeqCst);
836                 // Check for panics in spawned task.
837                 jh.abort();
838                 jh.await.unwrap();
839             }
840 
841             success
842         })
843     }
844 
845     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
846     #[cfg_attr(miri, ignore)] // No `socket` in miri.
847     #[test]
848     fn client_server_block_on() {
849         let rt = rt();
850         let (tx, rx) = mpsc::channel();
851 
852         rt.block_on(async move { client_server(tx).await });
853 
854         assert_ok!(rx.try_recv());
855         assert_err!(rx.try_recv());
856     }
857 
858     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
859     #[cfg(panic = "unwind")]
860     #[test]
861     fn panic_in_task() {
862         let rt = rt();
863         let (tx, rx) = oneshot::channel();
864 
865         struct Boom(Option<oneshot::Sender<()>>);
866 
867         impl Future for Boom {
868             type Output = ();
869 
870             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
871                 panic!();
872             }
873         }
874 
875         impl Drop for Boom {
876             fn drop(&mut self) {
877                 assert!(std::thread::panicking());
878                 self.0.take().unwrap().send(()).unwrap();
879             }
880         }
881 
882         rt.spawn(Boom(Some(tx)));
883         assert_ok!(rt.block_on(rx));
884     }
885 
886     #[test]
887     #[should_panic]
888     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
889     fn panic_in_block_on() {
890         let rt = rt();
891         rt.block_on(async { panic!() });
892     }
893 
894     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
895     async fn yield_once() {
896         let mut yielded = false;
897         poll_fn(|cx| {
898             if yielded {
899                 Poll::Ready(())
900             } else {
901                 yielded = true;
902                 cx.waker().wake_by_ref();
903                 Poll::Pending
904             }
905         })
906         .await
907     }
908 
909     #[test]
910     fn enter_and_spawn() {
911         let rt = rt();
912         let handle = {
913             let _enter = rt.enter();
914             tokio::spawn(async {})
915         };
916 
917         assert_ok!(rt.block_on(handle));
918     }
919 
920     #[test]
921     fn eagerly_drops_futures_on_shutdown() {
922         use std::sync::mpsc;
923 
924         struct Never {
925             drop_tx: mpsc::Sender<()>,
926         }
927 
928         impl Future for Never {
929             type Output = ();
930 
931             fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
932                 Poll::Pending
933             }
934         }
935 
936         impl Drop for Never {
937             fn drop(&mut self) {
938                 self.drop_tx.send(()).unwrap();
939             }
940         }
941 
942         let rt = rt();
943 
944         let (drop_tx, drop_rx) = mpsc::channel();
945         let (run_tx, run_rx) = oneshot::channel();
946 
947         rt.block_on(async move {
948             tokio::spawn(async move {
949                 assert_ok!(run_tx.send(()));
950 
951                 Never { drop_tx }.await
952             });
953 
954             assert_ok!(run_rx.await);
955         });
956 
957         drop(rt);
958 
959         assert_ok!(drop_rx.recv());
960     }
961 
962     #[test]
963     fn wake_while_rt_is_dropping() {
964         use tokio::sync::Barrier;
965 
966         struct OnDrop<F: FnMut()>(F);
967 
968         impl<F: FnMut()> Drop for OnDrop<F> {
969             fn drop(&mut self) {
970                 (self.0)()
971             }
972         }
973 
974         let (tx1, rx1) = oneshot::channel();
975         let (tx2, rx2) = oneshot::channel();
976 
977         let barrier = Arc::new(Barrier::new(3));
978         let barrier1 = barrier.clone();
979         let barrier2 = barrier.clone();
980 
981         let rt = rt();
982 
983         rt.spawn(async move {
984             let mut tx2 = Some(tx2);
985             let _d = OnDrop(move || {
986                 let _ = tx2.take().unwrap().send(());
987             });
988 
989             // Ensure a waker gets stored in oneshot 1.
990             let _ = tokio::join!(rx1, barrier1.wait());
991         });
992 
993         rt.spawn(async move {
994             let mut tx1 = Some(tx1);
995             let _d = OnDrop(move || {
996                 let _ = tx1.take().unwrap().send(());
997             });
998 
999             // Ensure a waker gets stored in oneshot 2.
1000             let _ = tokio::join!(rx2, barrier2.wait());
1001         });
1002 
1003         // Wait until every oneshot channel has been polled.
1004         rt.block_on(barrier.wait());
1005 
1006         // Drop the rt. Regardless of which task is dropped first, its destructor will wake the
1007         // other task.
1008         drop(rt);
1009     }
1010 
1011     #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
1012     #[cfg_attr(miri, ignore)] // No `socket` in miri.
1013     #[test]
1014     fn io_notify_while_shutting_down() {
1015         use tokio::net::UdpSocket;
1016         use std::sync::Arc;
1017 
1018         for _ in 1..10 {
1019             let runtime = rt();
1020 
1021             runtime.block_on(async {
1022                 let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
1023                 let addr = socket.local_addr().unwrap();
1024                 let send_half = Arc::new(socket);
1025                 let recv_half = send_half.clone();
1026 
1027                 tokio::spawn(async move {
1028                     let mut buf = [0];
1029                     loop {
1030                         recv_half.recv_from(&mut buf).await.unwrap();
1031                         std::thread::sleep(Duration::from_millis(2));
1032                     }
1033                 });
1034 
1035                 tokio::spawn(async move {
1036                     let buf = [0];
1037                     loop {
1038                         send_half.send_to(&buf, &addr).await.unwrap();
1039                         tokio::time::sleep(Duration::from_millis(1)).await;
1040                     }
1041                 });
1042 
1043                 tokio::time::sleep(Duration::from_millis(5)).await;
1044             });
1045         }
1046     }
1047 
1048     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
1049     #[test]
1050     fn shutdown_timeout() {
1051         let (tx, rx) = oneshot::channel();
1052         let runtime = rt();
1053 
1054         runtime.block_on(async move {
1055             task::spawn_blocking(move || {
1056                 tx.send(()).unwrap();
1057                 thread::sleep(Duration::from_secs(10_000));
1058             });
1059 
1060             rx.await.unwrap();
1061         });
1062 
1063         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
1064     }
1065 
1066     #[cfg(not(target_os="wasi"))] // Wasi does not support threads
1067     #[test]
1068     fn shutdown_timeout_0() {
1069         let runtime = rt();
1070 
1071         runtime.block_on(async move {
1072             task::spawn_blocking(move || {
1073                 thread::sleep(Duration::from_secs(10_000));
1074             });
1075         });
1076 
1077         let now = Instant::now();
1078         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
1079         assert!(now.elapsed().as_secs() < 1);
1080     }
1081 
1082     #[test]
1083     fn shutdown_wakeup_time() {
1084         let runtime = rt();
1085 
1086         runtime.block_on(async move {
1087             tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1088         });
1089 
1090         Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
1091     }
1092 
1093     // This test is currently ignored on Windows because of a
1094     // rust-lang issue in thread local storage destructors.
1095     // See https://github.com/rust-lang/rust/issues/74875
1096     #[test]
1097     #[cfg(not(windows))]
1098     #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
1099     fn runtime_in_thread_local() {
1100         use std::cell::RefCell;
1101         use std::thread;
1102 
1103         thread_local!(
1104             static R: RefCell<Option<Runtime>> = const { RefCell::new(None) };
1105         );
1106 
1107         thread::spawn(|| {
1108             R.with(|cell| {
1109                 let rt = rt();
1110                 let rt = Arc::try_unwrap(rt).unwrap();
1111                 *cell.borrow_mut() = Some(rt);
1112             });
1113 
1114             let _rt = rt();
1115         }).join().unwrap();
1116     }
1117 
1118     #[cfg(not(target_os="wasi"))] // Wasi does not support bind
1119     async fn client_server(tx: mpsc::Sender<()>) {
1120         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1121 
1122         // Get the assigned address
1123         let addr = assert_ok!(server.local_addr());
1124 
1125         // Spawn the server
1126         tokio::spawn(async move {
1127             // Accept a socket
1128             let (mut socket, _) = server.accept().await.unwrap();
1129 
1130             // Write some data
1131             socket.write_all(b"hello").await.unwrap();
1132         });
1133 
1134         let mut client = TcpStream::connect(&addr).await.unwrap();
1135 
1136         let mut buf = vec![];
1137         client.read_to_end(&mut buf).await.unwrap();
1138 
1139         assert_eq!(buf, b"hello");
1140         tx.send(()).unwrap();
1141     }
1142 
1143     #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1144     #[cfg_attr(miri, ignore)] // No `socket` in miri.
1145     #[test]
1146     fn local_set_block_on_socket() {
1147         let rt = rt();
1148         let local = task::LocalSet::new();
1149 
1150         local.block_on(&rt, async move {
1151             let (tx, rx) = oneshot::channel();
1152 
1153             let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1154             let addr = listener.local_addr().unwrap();
1155 
1156             task::spawn_local(async move {
1157                 let _ = listener.accept().await;
1158                 tx.send(()).unwrap();
1159             });
1160 
1161             TcpStream::connect(&addr).await.unwrap();
1162             rx.await.unwrap();
1163         });
1164     }
1165 
1166     #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1167     #[cfg_attr(miri, ignore)] // No `socket` in miri.
1168     #[test]
1169     fn local_set_client_server_block_on() {
1170         let rt = rt();
1171         let (tx, rx) = mpsc::channel();
1172 
1173         let local = task::LocalSet::new();
1174 
1175         local.block_on(&rt, async move { client_server_local(tx).await });
1176 
1177         assert_ok!(rx.try_recv());
1178         assert_err!(rx.try_recv());
1179     }
1180 
1181     #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
1182     async fn client_server_local(tx: mpsc::Sender<()>) {
1183         let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
1184 
1185         // Get the assigned address
1186         let addr = assert_ok!(server.local_addr());
1187 
1188         // Spawn the server
1189         task::spawn_local(async move {
1190             // Accept a socket
1191             let (mut socket, _) = server.accept().await.unwrap();
1192 
1193             // Write some data
1194             socket.write_all(b"hello").await.unwrap();
1195         });
1196 
1197         let mut client = TcpStream::connect(&addr).await.unwrap();
1198 
1199         let mut buf = vec![];
1200         client.read_to_end(&mut buf).await.unwrap();
1201 
1202         assert_eq!(buf, b"hello");
1203         tx.send(()).unwrap();
1204     }
1205 
1206     #[test]
1207     fn coop() {
1208         use std::task::Poll::Ready;
1209         use tokio::sync::mpsc;
1210 
1211         let rt = rt();
1212 
1213         rt.block_on(async {
1214             let (send, mut recv) = mpsc::unbounded_channel();
1215 
1216             // Send a bunch of messages.
1217             for _ in 0..1_000 {
1218                 send.send(()).unwrap();
1219             }
1220 
1221             poll_fn(|cx| {
1222                 // At least one response should return pending.
1223                 for _ in 0..1_000 {
1224                     if recv.poll_recv(cx).is_pending() {
1225                         return Ready(());
1226                     }
1227                 }
1228 
1229                 panic!("did not yield");
1230             }).await;
1231         });
1232     }
1233 
1234     #[test]
1235     fn coop_unconstrained() {
1236         use std::task::Poll::Ready;
1237         use tokio::sync::mpsc;
1238 
1239         let rt = rt();
1240 
1241         rt.block_on(async {
1242             let (send, mut recv) = mpsc::unbounded_channel();
1243 
1244             // Send a bunch of messages.
1245             for _ in 0..1_000 {
1246                 send.send(()).unwrap();
1247             }
1248 
1249             tokio::task::unconstrained(poll_fn(|cx| {
1250                 // All the responses should be ready.
1251                 for _ in 0..1_000 {
1252                     assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
1253                 }
1254 
1255                 Ready(())
1256             })).await;
1257         });
1258     }
1259 
1260     #[cfg(tokio_unstable)]
1261     #[test]
1262     fn coop_consume_budget() {
1263         let rt = rt();
1264 
1265         rt.block_on(async {
1266             poll_fn(|cx| {
1267                 let counter = Arc::new(std::sync::Mutex::new(0));
1268                 let counter_clone = Arc::clone(&counter);
1269                 let mut worker = Box::pin(async move {
1270                     // Consume the budget until a yield happens
1271                     for _ in 0..1000 {
1272                         *counter.lock().unwrap() += 1;
1273                         task::consume_budget().await
1274                     }
1275                 });
1276                 // Assert that the worker was yielded and it didn't manage
1277                 // to finish the whole work (assuming the total budget of 128)
1278                 assert!(Pin::new(&mut worker).poll(cx).is_pending());
1279                 assert!(*counter_clone.lock().unwrap() < 1000);
1280                 std::task::Poll::Ready(())
1281             }).await;
1282         });
1283     }
1284 
1285     // Tests that the "next task" scheduler optimization is not able to starve
1286     // other tasks.
1287     #[test]
1288     fn ping_pong_saturation() {
1289         use std::sync::atomic::{Ordering, AtomicBool};
1290         use tokio::sync::mpsc;
1291 
1292         const NUM: usize = 100;
1293 
1294         let rt = rt();
1295 
1296         let running = Arc::new(AtomicBool::new(true));
1297 
1298         rt.block_on(async {
1299             let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
1300 
1301             let mut tasks = vec![];
1302             // Spawn a bunch of tasks that ping-pong between each other to
1303             // saturate the runtime.
1304             for _ in 0..NUM {
1305                 let (tx1, mut rx1) = mpsc::unbounded_channel();
1306                 let (tx2, mut rx2) = mpsc::unbounded_channel();
1307                 let spawned_tx = spawned_tx.clone();
1308                 let running = running.clone();
1309                 tasks.push(task::spawn(async move {
1310                     spawned_tx.send(()).unwrap();
1311 
1312 
1313                     while running.load(Ordering::Relaxed) {
1314                         tx1.send(()).unwrap();
1315                         rx2.recv().await.unwrap();
1316                     }
1317 
1318                     // Close the channel and wait for the other task to exit.
1319                     drop(tx1);
1320                     assert!(rx2.recv().await.is_none());
1321                 }));
1322 
1323                 tasks.push(task::spawn(async move {
1324                     while rx1.recv().await.is_some() {
1325                         tx2.send(()).unwrap();
1326                     }
1327                 }));
1328             }
1329 
1330             for _ in 0..NUM {
1331                 spawned_rx.recv().await.unwrap();
1332             }
1333 
1334             // spawn another task and wait for it to complete
1335             let handle = task::spawn(async {
1336                 for _ in 0..5 {
1337                     // Yielding forces it back into the local queue.
1338                     task::yield_now().await;
1339                 }
1340             });
1341             handle.await.unwrap();
1342             running.store(false, Ordering::Relaxed);
1343             for t in tasks {
1344                 t.await.unwrap();
1345             }
1346         });
1347     }
1348 
1349     #[test]
1350     #[cfg(not(target_os="wasi"))]
1351     fn shutdown_concurrent_spawn() {
1352         const NUM_TASKS: usize = 10_000;
1353         for _ in 0..5 {
1354             let (tx, rx) = std::sync::mpsc::channel();
1355             let rt = rt();
1356 
1357             let mut txs = vec![];
1358 
1359             for _ in 0..NUM_TASKS {
1360                 let (tx, rx) = tokio::sync::oneshot::channel();
1361                 txs.push(tx);
1362                 rt.spawn(async move {
1363                     rx.await.unwrap();
1364                 });
1365             }
1366 
1367             // Prime the tasks
1368             rt.block_on(async { tokio::task::yield_now().await });
1369 
1370             let th = std::thread::spawn(move || {
1371                 tx.send(()).unwrap();
1372                 for tx in txs.drain(..) {
1373                     let _ = tx.send(());
1374                 }
1375             });
1376 
1377             rx.recv().unwrap();
1378             drop(rt);
1379 
1380             th.join().unwrap();
1381         }
1382     }
1383 
1384     #[test]
1385     #[cfg_attr(target_family = "wasm", ignore)]
1386     fn wake_by_ref_from_thread_local() {
1387         wake_from_thread_local(true);
1388     }
1389 
1390     #[test]
1391     #[cfg_attr(target_family = "wasm", ignore)]
1392     fn wake_by_val_from_thread_local() {
1393         wake_from_thread_local(false);
1394     }
1395 
1396     fn wake_from_thread_local(by_ref: bool) {
1397         use std::cell::RefCell;
1398         use std::sync::mpsc::{channel, Sender};
1399         use std::task::Waker;
1400 
1401         struct TLData {
1402             by_ref: bool,
1403             waker: Option<Waker>,
1404             done: Sender<bool>,
1405         }
1406 
1407         impl Drop for TLData {
1408             fn drop(&mut self) {
1409                 if self.by_ref {
1410                     self.waker.take().unwrap().wake_by_ref();
1411                 } else {
1412                     self.waker.take().unwrap().wake();
1413                 }
1414                 let _ = self.done.send(true);
1415             }
1416         }
1417 
1418         std::thread_local! {
1419             static TL_DATA: RefCell<Option<TLData>> = const { RefCell::new(None) };
1420         };
1421 
1422         let (send, recv) = channel();
1423 
1424         std::thread::spawn(move || {
1425             let rt = rt();
1426             rt.block_on(rt.spawn(poll_fn(move |cx| {
1427                 let waker = cx.waker().clone();
1428                 let send = send.clone();
1429                 TL_DATA.with(|tl| {
1430                     tl.replace(Some(TLData {
1431                         by_ref,
1432                         waker: Some(waker),
1433                         done: send,
1434                     }));
1435                 });
1436                 Poll::Ready(())
1437             })))
1438             .unwrap();
1439         })
1440         .join()
1441         .unwrap();
1442 
1443         assert!(recv.recv().unwrap());
1444     }
1445 }
1446