1 #![allow(unknown_lints, unexpected_cfgs)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))]
4
5 use std::sync::mpsc;
6 use std::time::Duration;
7 use tokio::runtime::Runtime;
8
/// Checks the worker count reported by the metrics of each runtime flavor
/// built by this file's helpers.
#[test]
fn num_workers() {
    // The current-thread runtime is expected to report exactly one worker.
    let single = current_thread();
    let single_workers = single.metrics().num_workers();
    assert_eq!(1, single_workers);

    // `threaded()` configures two worker threads, so two workers are expected.
    let multi = threaded();
    let multi_workers = multi.metrics().num_workers();
    assert_eq!(2, multi_workers);
}
17
/// Checks that `num_alive_tasks` reads zero before anything is spawned, one
/// from inside a running spawned task, and zero again once that task is done,
/// for both the current-thread and the multi-thread runtime.
#[test]
fn num_alive_tasks() {
    // --- current-thread runtime ---
    let single = current_thread();
    let single_metrics = single.metrics();
    assert_eq!(0, single_metrics.num_alive_tasks());

    // The task itself observes the counter while it is alive.
    let join = single.spawn(async move {
        assert_eq!(1, single_metrics.num_alive_tasks());
    });
    single.block_on(join).unwrap();

    assert_eq!(0, single.metrics().num_alive_tasks());

    // --- multi-thread runtime ---
    let multi = threaded();
    let multi_metrics = multi.metrics();
    assert_eq!(0, multi_metrics.num_alive_tasks());

    let join = multi.spawn(async move {
        assert_eq!(1, multi_metrics.num_alive_tasks());
    });
    multi.block_on(join).unwrap();

    // Poll for up to 10 seconds (100 attempts, 100 ms apart) until the counter
    // drops back to zero. wake_join() is called before the task is released,
    // so with multiple threads `block_on` can return before the counter has
    // been decremented.
    let mut attempts_left = 100;
    while attempts_left > 0 && multi.metrics().num_alive_tasks() != 0 {
        std::thread::sleep(Duration::from_millis(100));
        attempts_left -= 1;
    }
    assert_eq!(0, multi.metrics().num_alive_tasks());
}
49
/// Spawns one task onto a current-thread runtime from a separate OS thread
/// and checks that the global queue depth then reads one.
#[test]
fn global_queue_depth_current_thread() {
    let runtime = current_thread();
    let metrics = runtime.metrics();
    let remote = runtime.handle().clone();

    // The spawn happens on another thread via a cloned handle; the runtime is
    // never driven here, so the task stays queued.
    let spawner = std::thread::spawn(move || {
        remote.spawn(async {});
    });
    spawner.join().unwrap();

    assert_eq!(1, metrics.global_queue_depth());
}
66
/// Blocks every worker of a multi-thread runtime and then checks that each
/// additional spawn raises the global queue depth by exactly one.
///
/// Blocking the workers can fail, so up to ten fresh runtimes are tried before
/// the test gives up and panics.
#[test]
fn global_queue_depth_multi_thread() {
    for _ in 0..10 {
        let rt = threaded();
        let metrics = rt.metrics();

        // Keep the returned senders alive for the rest of this attempt: they
        // are what keeps the workers blocked. Declared after `rt`, they are
        // dropped before it.
        let _blocking_tasks = match try_block_threaded(&rt) {
            Ok(senders) => senders,
            Err(_) => continue,
        };

        for expected_depth in 0..10 {
            assert_eq!(expected_depth, metrics.global_queue_depth());
            rt.spawn(async {});
        }

        return;
    }

    panic!("exhausted every try to block the runtime");
}
85
try_block_threaded(rt: &Runtime) -> Result<Vec<mpsc::Sender<()>>, mpsc::RecvTimeoutError>86 fn try_block_threaded(rt: &Runtime) -> Result<Vec<mpsc::Sender<()>>, mpsc::RecvTimeoutError> {
87 let (tx, rx) = mpsc::channel();
88
89 let blocking_tasks = (0..rt.metrics().num_workers())
90 .map(|_| {
91 let tx = tx.clone();
92 let (task, barrier) = mpsc::channel();
93
94 // Spawn a task per runtime worker to block it.
95 rt.spawn(async move {
96 tx.send(()).ok();
97 barrier.recv().ok();
98 });
99
100 task
101 })
102 .collect();
103
104 // Make sure the previously spawned tasks are blocking the runtime by
105 // receiving a message from each blocking task.
106 //
107 // If this times out we were unsuccessful in blocking the runtime and hit
108 // a deadlock instead (which might happen and is expected behaviour).
109 for _ in 0..rt.metrics().num_workers() {
110 rx.recv_timeout(Duration::from_secs(1))?;
111 }
112
113 // Return senders of the mpsc channels used for blocking the runtime as a
114 // surrogate handle for the tasks. Sending a message or dropping the senders
115 // will unblock the runtime.
116 Ok(blocking_tasks)
117 }
118
current_thread() -> Runtime119 fn current_thread() -> Runtime {
120 tokio::runtime::Builder::new_current_thread()
121 .enable_all()
122 .build()
123 .unwrap()
124 }
125
threaded() -> Runtime126 fn threaded() -> Runtime {
127 tokio::runtime::Builder::new_multi_thread()
128 .worker_threads(2)
129 .enable_all()
130 .build()
131 .unwrap()
132 }
133