1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 
8 use tokio::sync::watch;
9 use tokio_test::task::spawn;
10 use tokio_test::{
11     assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
12 };
13 
14 #[test]
single_rx_recv()15 fn single_rx_recv() {
16     let (tx, mut rx) = watch::channel("one");
17 
18     {
19         // Not initially notified
20         let mut t = spawn(rx.changed());
21         assert_pending!(t.poll());
22     }
23     assert_eq!(*rx.borrow(), "one");
24 
25     {
26         let mut t = spawn(rx.changed());
27         assert_pending!(t.poll());
28 
29         tx.send("two").unwrap();
30 
31         assert!(t.is_woken());
32 
33         assert_ready_ok!(t.poll());
34     }
35     assert_eq!(*rx.borrow(), "two");
36 
37     {
38         let mut t = spawn(rx.changed());
39         assert_pending!(t.poll());
40 
41         drop(tx);
42 
43         assert!(t.is_woken());
44         assert_ready_err!(t.poll());
45     }
46     assert_eq!(*rx.borrow(), "two");
47 }
48 
49 #[test]
rx_version_underflow()50 fn rx_version_underflow() {
51     let (_tx, mut rx) = watch::channel("one");
52 
53     // Version starts at 2, validate we do not underflow
54     rx.mark_changed();
55     rx.mark_changed();
56 }
57 
58 #[test]
rx_mark_changed()59 fn rx_mark_changed() {
60     let (tx, mut rx) = watch::channel("one");
61 
62     let mut rx2 = rx.clone();
63     let mut rx3 = rx.clone();
64     let mut rx4 = rx.clone();
65     {
66         rx.mark_changed();
67         assert!(rx.has_changed().unwrap());
68 
69         let mut t = spawn(rx.changed());
70         assert_ready_ok!(t.poll());
71     }
72 
73     {
74         assert!(!rx2.has_changed().unwrap());
75 
76         let mut t = spawn(rx2.changed());
77         assert_pending!(t.poll());
78     }
79 
80     {
81         rx3.mark_changed();
82         assert_eq!(*rx3.borrow(), "one");
83 
84         assert!(rx3.has_changed().unwrap());
85 
86         assert_eq!(*rx3.borrow_and_update(), "one");
87 
88         assert!(!rx3.has_changed().unwrap());
89 
90         let mut t = spawn(rx3.changed());
91         assert_pending!(t.poll());
92     }
93 
94     {
95         tx.send("two").unwrap();
96         assert!(rx4.has_changed().unwrap());
97         assert_eq!(*rx4.borrow_and_update(), "two");
98 
99         rx4.mark_changed();
100         assert!(rx4.has_changed().unwrap());
101         assert_eq!(*rx4.borrow_and_update(), "two")
102     }
103 
104     assert_eq!(*rx.borrow(), "two");
105 }
106 
107 #[test]
rx_mark_unchanged()108 fn rx_mark_unchanged() {
109     let (tx, mut rx) = watch::channel("one");
110 
111     let mut rx2 = rx.clone();
112 
113     {
114         assert!(!rx.has_changed().unwrap());
115 
116         rx.mark_changed();
117         assert!(rx.has_changed().unwrap());
118 
119         rx.mark_unchanged();
120         assert!(!rx.has_changed().unwrap());
121 
122         let mut t = spawn(rx.changed());
123         assert_pending!(t.poll());
124     }
125 
126     {
127         assert!(!rx2.has_changed().unwrap());
128 
129         tx.send("two").unwrap();
130         assert!(rx2.has_changed().unwrap());
131 
132         rx2.mark_unchanged();
133         assert!(!rx2.has_changed().unwrap());
134         assert_eq!(*rx2.borrow_and_update(), "two");
135     }
136 
137     assert_eq!(*rx.borrow(), "two");
138 }
139 
140 #[test]
multi_rx()141 fn multi_rx() {
142     let (tx, mut rx1) = watch::channel("one");
143     let mut rx2 = rx1.clone();
144 
145     {
146         let mut t1 = spawn(rx1.changed());
147         let mut t2 = spawn(rx2.changed());
148 
149         assert_pending!(t1.poll());
150         assert_pending!(t2.poll());
151     }
152     assert_eq!(*rx1.borrow(), "one");
153     assert_eq!(*rx2.borrow(), "one");
154 
155     let mut t2 = spawn(rx2.changed());
156 
157     {
158         let mut t1 = spawn(rx1.changed());
159 
160         assert_pending!(t1.poll());
161         assert_pending!(t2.poll());
162 
163         tx.send("two").unwrap();
164 
165         assert!(t1.is_woken());
166         assert!(t2.is_woken());
167 
168         assert_ready_ok!(t1.poll());
169     }
170     assert_eq!(*rx1.borrow(), "two");
171 
172     {
173         let mut t1 = spawn(rx1.changed());
174 
175         assert_pending!(t1.poll());
176 
177         tx.send("three").unwrap();
178 
179         assert!(t1.is_woken());
180         assert!(t2.is_woken());
181 
182         assert_ready_ok!(t1.poll());
183         assert_ready_ok!(t2.poll());
184     }
185     assert_eq!(*rx1.borrow(), "three");
186 
187     drop(t2);
188 
189     assert_eq!(*rx2.borrow(), "three");
190 
191     {
192         let mut t1 = spawn(rx1.changed());
193         let mut t2 = spawn(rx2.changed());
194 
195         assert_pending!(t1.poll());
196         assert_pending!(t2.poll());
197 
198         tx.send("four").unwrap();
199 
200         assert_ready_ok!(t1.poll());
201         assert_ready_ok!(t2.poll());
202     }
203     assert_eq!(*rx1.borrow(), "four");
204     assert_eq!(*rx2.borrow(), "four");
205 }
206 
207 #[test]
rx_observes_final_value()208 fn rx_observes_final_value() {
209     // Initial value
210 
211     let (tx, mut rx) = watch::channel("one");
212     drop(tx);
213 
214     {
215         let mut t1 = spawn(rx.changed());
216         assert_ready_err!(t1.poll());
217     }
218     assert_eq!(*rx.borrow(), "one");
219 
220     // Sending a value
221 
222     let (tx, mut rx) = watch::channel("one");
223 
224     tx.send("two").unwrap();
225 
226     {
227         let mut t1 = spawn(rx.changed());
228         assert_ready_ok!(t1.poll());
229     }
230     assert_eq!(*rx.borrow(), "two");
231 
232     {
233         let mut t1 = spawn(rx.changed());
234         assert_pending!(t1.poll());
235 
236         tx.send("three").unwrap();
237         drop(tx);
238 
239         assert!(t1.is_woken());
240 
241         assert_ready_ok!(t1.poll());
242     }
243     assert_eq!(*rx.borrow(), "three");
244 
245     {
246         let mut t1 = spawn(rx.changed());
247         assert_ready_err!(t1.poll());
248     }
249     assert_eq!(*rx.borrow(), "three");
250 }
251 
252 #[test]
poll_close()253 fn poll_close() {
254     let (tx, rx) = watch::channel("one");
255 
256     {
257         let mut t = spawn(tx.closed());
258         assert_pending!(t.poll());
259 
260         drop(rx);
261 
262         assert!(t.is_woken());
263         assert_ready!(t.poll());
264     }
265 
266     assert!(tx.send("two").is_err());
267 }
268 
269 #[test]
borrow_and_update()270 fn borrow_and_update() {
271     let (tx, mut rx) = watch::channel("one");
272 
273     assert!(!rx.has_changed().unwrap());
274 
275     tx.send("two").unwrap();
276     assert!(rx.has_changed().unwrap());
277     assert_ready!(spawn(rx.changed()).poll()).unwrap();
278     assert_pending!(spawn(rx.changed()).poll());
279     assert!(!rx.has_changed().unwrap());
280 
281     tx.send("three").unwrap();
282     assert!(rx.has_changed().unwrap());
283     assert_eq!(*rx.borrow_and_update(), "three");
284     assert_pending!(spawn(rx.changed()).poll());
285     assert!(!rx.has_changed().unwrap());
286 
287     drop(tx);
288     assert_eq!(*rx.borrow_and_update(), "three");
289     assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
290     assert!(rx.has_changed().is_err());
291 }
292 
293 #[test]
reopened_after_subscribe()294 fn reopened_after_subscribe() {
295     let (tx, rx) = watch::channel("one");
296     assert!(!tx.is_closed());
297 
298     drop(rx);
299     assert!(tx.is_closed());
300 
301     let rx = tx.subscribe();
302     assert!(!tx.is_closed());
303 
304     drop(rx);
305     assert!(tx.is_closed());
306 }
307 
308 #[test]
309 #[cfg(panic = "unwind")]
310 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
send_modify_panic()311 fn send_modify_panic() {
312     let (tx, mut rx) = watch::channel("one");
313 
314     tx.send_modify(|old| *old = "two");
315     assert_eq!(*rx.borrow_and_update(), "two");
316 
317     let mut rx2 = rx.clone();
318     assert_eq!(*rx2.borrow_and_update(), "two");
319 
320     let mut task = spawn(rx2.changed());
321 
322     let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
323         tx.send_modify(|old| {
324             *old = "panicked";
325             panic!();
326         })
327     }));
328     assert!(result.is_err());
329 
330     assert_pending!(task.poll());
331     assert_eq!(*rx.borrow(), "panicked");
332 
333     tx.send_modify(|old| *old = "three");
334     assert_ready_ok!(task.poll());
335     assert_eq!(*rx.borrow_and_update(), "three");
336 }
337 
338 #[tokio::test]
multiple_sender()339 async fn multiple_sender() {
340     let (tx1, mut rx) = watch::channel(0);
341     let tx2 = tx1.clone();
342 
343     let mut t = spawn(async {
344         rx.changed().await.unwrap();
345         let v1 = *rx.borrow_and_update();
346         rx.changed().await.unwrap();
347         let v2 = *rx.borrow_and_update();
348         (v1, v2)
349     });
350 
351     tx1.send(1).unwrap();
352     assert_pending!(t.poll());
353     tx2.send(2).unwrap();
354     assert_ready_eq!(t.poll(), (1, 2));
355 }
356 
357 #[tokio::test]
receiver_is_notified_when_last_sender_is_dropped()358 async fn receiver_is_notified_when_last_sender_is_dropped() {
359     let (tx1, mut rx) = watch::channel(0);
360     let tx2 = tx1.clone();
361 
362     let mut t = spawn(rx.changed());
363     assert_pending!(t.poll());
364 
365     drop(tx1);
366     assert!(!t.is_woken());
367     drop(tx2);
368 
369     assert!(t.is_woken());
370 }
371 
372 #[tokio::test]
receiver_changed_is_cooperative()373 async fn receiver_changed_is_cooperative() {
374     let (tx, mut rx) = watch::channel(());
375 
376     drop(tx);
377 
378     tokio::select! {
379         biased;
380         _ = async {
381             loop {
382                 assert!(rx.changed().await.is_err());
383             }
384         } => {},
385         _ = tokio::task::yield_now() => {},
386     }
387 }
388 
389 #[tokio::test]
receiver_changed_is_cooperative_ok()390 async fn receiver_changed_is_cooperative_ok() {
391     let (tx, mut rx) = watch::channel(());
392 
393     tokio::select! {
394         biased;
395         _ = async {
396             loop {
397                 assert!(tx.send(()).is_ok());
398                 assert!(rx.changed().await.is_ok());
399             }
400         } => {},
401         _ = tokio::task::yield_now() => {},
402     }
403 }
404 
405 #[tokio::test]
receiver_wait_for_is_cooperative()406 async fn receiver_wait_for_is_cooperative() {
407     let (tx, mut rx) = watch::channel(0);
408 
409     drop(tx);
410 
411     tokio::select! {
412         biased;
413         _ = async {
414             loop {
415                 assert!(rx.wait_for(|val| *val == 1).await.is_err());
416             }
417         } => {},
418         _ = tokio::task::yield_now() => {},
419     }
420 }
421 
422 #[tokio::test]
receiver_wait_for_is_cooperative_ok()423 async fn receiver_wait_for_is_cooperative_ok() {
424     let (tx, mut rx) = watch::channel(0);
425 
426     tokio::select! {
427         biased;
428         _ = async {
429             loop {
430                 assert!(tx.send(1).is_ok());
431                 assert!(rx.wait_for(|val| *val == 1).await.is_ok());
432             }
433         } => {},
434         _ = tokio::task::yield_now() => {},
435     }
436 }
437 
438 #[tokio::test]
sender_closed_is_cooperative()439 async fn sender_closed_is_cooperative() {
440     let (tx, rx) = watch::channel(());
441 
442     drop(rx);
443 
444     tokio::select! {
445         _ = async {
446             loop {
447                 tx.closed().await;
448             }
449         } => {},
450         _ = tokio::task::yield_now() => {},
451     }
452 }
453