1 #![warn(rust_2018_idioms)]
2 #![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))] // Wasi doesn't support threads
3
4 use tokio::{runtime, task, time};
5 use tokio_test::assert_ok;
6
7 use std::thread;
8 use std::time::Duration;
9
10 mod support {
11 pub(crate) mod mpsc_stream;
12 }
13
14 #[tokio::test]
basic_blocking()15 async fn basic_blocking() {
16 // Run a few times
17 for _ in 0..100 {
18 let out = assert_ok!(
19 tokio::spawn(async {
20 assert_ok!(
21 task::spawn_blocking(|| {
22 thread::sleep(Duration::from_millis(5));
23 "hello"
24 })
25 .await
26 )
27 })
28 .await
29 );
30
31 assert_eq!(out, "hello");
32 }
33 }
34
35 #[tokio::test(flavor = "multi_thread")]
block_in_blocking()36 async fn block_in_blocking() {
37 // Run a few times
38 for _ in 0..100 {
39 let out = assert_ok!(
40 tokio::spawn(async {
41 assert_ok!(
42 task::spawn_blocking(|| {
43 task::block_in_place(|| {
44 thread::sleep(Duration::from_millis(5));
45 });
46 "hello"
47 })
48 .await
49 )
50 })
51 .await
52 );
53
54 assert_eq!(out, "hello");
55 }
56 }
57
58 #[tokio::test(flavor = "multi_thread")]
block_in_block()59 async fn block_in_block() {
60 // Run a few times
61 for _ in 0..100 {
62 let out = assert_ok!(
63 tokio::spawn(async {
64 task::block_in_place(|| {
65 task::block_in_place(|| {
66 thread::sleep(Duration::from_millis(5));
67 });
68 "hello"
69 })
70 })
71 .await
72 );
73
74 assert_eq!(out, "hello");
75 }
76 }
77
78 #[tokio::test(flavor = "current_thread")]
79 #[should_panic]
no_block_in_current_thread_scheduler()80 async fn no_block_in_current_thread_scheduler() {
81 task::block_in_place(|| {});
82 }
83
/// Calling `block_in_place` from the future passed to `block_on` on a
/// default `Runtime` must complete without panicking.
#[test]
fn yes_block_in_threaded_block_on() {
    let threaded = runtime::Runtime::new().unwrap();

    let fut = async {
        task::block_in_place(|| {});
    };
    threaded.block_on(fut);
}
91
/// Calling `block_in_place` from the future passed to `block_on` on a
/// current-thread runtime is expected to panic (see `#[should_panic]`).
#[test]
#[should_panic]
fn no_block_in_current_thread_block_on() {
    let single = runtime::Builder::new_current_thread().build().unwrap();

    let fut = async {
        task::block_in_place(|| {});
    };
    single.block_on(fut);
}
100
/// Builds a current-thread runtime inside `block_in_place` (itself running
/// under an outer runtime's `block_on`) and drives an empty future on it.
/// The test passes if none of this panics.
#[test]
fn can_enter_current_thread_rt_from_within_block_in_place() {
    let outer = runtime::Runtime::new().unwrap();

    outer.block_on(async {
        task::block_in_place(|| {
            // The nested runtime is created, used and dropped entirely
            // within the `block_in_place` closure.
            let nested = runtime::Builder::new_current_thread().build().unwrap();
            nested.block_on(async {})
        })
    });
}
115
/// Verifies that dropping a current-thread runtime inside another runtime's
/// `block_on` panics, and that the panic message contains
/// "Cannot drop a runtime".
///
/// Only compiled when panics unwind, because the panic is observed through
/// `catch_unwind`.
#[test]
#[cfg(panic = "unwind")]
fn useful_panic_message_when_dropping_rt_in_rt() {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    let outer = tokio::runtime::Runtime::new().unwrap();

    let result = catch_unwind(AssertUnwindSafe(|| {
        outer.block_on(async {
            // The inner runtime is built and immediately dropped while the
            // outer runtime is driving this future.
            let _ = tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap();
        });
    }));

    // `expect_err` both asserts that a panic happened and extracts its payload.
    let payload = result.expect_err("dropping a runtime inside a runtime should panic");

    // A panic payload is a `&'static str` for literal messages and a `String`
    // for formatted ones; accept either so a change in how the message is
    // produced fails on the content check below, not on the downcast.
    let message: &str = if let Some(text) = payload.downcast_ref::<&'static str>() {
        *text
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.as_str()
    } else {
        panic!("panic payload should be a string");
    };

    assert!(
        message.contains("Cannot drop a runtime"),
        "Wrong panic message: {message:?}"
    );
}
140
/// Shuts down a nested current-thread runtime with a zero timeout from
/// inside an outer runtime's `block_on`; the test passes if this does not
/// panic.
#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
    let outer = runtime::Runtime::new().unwrap();

    outer.block_on(async {
        let nested = runtime::Builder::new_current_thread().build().unwrap();
        let no_wait = Duration::from_nanos(0);
        nested.shutdown_timeout(no_wait);
    });
}
152
/// Shuts down a nested current-thread runtime via `shutdown_background`
/// from inside an outer runtime's `block_on`; the test passes if this does
/// not panic.
#[test]
fn can_shutdown_now_in_runtime() {
    let outer = runtime::Runtime::new().unwrap();

    outer.block_on(async {
        let nested = runtime::Builder::new_current_thread().build().unwrap();
        nested.shutdown_background();
    });
}
164
/// Drains a pre-filled 200-item stream with `futures::executor::block_on`
/// inside `block_in_place` within a spawned task, guarded by a one-second
/// timeout.
///
/// NOTE(review): per the test name, this is meant to show that cooperative
/// scheduling limits do not apply inside `block_in_place`; a hang would
/// surface as the timeout's "timed out" panic.
#[test]
fn coop_disabled_in_block_in_place() {
    let outer = runtime::Builder::new_multi_thread()
        .enable_time()
        .build()
        .unwrap();

    // Fill the channel up front and close it so the stream terminates.
    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
    (0..200).for_each(|i| tx.send(i).unwrap());
    drop(tx);

    outer.block_on(async move {
        let drain = tokio::spawn(async move {
            task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;

                    let received = rx.fold(0, |count, _item| count + 1).await;
                    assert_eq!(received, 200);
                })
            })
        });

        let joined = time::timeout(Duration::from_secs(1), drain).await;
        joined.expect("timed out (probably hanging)").unwrap()
    });
}
195
/// Drains a pre-filled 200-item stream with `futures::executor::block_on`
/// inside `block_in_place`, called directly from a runtime's `block_on` on a
/// worker thread. A second thread reports a timeout after one second, and
/// whichever message arrives first decides the outcome.
///
/// NOTE(review): per the test name, this is meant to show that cooperative
/// scheduling limits do not apply inside `block_in_place` under `block_on`.
#[test]
fn coop_disabled_in_block_in_place_in_block_on() {
    let (watchdog_tx, outcome_rx) = std::sync::mpsc::channel();
    let worker_tx = watchdog_tx.clone();

    // Worker: runs the scenario and reports success when it finishes.
    thread::spawn(move || {
        let outer = runtime::Runtime::new().unwrap();

        // Fill the channel up front and close it so the stream terminates.
        let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();
        (0..200).for_each(|i| tx.send(i).unwrap());
        drop(tx);

        outer.block_on(async move {
            task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;

                    let received = rx.fold(0, |count, _item| count + 1).await;
                    assert_eq!(received, 200);
                })
            })
        });

        // Send errors are ignored: the receiver may already be gone.
        let _ = worker_tx.send(Ok(()));
    });

    // Watchdog: reports a failure if the worker has not finished in time.
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(1));
        let _ = watchdog_tx.send(Err("timed out (probably hanging)"));
    });

    let outcome = outcome_rx.recv().unwrap();
    outcome.unwrap();
}
229
/// With Tokio time paused, wrapping a `spawn_blocking` handle in a timeout
/// must not fire the timeout while the blocking task is still running.
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_when_paused() {
    // Time must not auto-advance while a started blocking task is still
    // unfinished.
    let quick = task::spawn_blocking(|| thread::sleep(Duration::from_millis(1)));
    time::timeout(Duration::from_secs(3), quick)
        .await
        .expect("timeout should not trigger")
        .expect("blocking task should finish");

    // The same holds when the timeout is shorter than the blocking task's
    // real running time: Tokio time is paused, system time is not.
    let slow = task::spawn_blocking(|| thread::sleep(Duration::from_millis(50)));
    time::timeout(Duration::from_millis(1), slow)
        .await
        .expect("timeout should not trigger")
        .expect("blocking task should finish");
}
254
/// With Tokio time paused, awaiting a short `spawn_blocking` task under a
/// 15-second timeout must finish in well under 10 seconds of wall-clock
/// time.
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_task_wakes_paused_runtime() {
    // Wall-clock reference; Tokio's own clock is paused.
    let started = std::time::Instant::now();

    let blocking = task::spawn_blocking(|| thread::sleep(Duration::from_millis(1)));
    time::timeout(Duration::from_secs(15), blocking)
        .await
        .expect("timeout should not trigger")
        .expect("blocking task should finish");

    let wall_clock = started.elapsed();
    assert!(
        wall_clock < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}
271
/// With Tokio time paused, a 15-second `sleep` must complete in well under
/// 10 seconds of wall-clock time even though a `spawn_blocking` task was
/// started and its `JoinHandle` is only awaited after the sleep.
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
    // Wall-clock reference; Tokio's own clock is paused.
    let t0 = std::time::Instant::now();

    // When this task finishes, time should auto-advance, even though the
    // JoinHandle has not been awaited yet.
    let a = task::spawn_blocking(|| {
        thread::sleep(Duration::from_millis(1));
    });

    // Use the imported `time` module directly, like the rest of this file,
    // rather than reaching it through `crate::`.
    time::sleep(Duration::from_secs(15)).await;
    a.await.expect("blocking task should finish");
    assert!(
        t0.elapsed() < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}
290
/// With Tokio time paused, a `spawn_blocking` task that panics must still
/// resolve its `JoinHandle` (with an error) in well under 10 seconds of
/// wall-clock time, without the 15-second timeout firing.
#[cfg(panic = "unwind")]
#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
    // Wall-clock reference; Tokio's own clock is paused.
    let started = std::time::Instant::now();

    let doomed = task::spawn_blocking(|| {
        thread::sleep(Duration::from_millis(1));
        panic!("blocking task panicked");
    });

    let result = time::timeout(Duration::from_secs(15), doomed)
        .await
        .expect("timeout should not trigger");
    assert!(result.is_err(), "blocking task should have panicked");

    let wall_clock = started.elapsed();
    assert!(
        wall_clock < Duration::from_secs(10),
        "completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
    );
}
311