#![allow(unknown_lints, unexpected_cfgs)]
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))]

use std::sync::mpsc;
use std::time::Duration;
use tokio::runtime::Runtime;

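// Verify that num_workers() reports one worker for the current-thread runtime
// and two for a multi-thread runtime built with two worker threads.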
#[test]
fn num_workers() {
    let rt = current_thread();
    assert_eq!(1, rt.metrics().num_workers());

    let rt = threaded();
    assert_eq!(2, rt.metrics().num_workers());
}

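// Verify that num_alive_tasks() is zero while the runtime is idle, one inside
// a spawned task, and returns to zero after the task completes (polling on the
// multi-thread runtime, where the count may briefly lag behind block_on).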
#[test]
fn num_alive_tasks() {
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.num_alive_tasks());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.num_alive_tasks());
    }))
    .unwrap();

    assert_eq!(0, rt.metrics().num_alive_tasks());

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.num_alive_tasks());
    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.num_alive_tasks());
    }))
    .unwrap();

    // Try for up to 10 seconds for the counter to reach zero. wake_join() is
    // called before the task is released, so on the multi-thread runtime we
    // sometimes exit block_on before the counter decrements.
    for _ in 0..100 {
        if rt.metrics().num_alive_tasks() == 0 {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    assert_eq!(0, rt.metrics().num_alive_tasks());
}

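// A task spawned from another thread lands on the current-thread runtime's
// global queue, so the reported depth should be exactly one.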
#[test]
fn global_queue_depth_current_thread() {
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let metrics = rt.metrics();

    thread::spawn(move || {
        handle.spawn(async {});
    })
    .join()
    .unwrap();

    assert_eq!(1, metrics.global_queue_depth());
}

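// Block every worker so that newly spawned tasks accumulate in the global
// queue, then check that the depth grows by one per spawn. Blocking the
// workers can fail, so retry up to ten times before giving up.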
#[test]
fn global_queue_depth_multi_thread() {
    for _ in 0..10 {
        let rt = threaded();
        let metrics = rt.metrics();

        if let Ok(_blocking_tasks) = try_block_threaded(&rt) {
            for i in 0..10 {
                assert_eq!(i, metrics.global_queue_depth());
                rt.spawn(async {});
            }

            return;
        }
    }

    panic!("exhausted every attempt to block the runtime");
}

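// Attempt to block every worker of `rt` by spawning one task per worker that
// blocks on an mpsc receive. On success, returns the senders keeping those
// tasks blocked; an Err means the workers could not be confirmed blocked in
// time.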
fn try_block_threaded(rt: &Runtime) -> Result<Vec<mpsc::Sender<()>>, mpsc::RecvTimeoutError> {
    let (tx, rx) = mpsc::channel();

    let blocking_tasks = (0..rt.metrics().num_workers())
        .map(|_| {
            let tx = tx.clone();
            let (task, barrier) = mpsc::channel();

            // Spawn a task per runtime worker to block it.
            rt.spawn(async move {
                tx.send(()).ok();
                barrier.recv().ok();
            });

            task
        })
        .collect();

    // Make sure the previously spawned tasks are blocking the runtime by
    // receiving a message from each blocking task.
    //
    // If this times out we were unsuccessful in blocking the runtime and hit
    // a deadlock instead (which might happen and is expected behaviour).
    for _ in 0..rt.metrics().num_workers() {
        rx.recv_timeout(Duration::from_secs(1))?;
    }

    // Return senders of the mpsc channels used for blocking the runtime as a
    // surrogate handle for the tasks. Sending a message or dropping the senders
    // will unblock the runtime.
    Ok(blocking_tasks)
}

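// Builds a current-thread runtime with all drivers enabled.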
fn current_thread() -> Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

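// Builds a multi-thread runtime with exactly two worker threads.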
fn threaded() -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap()
}