1 #![cfg(feature = "sync")]
2 
3 use tokio::sync::watch;
4 use tokio_stream::wrappers::WatchStream;
5 use tokio_stream::StreamExt;
6 use tokio_test::assert_pending;
7 use tokio_test::task::spawn;
8 
9 #[tokio::test]
watch_stream_message_not_twice()10 async fn watch_stream_message_not_twice() {
11     let (tx, rx) = watch::channel("hello");
12 
13     let mut counter = 0;
14     let mut stream = WatchStream::new(rx).map(move |payload| {
15         println!("{}", payload);
16         if payload == "goodbye" {
17             counter += 1;
18         }
19         if counter >= 2 {
20             panic!("too many goodbyes");
21         }
22     });
23 
24     let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
25 
26     // Send goodbye just once
27     tx.send("goodbye").unwrap();
28 
29     drop(tx);
30     task.await.unwrap();
31 }
32 
33 #[tokio::test]
watch_stream_from_rx()34 async fn watch_stream_from_rx() {
35     let (tx, rx) = watch::channel("hello");
36 
37     let mut stream = WatchStream::from(rx);
38 
39     assert_eq!(stream.next().await.unwrap(), "hello");
40 
41     tx.send("bye").unwrap();
42 
43     assert_eq!(stream.next().await.unwrap(), "bye");
44 }
45 
46 #[tokio::test]
watch_stream_from_changes()47 async fn watch_stream_from_changes() {
48     let (tx, rx) = watch::channel("hello");
49 
50     let mut stream = WatchStream::from_changes(rx);
51 
52     assert_pending!(spawn(&mut stream).poll_next());
53 
54     tx.send("bye").unwrap();
55 
56     assert_eq!(stream.next().await.unwrap(), "bye");
57 }
58