1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7
8 use tokio::sync::watch;
9 use tokio_test::task::spawn;
10 use tokio_test::{
11 assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
12 };
13
// A lone receiver observes each sent value and learns when the sender is gone.
#[test]
fn single_rx_recv() {
    let (sender, mut receiver) = watch::channel("one");

    // The value the channel was created with does not count as a change.
    let mut waiting = spawn(receiver.changed());
    assert_pending!(waiting.poll());
    drop(waiting);
    assert_eq!(*receiver.borrow(), "one");

    // A send wakes the parked waiter, which then resolves with `Ok`.
    let mut waiting = spawn(receiver.changed());
    assert_pending!(waiting.poll());
    sender.send("two").unwrap();
    assert!(waiting.is_woken());
    assert_ready_ok!(waiting.poll());
    drop(waiting);
    assert_eq!(*receiver.borrow(), "two");

    // Dropping the sender wakes the waiter with an error; the last value
    // remains readable afterwards.
    let mut waiting = spawn(receiver.changed());
    assert_pending!(waiting.poll());
    drop(sender);
    assert!(waiting.is_woken());
    assert_ready_err!(waiting.poll());
    drop(waiting);
    assert_eq!(*receiver.borrow(), "two");
}
48
// Calling `mark_changed` twice on a fresh receiver must complete without panicking.
#[test]
fn rx_version_underflow() {
    let (_sender, mut receiver) = watch::channel("one");

    // Version starts at 2, validate we do not underflow
    for _ in 0..2 {
        receiver.mark_changed();
    }
}
57
// Exercises `Receiver::mark_changed` on four receivers cloned from one channel.
#[test]
fn rx_mark_changed() {
    let (tx, mut rx) = watch::channel("one");

    let mut rx2 = rx.clone();
    let mut rx3 = rx.clone();
    let mut rx4 = rx.clone();

    // A marked receiver reports a change and `changed()` resolves on the first poll.
    {
        rx.mark_changed();
        assert!(rx.has_changed().unwrap());

        let mut t = spawn(rx.changed());
        assert_ready_ok!(t.poll());
    }

    // Marking `rx` leaves its clone untouched: `rx2` still has nothing to observe.
    {
        assert!(!rx2.has_changed().unwrap());

        let mut t = spawn(rx2.changed());
        assert_pending!(t.poll());
    }

    // `borrow()` leaves the changed flag set; `borrow_and_update()` clears it.
    {
        rx3.mark_changed();
        assert_eq!(*rx3.borrow(), "one");

        assert!(rx3.has_changed().unwrap());

        assert_eq!(*rx3.borrow_and_update(), "one");

        assert!(!rx3.has_changed().unwrap());

        let mut t = spawn(rx3.changed());
        assert_pending!(t.poll());
    }

    // After a real send has been consumed, marking raises the flag again while
    // the stored value stays the same.
    {
        tx.send("two").unwrap();
        assert!(rx4.has_changed().unwrap());
        assert_eq!(*rx4.borrow_and_update(), "two");

        rx4.mark_changed();
        assert!(rx4.has_changed().unwrap());
        // Terminated with `;` so the assertion is a statement, not the block's
        // tail expression.
        assert_eq!(*rx4.borrow_and_update(), "two");
    }

    assert_eq!(*rx.borrow(), "two");
}
106
// Exercises `Receiver::mark_unchanged` after both a manual mark and a real send.
#[test]
fn rx_mark_unchanged() {
    let (sender, mut first) = watch::channel("one");
    let mut second = first.clone();

    // `mark_unchanged` undoes a preceding `mark_changed`.
    assert!(!first.has_changed().unwrap());
    first.mark_changed();
    assert!(first.has_changed().unwrap());
    first.mark_unchanged();
    assert!(!first.has_changed().unwrap());

    let mut waiting = spawn(first.changed());
    assert_pending!(waiting.poll());
    drop(waiting);

    // It also clears the flag raised by an actual send; the new value is
    // still what the receiver reads.
    assert!(!second.has_changed().unwrap());
    sender.send("two").unwrap();
    assert!(second.has_changed().unwrap());
    second.mark_unchanged();
    assert!(!second.has_changed().unwrap());
    assert_eq!(*second.borrow_and_update(), "two");

    assert_eq!(*first.borrow(), "two");
}
139
// Two receivers on one channel are each notified of every send.
#[test]
fn multi_rx() {
    let (sender, mut rx_a) = watch::channel("one");
    let mut rx_b = rx_a.clone();

    // Round 0: neither receiver is notified about the initial value.
    {
        let mut wait_a = spawn(rx_a.changed());
        let mut wait_b = spawn(rx_b.changed());

        assert_pending!(wait_a.poll());
        assert_pending!(wait_b.poll());
    }
    assert_eq!(*rx_a.borrow(), "one");
    assert_eq!(*rx_b.borrow(), "one");

    // This waiter is kept alive across the next two rounds.
    let mut long_wait_b = spawn(rx_b.changed());

    // Round 1: both waiters are woken by the send, but only `wait_a` is polled
    // to completion here.
    {
        let mut wait_a = spawn(rx_a.changed());

        assert_pending!(wait_a.poll());
        assert_pending!(long_wait_b.poll());

        sender.send("two").unwrap();

        assert!(wait_a.is_woken());
        assert!(long_wait_b.is_woken());

        assert_ready_ok!(wait_a.poll());
    }
    assert_eq!(*rx_a.borrow(), "two");

    // Round 2: a second send arrives before the long-lived waiter is polled;
    // both waiters then resolve.
    {
        let mut wait_a = spawn(rx_a.changed());

        assert_pending!(wait_a.poll());

        sender.send("three").unwrap();

        assert!(wait_a.is_woken());
        assert!(long_wait_b.is_woken());

        assert_ready_ok!(wait_a.poll());
        assert_ready_ok!(long_wait_b.poll());
    }
    assert_eq!(*rx_a.borrow(), "three");

    // Release the mutable borrow of `rx_b` before reading from it.
    drop(long_wait_b);

    assert_eq!(*rx_b.borrow(), "three");

    // Round 3: fresh waiters on both receivers resolve after one more send.
    {
        let mut wait_a = spawn(rx_a.changed());
        let mut wait_b = spawn(rx_b.changed());

        assert_pending!(wait_a.poll());
        assert_pending!(wait_b.poll());

        sender.send("four").unwrap();

        assert_ready_ok!(wait_a.poll());
        assert_ready_ok!(wait_b.poll());
    }
    assert_eq!(*rx_a.borrow(), "four");
    assert_eq!(*rx_b.borrow(), "four");
}
206
// After the sender is dropped, the receiver can still read the last value and
// `changed()` reports the closure only once pending updates have been seen.
#[test]
fn rx_observes_final_value() {
    // Case 1: the sender goes away without ever sending.
    let (idle_tx, mut idle_rx) = watch::channel("one");
    drop(idle_tx);

    let mut closed_wait = spawn(idle_rx.changed());
    assert_ready_err!(closed_wait.poll());
    drop(closed_wait);
    assert_eq!(*idle_rx.borrow(), "one");

    // Case 2: values are sent before the sender goes away.
    let (live_tx, mut live_rx) = watch::channel("one");

    live_tx.send("two").unwrap();

    // An update sent before the first poll is reported immediately.
    let mut first_wait = spawn(live_rx.changed());
    assert_ready_ok!(first_wait.poll());
    drop(first_wait);
    assert_eq!(*live_rx.borrow(), "two");

    // A send followed directly by dropping the sender still yields `Ok` for
    // the waiter that was already parked.
    let mut parked_wait = spawn(live_rx.changed());
    assert_pending!(parked_wait.poll());

    live_tx.send("three").unwrap();
    drop(live_tx);

    assert!(parked_wait.is_woken());

    assert_ready_ok!(parked_wait.poll());
    drop(parked_wait);
    assert_eq!(*live_rx.borrow(), "three");

    // With nothing new left to observe, the closed channel is reported as an error.
    let mut final_wait = spawn(live_rx.changed());
    assert_ready_err!(final_wait.poll());
    drop(final_wait);
    assert_eq!(*live_rx.borrow(), "three");
}
251
// `Sender::closed` resolves once the only receiver is dropped, after which
// sending fails.
#[test]
fn poll_close() {
    let (sender, receiver) = watch::channel("one");

    let mut closed = spawn(sender.closed());
    assert_pending!(closed.poll());

    drop(receiver);

    assert!(closed.is_woken());
    assert_ready!(closed.poll());
    drop(closed);

    assert!(sender.send("two").is_err());
}
268
// Checks that both `changed()` and `borrow_and_update()` clear the receiver's
// changed flag, and what the receiver reports once the sender is dropped.
#[test]
fn borrow_and_update() {
    let (sender, mut receiver) = watch::channel("one");

    assert!(!receiver.has_changed().unwrap());

    // Completing `changed()` consumes the notification for "two".
    sender.send("two").unwrap();
    assert!(receiver.has_changed().unwrap());
    {
        let mut first_wait = spawn(receiver.changed());
        assert_ready!(first_wait.poll()).unwrap();
    }
    {
        let mut second_wait = spawn(receiver.changed());
        assert_pending!(second_wait.poll());
    }
    assert!(!receiver.has_changed().unwrap());

    // `borrow_and_update()` consumes the notification for "three" as well.
    sender.send("three").unwrap();
    assert!(receiver.has_changed().unwrap());
    assert_eq!(*receiver.borrow_and_update(), "three");
    {
        let mut third_wait = spawn(receiver.changed());
        assert_pending!(third_wait.poll());
    }
    assert!(!receiver.has_changed().unwrap());

    // With the sender gone the value is still readable, while `changed()` and
    // `has_changed()` both return errors.
    drop(sender);
    assert_eq!(*receiver.borrow_and_update(), "three");
    {
        let mut closed_wait = spawn(receiver.changed());
        assert_ready!(closed_wait.poll()).unwrap_err();
    }
    assert!(receiver.has_changed().is_err());
}
292
// `is_closed` follows the receivers: true once the last one is dropped, false
// again after `subscribe()` creates a new one.
#[test]
fn reopened_after_subscribe() {
    let (sender, original_rx) = watch::channel("one");
    assert!(!sender.is_closed());

    drop(original_rx);
    assert!(sender.is_closed());

    let resubscribed_rx = sender.subscribe();
    assert!(!sender.is_closed());

    drop(resubscribed_rx);
    assert!(sender.is_closed());
}
307
// A panic inside the `send_modify` closure keeps the written value but does not
// notify receivers; the channel stays usable afterwards.
#[test]
#[cfg(panic = "unwind")]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn send_modify_panic() {
    let (sender, mut receiver) = watch::channel("one");

    sender.send_modify(|slot| *slot = "two");
    assert_eq!(*receiver.borrow_and_update(), "two");

    let mut observer = receiver.clone();
    assert_eq!(*observer.borrow_and_update(), "two");

    let mut pending_change = spawn(observer.changed());

    // The closure writes a value and then panics; the unwind is caught here.
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        sender.send_modify(|slot| {
            *slot = "panicked";
            panic!();
        })
    }));
    assert!(outcome.is_err());

    // The waiter was not notified, yet the value written before the panic is visible.
    assert_pending!(pending_change.poll());
    assert_eq!(*receiver.borrow(), "panicked");

    // A later, successful modification notifies the waiter as usual.
    sender.send_modify(|slot| *slot = "three");
    assert_ready_ok!(pending_change.poll());
    assert_eq!(*receiver.borrow_and_update(), "three");
}
337
338 #[tokio::test]
multiple_sender()339 async fn multiple_sender() {
340 let (tx1, mut rx) = watch::channel(0);
341 let tx2 = tx1.clone();
342
343 let mut t = spawn(async {
344 rx.changed().await.unwrap();
345 let v1 = *rx.borrow_and_update();
346 rx.changed().await.unwrap();
347 let v2 = *rx.borrow_and_update();
348 (v1, v2)
349 });
350
351 tx1.send(1).unwrap();
352 assert_pending!(t.poll());
353 tx2.send(2).unwrap();
354 assert_ready_eq!(t.poll(), (1, 2));
355 }
356
357 #[tokio::test]
receiver_is_notified_when_last_sender_is_dropped()358 async fn receiver_is_notified_when_last_sender_is_dropped() {
359 let (tx1, mut rx) = watch::channel(0);
360 let tx2 = tx1.clone();
361
362 let mut t = spawn(rx.changed());
363 assert_pending!(t.poll());
364
365 drop(tx1);
366 assert!(!t.is_woken());
367 drop(tx2);
368
369 assert!(t.is_woken());
370 }
371
372 #[tokio::test]
receiver_changed_is_cooperative()373 async fn receiver_changed_is_cooperative() {
374 let (tx, mut rx) = watch::channel(());
375
376 drop(tx);
377
378 tokio::select! {
379 biased;
380 _ = async {
381 loop {
382 assert!(rx.changed().await.is_err());
383 }
384 } => {},
385 _ = tokio::task::yield_now() => {},
386 }
387 }
388
389 #[tokio::test]
receiver_changed_is_cooperative_ok()390 async fn receiver_changed_is_cooperative_ok() {
391 let (tx, mut rx) = watch::channel(());
392
393 tokio::select! {
394 biased;
395 _ = async {
396 loop {
397 assert!(tx.send(()).is_ok());
398 assert!(rx.changed().await.is_ok());
399 }
400 } => {},
401 _ = tokio::task::yield_now() => {},
402 }
403 }
404
405 #[tokio::test]
receiver_wait_for_is_cooperative()406 async fn receiver_wait_for_is_cooperative() {
407 let (tx, mut rx) = watch::channel(0);
408
409 drop(tx);
410
411 tokio::select! {
412 biased;
413 _ = async {
414 loop {
415 assert!(rx.wait_for(|val| *val == 1).await.is_err());
416 }
417 } => {},
418 _ = tokio::task::yield_now() => {},
419 }
420 }
421
422 #[tokio::test]
receiver_wait_for_is_cooperative_ok()423 async fn receiver_wait_for_is_cooperative_ok() {
424 let (tx, mut rx) = watch::channel(0);
425
426 tokio::select! {
427 biased;
428 _ = async {
429 loop {
430 assert!(tx.send(1).is_ok());
431 assert!(rx.wait_for(|val| *val == 1).await.is_ok());
432 }
433 } => {},
434 _ = tokio::task::yield_now() => {},
435 }
436 }
437
438 #[tokio::test]
sender_closed_is_cooperative()439 async fn sender_closed_is_cooperative() {
440 let (tx, rx) = watch::channel(());
441
442 drop(rx);
443
444 tokio::select! {
445 _ = async {
446 loop {
447 tx.closed().await;
448 }
449 } => {},
450 _ = tokio::task::yield_now() => {},
451 }
452 }
453