#![allow(unknown_lints, unexpected_cfgs)]
#![warn(rust_2018_idioms)]
// Too slow on miri.
#![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))]

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};

use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};

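// Runs the enclosed metrics assertions only when the unstable metrics API is
// compiled in (`tokio_unstable` with 64-bit atomics available).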
macro_rules! cfg_metrics {
    ($($t:tt)*) => {
        #[cfg(all(tokio_unstable, target_has_atomic = "64"))]
        {
            $( $t )*
        }
    }
}

#[test]
fn single_thread() {
    // No panic when starting a runtime with a single thread
    let _ = runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();
}

#[test]
fn many_oneshot_futures() {
    // Number of tasks; the final task notifies the main thread.
    const NUM: usize = 1_000;

    for _ in 0..5 {
        let (tx, rx) = mpsc::channel();

        let rt = rt();
        let cnt = Arc::new(AtomicUsize::new(0));

        for _ in 0..NUM {
            let cnt = cnt.clone();
            let tx = tx.clone();

            rt.spawn(async move {
                let num = cnt.fetch_add(1, Relaxed) + 1;

                if num == NUM {
                    tx.send(()).unwrap();
                }
            });
        }

        rx.recv().unwrap();

        // Wait for the pool to shut down
        drop(rt);
    }
}

#[test]
fn spawn_two() {
    let rt = rt();

    let out = rt.block_on(async {
        let (tx, rx) = oneshot::channel();

        tokio::spawn(async move {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
            });
        });

        assert_ok!(rx.await)
    });

    assert_eq!(out, "ZOMG");

    cfg_metrics! {
        let metrics = rt.metrics();
        drop(rt);
        assert_eq!(1, metrics.remote_schedule_count());

        let mut local = 0;
        for i in 0..metrics.num_workers() {
            local += metrics.worker_local_schedule_count(i);
        }

        assert_eq!(1, local);
    }
}

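// Each "track" below is a chain of `CHAIN` forwarding tasks. A final task
// feeds each message back to the start of its chain until `CYCLES` round
// trips have completed, then reports on `final_rx`. `TRACKS` such chains run
// concurrently.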
#[test]
fn many_multishot_futures() {
    const CHAIN: usize = 200;
    const CYCLES: usize = 5;
    const TRACKS: usize = 50;

    for _ in 0..50 {
        let rt = rt();
        let mut start_txs = Vec::with_capacity(TRACKS);
        let mut final_rxs = Vec::with_capacity(TRACKS);

        for _ in 0..TRACKS {
            let (start_tx, mut chain_rx) = tokio::sync::mpsc::channel(10);

            for _ in 0..CHAIN {
                let (next_tx, next_rx) = tokio::sync::mpsc::channel(10);

                // Forward all the messages
                rt.spawn(async move {
                    while let Some(v) = chain_rx.recv().await {
                        next_tx.send(v).await.unwrap();
                    }
                });

                chain_rx = next_rx;
            }

            // This final task cycles if needed
            let (final_tx, final_rx) = tokio::sync::mpsc::channel(10);
            let cycle_tx = start_tx.clone();
            let mut rem = CYCLES;

            rt.spawn(async move {
                for _ in 0..CYCLES {
                    let msg = chain_rx.recv().await.unwrap();

                    rem -= 1;

                    if rem == 0 {
                        final_tx.send(msg).await.unwrap();
                    } else {
                        cycle_tx.send(msg).await.unwrap();
                    }
                }
            });

            start_txs.push(start_tx);
            final_rxs.push(final_rx);
        }

        {
            rt.block_on(async move {
                for start_tx in start_txs {
                    start_tx.send("ping").await.unwrap();
                }

                for mut final_rx in final_rxs {
                    final_rx.recv().await.unwrap();
                }
            });
        }
    }
}

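// The LIFO slot is covered by the scheduler's budget: a task that keeps
// spawning into it cannot monopolize the single worker forever, so the
// oneshot below still completes and `block_on` returns.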
#[test]
fn lifo_slot_budget() {
    async fn my_fn() {
        spawn_another();
    }

    fn spawn_another() {
        tokio::spawn(my_fn());
    }

    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(1)
        .build()
        .unwrap();

    let (send, recv) = oneshot::channel();

    rt.spawn(async move {
        tokio::spawn(my_fn());
        let _ = send.send(());
    });

    let _ = rt.block_on(recv);
}

#[test]
#[cfg_attr(miri, ignore)] // No `socket` in miri.
fn spawn_shutdown() {
    let rt = rt();
    let (tx, rx) = mpsc::channel();

    rt.block_on(async {
        tokio::spawn(client_server(tx.clone()));
    });

    // Use spawner
    rt.spawn(client_server(tx));

    assert_ok!(rx.recv());
    assert_ok!(rx.recv());

    drop(rt);
    assert_err!(rx.try_recv());
}

async fn client_server(tx: mpsc::Sender<()>) {
    let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

    // Get the assigned address
    let addr = assert_ok!(server.local_addr());

    // Spawn the server
    tokio::spawn(async move {
        // Accept a socket
        let (mut socket, _) = server.accept().await.unwrap();

        // Write some data
        socket.write_all(b"hello").await.unwrap();
    });

    let mut client = TcpStream::connect(&addr).await.unwrap();

    let mut buf = vec![];
    client.read_to_end(&mut buf).await.unwrap();

    assert_eq!(buf, b"hello");
    tx.send(()).unwrap();
}

#[test]
fn drop_threadpool_drops_futures() {
    for _ in 0..1_000 {
        let num_inc = Arc::new(AtomicUsize::new(0));
        let num_dec = Arc::new(AtomicUsize::new(0));
        let num_drop = Arc::new(AtomicUsize::new(0));

        struct Never(Arc<AtomicUsize>);

        impl Future for Never {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                Poll::Pending
            }
        }

        impl Drop for Never {
            fn drop(&mut self) {
                self.0.fetch_add(1, Relaxed);
            }
        }

        let a = num_inc.clone();
        let b = num_dec.clone();

        let rt = runtime::Builder::new_multi_thread()
            .enable_all()
            .on_thread_start(move || {
                a.fetch_add(1, Relaxed);
            })
            .on_thread_stop(move || {
                b.fetch_add(1, Relaxed);
            })
            .build()
            .unwrap();

        rt.spawn(Never(num_drop.clone()));

        // Wait for the pool to shut down
        drop(rt);

        // Assert that at least one thread was spawned
        let a = num_inc.load(Relaxed);
        assert!(a >= 1);

        // Assert that all threads shut down
        let b = num_dec.load(Relaxed);
        assert_eq!(a, b);

        // Assert that the future was dropped
        let c = num_drop.load(Relaxed);
        assert_eq!(c, 1);
    }
}

#[test]
fn start_stop_callbacks_called() {
    let after_start = Arc::new(AtomicUsize::new(0));
    let before_stop = Arc::new(AtomicUsize::new(0));

    let after_inner = after_start.clone();
    let before_inner = before_stop.clone();
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .on_thread_start(move || {
            after_inner.fetch_add(1, Ordering::Relaxed);
        })
        .on_thread_stop(move || {
            before_inner.fetch_add(1, Ordering::Relaxed);
        })
        .build()
        .unwrap();

    let (tx, rx) = oneshot::channel();

    rt.spawn(async move {
        assert_ok!(tx.send(()));
    });

    assert_ok!(rt.block_on(rx));

    drop(rt);

    assert!(after_start.load(Ordering::Relaxed) > 0);
    assert!(before_stop.load(Ordering::Relaxed) > 0);
}

#[test]
// Too slow on miri.
#[cfg_attr(miri, ignore)]
fn blocking() {
    // Number of tasks; the final task notifies the main thread.
    const NUM: usize = 1_000;

    for _ in 0..10 {
        let (tx, rx) = mpsc::channel();

        let rt = rt();
        let cnt = Arc::new(AtomicUsize::new(0));

        // Run four `block_in_place` tasks concurrently. All four (plus the
        // main thread) must be blocked on the barrier at once, so the runtime
        // must have handed the blocked workers off to fresh threads.
        let block = Arc::new(std::sync::Barrier::new(5));
        for _ in 0..4 {
            let block = block.clone();
            rt.spawn(async move {
                tokio::task::block_in_place(move || {
                    block.wait();
                    block.wait();
                })
            });
        }
        block.wait();

        for _ in 0..NUM {
            let cnt = cnt.clone();
            let tx = tx.clone();

            rt.spawn(async move {
                let num = cnt.fetch_add(1, Relaxed) + 1;

                if num == NUM {
                    tx.send(()).unwrap();
                }
            });
        }

        rx.recv().unwrap();

        // Release the barrier so the blocking tasks can finish and the pool
        // can shut down.
        block.wait();
    }
}

#[test]
fn multi_threadpool() {
    let rt1 = rt();
    let rt2 = rt();

    let (tx, rx) = oneshot::channel();
    let (done_tx, done_rx) = mpsc::channel();

    rt2.spawn(async move {
        rx.await.unwrap();
        done_tx.send(()).unwrap();
    });

    rt1.spawn(async move {
        tx.send(()).unwrap();
    });

    done_rx.recv().unwrap();
}

// When `block_in_place` returns, it attempts to reclaim the yielded runtime
// worker. In this case, the remainder of the task is on the runtime worker and
// must take part in the cooperative task budgeting system.
//
// The test ensures that, when this happens, attempting to consume from a
// channel yields occasionally even if there are values ready to receive.
#[test]
fn coop_and_block_in_place() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        // Setting max threads to 1 prevents another thread from claiming the
        // runtime worker yielded as part of `block_in_place` and guarantees the
        // same thread will reclaim the worker at the end of the
        // `block_in_place` call.
        .max_blocking_threads(1)
        .build()
        .unwrap();

    rt.block_on(async move {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);

        // Fill the channel
        for _ in 0..1024 {
            tx.send(()).await.unwrap();
        }

        drop(tx);

        tokio::spawn(async move {
            // Block in place without doing anything
            tokio::task::block_in_place(|| {});

            // Receive all the values; this should eventually return `Pending`
            // as the coop limit is reached.
            poll_fn(|cx| {
                while let Poll::Ready(v) = {
                    tokio::pin! {
                        let fut = rx.recv();
                    }

                    Pin::new(&mut fut).poll(cx)
                } {
                    if v.is_none() {
                        panic!("did not yield");
                    }
                }

                Poll::Ready(())
            })
            .await
        })
        .await
        .unwrap();
    });
}
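
// For contrast, a minimal sketch (not part of the original tests; the
// threshold of 64 is an arbitrary assumption) of how a task can cooperate
// explicitly instead of relying on the automatic budget: yield back to the
// scheduler every so often while draining a busy channel.
#[allow(dead_code)]
async fn drain_cooperatively(mut rx: tokio::sync::mpsc::Receiver<()>) {
    let mut received = 0_usize;
    while rx.recv().await.is_some() {
        received += 1;
        if received % 64 == 0 {
            // Let other tasks on this worker run before continuing.
            tokio::task::yield_now().await;
        }
    }
}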

#[test]
fn yield_after_block_in_place() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .build()
        .unwrap();

    rt.block_on(async {
        tokio::spawn(async move {
            // Block in place then enter a new runtime
            tokio::task::block_in_place(|| {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .build()
                    .unwrap();

                rt.block_on(async {});
            });

            // Yield, then complete
            tokio::task::yield_now().await;
        })
        .await
        .unwrap()
    });
}

// Test that this does not panic.
#[test]
fn max_blocking_threads() {
    let _rt = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(1)
        .build()
        .unwrap();
}

#[test]
#[should_panic]
fn max_blocking_threads_set_to_zero() {
    let _rt = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(0)
        .build()
        .unwrap();
}

/// Regression test for #6445.
///
/// After #6445, setting `global_queue_interval` to 1 is technically valid.
/// This test confirms that there is no regression in the multi-thread runtime
/// when `global_queue_interval` is set to 1.
#[test]
fn global_queue_interval_set_to_one() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .global_queue_interval(1)
        .build()
        .unwrap();

    // Perform some simple work.
    let cnt = Arc::new(AtomicUsize::new(0));
    rt.block_on(async {
        let mut set = tokio::task::JoinSet::new();
        for _ in 0..10 {
            let cnt = cnt.clone();
            set.spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) });
        }

        while let Some(res) = set.join_next().await {
            res.unwrap();
        }
    });
    assert_eq!(cnt.load(Relaxed), 10);
}

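// Runtime shutdown must not hang while a `block_in_place` section is still
// blocked on an external channel; the sender is dropped after a delay to
// release it.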
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
    let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
    tokio::spawn(async move {
        tokio::task::block_in_place(|| sync_rx.recv().ok());
    });

    tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
        drop(sync_tx);
    });
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

/// Demonstrates tokio-rs/tokio#3869
#[test]
fn wake_during_shutdown() {
    struct Shared {
        waker: Option<Waker>,
    }

    struct MyFuture {
        shared: Arc<Mutex<Shared>>,
        put_waker: bool,
    }

    impl MyFuture {
        fn new() -> (Self, Self) {
            let shared = Arc::new(Mutex::new(Shared { waker: None }));
            let f1 = MyFuture {
                shared: shared.clone(),
                put_waker: true,
            };
            let f2 = MyFuture {
                shared,
                put_waker: false,
            };
            (f1, f2)
        }
    }

    impl Future for MyFuture {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let me = Pin::into_inner(self);
            let mut lock = me.shared.lock().unwrap();
            if me.put_waker {
                lock.waker = Some(cx.waker().clone());
            }
            Poll::Pending
        }
    }

    impl Drop for MyFuture {
        fn drop(&mut self) {
            let mut lock = self.shared.lock().unwrap();
            if !self.put_waker {
                lock.waker.take().unwrap().wake();
            }
            drop(lock);
        }
    }

    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let (f1, f2) = MyFuture::new();

    rt.spawn(f1);
    rt.spawn(f2);

    rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}

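// `block_in_place` is only supported on the multi-thread runtime. On a
// current-thread runtime (the default for `#[tokio::test]` and the
// `current_thread` flavor of `#[tokio::main]`) it panics, which is what the
// `should_panic` cases below assert.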
#[should_panic]
#[tokio::test]
async fn test_block_in_place1() {
    tokio::task::block_in_place(|| {});
}

#[tokio::test(flavor = "multi_thread")]
async fn test_block_in_place2() {
    tokio::task::block_in_place(|| {});
}

#[should_panic]
#[tokio::main(flavor = "current_thread")]
#[test]
async fn test_block_in_place3() {
    tokio::task::block_in_place(|| {});
}

#[tokio::main]
#[test]
async fn test_block_in_place4() {
    tokio::task::block_in_place(|| {});
}

// Repro for tokio-rs/tokio#5239
#[test]
fn test_nested_block_in_place_with_block_on_between() {
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(1)
        // Needs to be more than 0
        .max_blocking_threads(1)
        .build()
        .unwrap();

    // Triggered by a race condition, so run many times to make sure it is OK.
    for _ in 0..100 {
        let h = rt.handle().clone();

        rt.block_on(async move {
            tokio::spawn(async move {
                tokio::task::block_in_place(|| {
                    h.block_on(async {
                        tokio::task::block_in_place(|| {});
                    });
                })
            })
            .await
            .unwrap()
        });
    }
}

// Testing the tuning logic is tricky as it is inherently timing based, and more
// of a heuristic than an exact behavior. This test checks that the interval
// changes over time based on load factors. There are no assertions; completion
// is sufficient. If there is a regression, this test will hang. In theory, we
// could add limits, but that would be likely to fail on CI.
#[test]
#[cfg(not(tokio_no_tuning_tests))]
fn test_tuning() {
    use std::sync::atomic::AtomicBool;
    use std::time::Duration;

    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .build()
        .unwrap();

    fn iter(flag: Arc<AtomicBool>, counter: Arc<AtomicUsize>, stall: bool) {
        if flag.load(Relaxed) {
            if stall {
                std::thread::sleep(Duration::from_micros(5));
            }

            counter.fetch_add(1, Relaxed);
            tokio::spawn(async move { iter(flag, counter, stall) });
        }
    }

    let flag = Arc::new(AtomicBool::new(true));
    let counter = Arc::new(AtomicUsize::new(61));
    let interval = Arc::new(AtomicUsize::new(61));

    {
        let flag = flag.clone();
        let counter = counter.clone();
        rt.spawn(async move { iter(flag, counter, true) });
    }

    // Now, hammer the injection queue until the interval drops.
    let mut n = 0;
    loop {
        let curr = interval.load(Relaxed);

        if curr <= 8 {
            n += 1;
        } else {
            n = 0;
        }

        // Make sure we get a few good rounds. Jitter in the tuning could result
        // in one "good" value without being representative of reaching a good
        // state.
        if n == 3 {
            break;
        }

        if Arc::strong_count(&interval) < 5_000 {
            let counter = counter.clone();
            let interval = interval.clone();

            rt.spawn(async move {
                let prev = counter.swap(0, Relaxed);
                interval.store(prev, Relaxed);
            });

            std::thread::yield_now();
        }
    }

    flag.store(false, Relaxed);

    let w = Arc::downgrade(&interval);
    drop(interval);

    while w.strong_count() > 0 {
        std::thread::sleep(Duration::from_micros(500));
    }

    // Now, run it again with a faster task
    let flag = Arc::new(AtomicBool::new(true));
    // Set it high; we know it should never actually be this high.
    let counter = Arc::new(AtomicUsize::new(10_000));
    let interval = Arc::new(AtomicUsize::new(10_000));

    {
        let flag = flag.clone();
        let counter = counter.clone();
        rt.spawn(async move { iter(flag, counter, false) });
    }

    // Now, hammer the injection queue until the interval reaches the expected range.
    let mut n = 0;
    loop {
        let curr = interval.load(Relaxed);

        if curr <= 1_000 && curr > 32 {
            n += 1;
        } else {
            n = 0;
        }

        if n == 3 {
            break;
        }

        if Arc::strong_count(&interval) <= 5_000 {
            let counter = counter.clone();
            let interval = interval.clone();

            rt.spawn(async move {
                let prev = counter.swap(0, Relaxed);
                interval.store(prev, Relaxed);
            });
        }

        std::thread::yield_now();
    }

    flag.store(false, Relaxed);
}

fn rt() -> runtime::Runtime {
    runtime::Runtime::new().unwrap()
}
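
// A minimal sketch of an equivalent helper with an explicit worker count, in
// case a test needs deterministic parallelism. Hypothetical: not used by the
// tests above.
#[allow(dead_code)]
fn rt_with_workers(n: usize) -> runtime::Runtime {
    runtime::Builder::new_multi_thread()
        .worker_threads(n)
        .enable_all()
        .build()
        .unwrap()
}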

#[cfg(tokio_unstable)]
mod unstable {
    use super::*;

    #[test]
    fn test_disable_lifo_slot() {
        use std::sync::mpsc::{channel, RecvTimeoutError};

        let rt = runtime::Builder::new_multi_thread()
            .disable_lifo_slot()
            .worker_threads(2)
            .build()
            .unwrap();

        // Spawn a background thread to poke the runtime periodically.
        //
        // This is necessary because we may end up triggering the issue in:
        // <https://github.com/tokio-rs/tokio/issues/4730>
        //
        // Spawning a task will wake up the second worker, which will then steal
        // the task. However, the steal will fail if the task is in the LIFO
        // slot, because the LIFO slot cannot be stolen.
        //
        // Note that this only happens rarely. Most of the time, this thread is
        // not necessary.
        let (kill_bg_thread, recv) = channel::<()>();
        let handle = rt.handle().clone();
        let bg_thread = std::thread::spawn(move || {
            let one_sec = std::time::Duration::from_secs(1);
            while recv.recv_timeout(one_sec) == Err(RecvTimeoutError::Timeout) {
                handle.spawn(async {});
            }
        });

        rt.block_on(async {
            tokio::spawn(async {
                // Spawn another task and block the thread until completion. If the LIFO slot
                // is used then the test doesn't complete.
                futures::executor::block_on(tokio::spawn(async {})).unwrap();
            })
            .await
            .unwrap();
        });

        drop(kill_bg_thread);
        bg_thread.join().unwrap();
    }

    #[test]
    fn runtime_id_is_same() {
        let rt = rt();

        let handle1 = rt.handle();
        let handle2 = rt.handle();

        assert_eq!(handle1.id(), handle2.id());
    }

    #[test]
    fn runtime_ids_different() {
        let rt1 = rt();
        let rt2 = rt();

        assert_ne!(rt1.handle().id(), rt2.handle().id());
    }
}