1 #![cfg(feature = "sync")] 2 3 use tokio::sync::watch; 4 use tokio_stream::wrappers::WatchStream; 5 use tokio_stream::StreamExt; 6 use tokio_test::assert_pending; 7 use tokio_test::task::spawn; 8 9 #[tokio::test] watch_stream_message_not_twice()10async fn watch_stream_message_not_twice() { 11 let (tx, rx) = watch::channel("hello"); 12 13 let mut counter = 0; 14 let mut stream = WatchStream::new(rx).map(move |payload| { 15 println!("{}", payload); 16 if payload == "goodbye" { 17 counter += 1; 18 } 19 if counter >= 2 { 20 panic!("too many goodbyes"); 21 } 22 }); 23 24 let task = tokio::spawn(async move { while stream.next().await.is_some() {} }); 25 26 // Send goodbye just once 27 tx.send("goodbye").unwrap(); 28 29 drop(tx); 30 task.await.unwrap(); 31 } 32 33 #[tokio::test] watch_stream_from_rx()34async fn watch_stream_from_rx() { 35 let (tx, rx) = watch::channel("hello"); 36 37 let mut stream = WatchStream::from(rx); 38 39 assert_eq!(stream.next().await.unwrap(), "hello"); 40 41 tx.send("bye").unwrap(); 42 43 assert_eq!(stream.next().await.unwrap(), "bye"); 44 } 45 46 #[tokio::test] watch_stream_from_changes()47async fn watch_stream_from_changes() { 48 let (tx, rx) = watch::channel("hello"); 49 50 let mut stream = WatchStream::from_changes(rx); 51 52 assert_pending!(spawn(&mut stream).poll_next()); 53 54 tx.send("bye").unwrap(); 55 56 assert_eq!(stream.next().await.unwrap(), "bye"); 57 } 58