1 #![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
2
3 use std::future::Future;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 use tokio::sync::oneshot;
7
8 #[tokio::test(flavor = "multi_thread")]
local()9 async fn local() {
10 tokio::task_local! {
11 static REQ_ID: u32;
12 pub static FOO: bool;
13 }
14
15 let j1 = tokio::spawn(REQ_ID.scope(1, async move {
16 assert_eq!(REQ_ID.get(), 1);
17 assert_eq!(REQ_ID.get(), 1);
18 }));
19
20 let j2 = tokio::spawn(REQ_ID.scope(2, async move {
21 REQ_ID.with(|v| {
22 assert_eq!(REQ_ID.get(), 2);
23 assert_eq!(*v, 2);
24 });
25
26 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
27
28 assert_eq!(REQ_ID.get(), 2);
29 }));
30
31 let j3 = tokio::spawn(FOO.scope(true, async move {
32 assert!(FOO.get());
33 }));
34
35 j1.await.unwrap();
36 j2.await.unwrap();
37 j3.await.unwrap();
38 }
39
40 #[tokio::test]
task_local_available_on_abort()41 async fn task_local_available_on_abort() {
42 tokio::task_local! {
43 static KEY: u32;
44 }
45
46 struct MyFuture {
47 tx_poll: Option<oneshot::Sender<()>>,
48 tx_drop: Option<oneshot::Sender<u32>>,
49 }
50 impl Future for MyFuture {
51 type Output = ();
52
53 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
54 if let Some(tx_poll) = self.tx_poll.take() {
55 let _ = tx_poll.send(());
56 }
57 Poll::Pending
58 }
59 }
60 impl Drop for MyFuture {
61 fn drop(&mut self) {
62 let _ = self.tx_drop.take().unwrap().send(KEY.get());
63 }
64 }
65
66 let (tx_drop, rx_drop) = oneshot::channel();
67 let (tx_poll, rx_poll) = oneshot::channel();
68
69 let h = tokio::spawn(KEY.scope(
70 42,
71 MyFuture {
72 tx_poll: Some(tx_poll),
73 tx_drop: Some(tx_drop),
74 },
75 ));
76
77 rx_poll.await.unwrap();
78 h.abort();
79 assert_eq!(rx_drop.await.unwrap(), 42);
80
81 let err = h.await.unwrap_err();
82 if !err.is_cancelled() {
83 if let Ok(panic) = err.try_into_panic() {
84 std::panic::resume_unwind(panic);
85 } else {
86 panic!();
87 }
88 }
89 }
90
91 #[tokio::test]
task_local_available_on_completion_drop()92 async fn task_local_available_on_completion_drop() {
93 tokio::task_local! {
94 static KEY: u32;
95 }
96
97 struct MyFuture {
98 tx: Option<oneshot::Sender<u32>>,
99 }
100 impl Future for MyFuture {
101 type Output = ();
102
103 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
104 Poll::Ready(())
105 }
106 }
107 impl Drop for MyFuture {
108 fn drop(&mut self) {
109 let _ = self.tx.take().unwrap().send(KEY.get());
110 }
111 }
112
113 let (tx, rx) = oneshot::channel();
114
115 let h = tokio::spawn(KEY.scope(42, MyFuture { tx: Some(tx) }));
116
117 assert_eq!(rx.await.unwrap(), 42);
118 h.await.unwrap();
119 }
120
121 #[tokio::test]
take_value()122 async fn take_value() {
123 tokio::task_local! {
124 static KEY: u32
125 }
126 let fut = KEY.scope(1, async {});
127 let mut pinned = Box::pin(fut);
128 assert_eq!(pinned.as_mut().take_value(), Some(1));
129 assert_eq!(pinned.as_mut().take_value(), None);
130 }
131
132 #[tokio::test]
poll_after_take_value_should_fail()133 async fn poll_after_take_value_should_fail() {
134 tokio::task_local! {
135 static KEY: u32
136 }
137 let fut = KEY.scope(1, async {
138 let result = KEY.try_with(|_| {});
139 // The task local value no longer exists.
140 assert!(result.is_err());
141 });
142 let mut fut = Box::pin(fut);
143 fut.as_mut().take_value();
144
145 // Poll the future after `take_value` has been called
146 fut.await;
147 }
148