1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 
8 use tokio::sync::broadcast;
9 use tokio_test::task;
10 use tokio_test::{
11     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12 };
13 
14 use std::sync::Arc;
15 
// Takes the next value from a receiver via `try_recv`, panicking with the
// returned error when nothing could be received.
macro_rules! assert_recv {
    ($rx:expr) => {
        $rx.try_recv()
            .unwrap_or_else(|err| panic!("expected recv; got = {:?}", err))
    };
}
24 
// Asserts that `try_recv` on the receiver reports `Empty`; a received value or
// any other error is a failure.
macro_rules! assert_empty {
    ($rx:expr) => {
        match $rx.try_recv() {
            Err(broadcast::error::TryRecvError::Empty) => {}
            Ok(unexpected) => panic!("expected empty; got = {:?}", unexpected),
            Err(other) => panic!("expected empty; got = {:?}", other),
        }
    };
}
34 
// Asserts that the given `try_recv` result is `Err(Lagged(n))` and that the
// reported number of skipped values equals the expected count.
macro_rules! assert_lagged {
    ($res:expr, $expected:expr) => {
        if let broadcast::error::TryRecvError::Lagged(skipped) = assert_err!($res) {
            assert_eq!(skipped, $expected);
        } else {
            panic!("did not lag");
        }
    };
}
45 
// Asserts that the given `try_recv` result is `Err(Closed)`.
macro_rules! assert_closed {
    ($res:expr) => {
        if !matches!(assert_err!($res), broadcast::error::TryRecvError::Closed) {
            panic!("is not closed");
        }
    };
}
54 
55 #[allow(unused)]
56 trait AssertSend: Send + Sync {}
57 impl AssertSend for broadcast::Sender<i32> {}
58 impl AssertSend for broadcast::Receiver<i32> {}
59 
// A lone receiver sees nothing before a send, exactly the sent value after it,
// and nothing again once that value has been taken.
#[test]
fn send_try_recv_bounded() {
    let (sender, mut receiver) = broadcast::channel(16);

    // Nothing has been sent yet.
    assert_empty!(receiver);

    // With a single receiver the send is expected to report a count of one.
    let delivered = assert_ok!(sender.send("hello"));
    assert_eq!(delivered, 1);

    assert_eq!(assert_recv!(receiver), "hello");

    // The only value has been consumed.
    assert_empty!(receiver);
}
74 
// Each of two subscribed receivers observes its own copy of a sent value.
#[test]
fn send_two_recv() {
    let (sender, mut first) = broadcast::channel(16);
    let mut second = sender.subscribe();

    assert_empty!(first);
    assert_empty!(second);

    // With two receivers the send is expected to report a count of two.
    let delivered = assert_ok!(sender.send("hello"));
    assert_eq!(delivered, 2);

    assert_eq!(assert_recv!(first), "hello");
    assert_eq!(assert_recv!(second), "hello");

    // Both receivers are drained again.
    assert_empty!(first);
    assert_empty!(second);
}
95 
// A pending async `recv` is woken by a send and then resolves to the value.
#[test]
fn send_recv_bounded() {
    let (sender, mut receiver) = broadcast::channel(16);

    // Park a receive on the still-empty channel.
    let mut recv = task::spawn(receiver.recv());
    assert_pending!(recv.poll());

    assert_ok!(sender.send("hello"));

    // The send must have notified the parked task, which can now complete.
    assert!(recv.is_woken());
    assert_eq!(assert_ready_ok!(recv.poll()), "hello");
}
110 
// Two async receivers: a send wakes every receive future that has been polled,
// and a future that was never polled still finds the value on its first poll.
#[test]
fn send_two_recv_bounded() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    // Round one: both receive futures are polled before the send.
    let mut recv1 = task::spawn(first_rx.recv());
    let mut recv2 = task::spawn(second_rx.recv());

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    assert_ok!(sender.send("hello"));

    assert!(recv1.is_woken());
    assert!(recv2.is_woken());

    let first_val = assert_ready_ok!(recv1.poll());
    let second_val = assert_ready_ok!(recv2.poll());
    assert_eq!(first_val, "hello");
    assert_eq!(second_val, "hello");

    // The finished futures still borrow the receivers; get rid of them so new
    // receive futures can be created.
    drop((recv1, recv2));

    // Round two: only the first future is polled before the send ...
    let mut recv1 = task::spawn(first_rx.recv());
    let mut recv2 = task::spawn(second_rx.recv());

    assert_pending!(recv1.poll());

    assert_ok!(sender.send("world"));

    // ... so only that one is expected to have been woken.
    assert!(recv1.is_woken());
    assert!(!recv2.is_woken());

    let first_val = assert_ready_ok!(recv1.poll());
    let second_val = assert_ready_ok!(recv2.poll());
    assert_eq!(first_val, "world");
    assert_eq!(second_val, "world");
}
149 
// The same receive future is polled from two different test tasks in turn; the
// task that polled last is the one that must be woken by a send.
#[test]
fn change_tasks() {
    let (sender, mut receiver) = broadcast::channel(1);

    // One boxed receive future shared (by mutable reference) between both tasks.
    let mut shared_recv = Box::pin(receiver.recv());

    let mut task1 = task::spawn(&mut shared_recv);
    assert_pending!(task1.poll());

    let mut task2 = task::spawn(&mut shared_recv);
    assert_pending!(task2.poll());

    sender.send("hello").unwrap();

    assert!(task2.is_woken());
}
166 
// One receiver keeps up with the sender while the other falls behind; the slow
// one must still observe every value, in order.
#[test]
fn send_slow_rx() {
    let (sender, mut fast_rx) = broadcast::channel(16);
    let mut slow_rx = sender.subscribe();

    {
        // This receive is polled once and then left untouched while three
        // values are sent.
        let mut recv2 = task::spawn(slow_rx.recv());

        {
            let mut recv1 = task::spawn(fast_rx.recv());

            assert_pending!(recv1.poll());
            assert_pending!(recv2.poll());

            assert_ok!(sender.send("one"));

            assert!(recv1.is_woken());
            assert!(recv2.is_woken());

            assert_ok!(sender.send("two"));

            let received = assert_ready_ok!(recv1.poll());
            assert_eq!(received, "one");
        }

        // "two" is already buffered, so a fresh receive completes on first poll.
        let received = assert_ready_ok!(task::spawn(fast_rx.recv()).poll());
        assert_eq!(received, "two");

        let mut recv1 = task::spawn(fast_rx.recv());

        assert_pending!(recv1.poll());

        assert_ok!(sender.send("three"));

        assert!(recv1.is_woken());

        let received = assert_ready_ok!(recv1.poll());
        assert_eq!(received, "three");

        // The slow receive finally completes, starting from the oldest value.
        let received = assert_ready_ok!(recv2.poll());
        assert_eq!(received, "one");
    }

    // The remaining values are still available to the slow receiver.
    let received = assert_recv!(slow_rx);
    assert_eq!(received, "two");

    let received = assert_recv!(slow_rx);
    assert_eq!(received, "three");
}
216 
// Receivers may be dropped while the channel still holds values they have not
// read; the test passes as long as nothing panics.
#[test]
fn drop_rx_while_values_remain() {
    let (sender, mut first) = broadcast::channel(16);
    let mut second = sender.subscribe();

    assert_ok!(sender.send("one"));
    assert_ok!(sender.send("two"));

    // Each receiver takes a single value, leaving one unread.
    assert_recv!(first);
    assert_recv!(second);

    // The second receiver is dropped before the first.
    drop(second);
    drop(first);
}
231 
// A receiver that falls further behind than the channel can buffer gets a
// `Lagged` error carrying the number of values it missed, and then resumes
// from the oldest value that is still available.
#[test]
fn lagging_rx() {
    // Room for two values only.
    let (tx, mut rx1) = broadcast::channel(2);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_eq!("one", assert_recv!(rx1));

    // A third send while `rx2` has read nothing pushes "one" out from under it.
    assert_ok!(tx.send("three"));

    // Lagged too far
    assert_lagged!(rx2.try_recv(), 1);

    // Calling again gets the next value
    assert_eq!("two", assert_recv!(rx2));

    assert_eq!("two", assert_recv!(rx1));
    assert_eq!("three", assert_recv!(rx1));

    // `rx2` has not read "three" yet; two more sends make it lag by one again.
    assert_ok!(tx.send("four"));
    assert_ok!(tx.send("five"));

    assert_lagged!(rx2.try_recv(), 1);

    // One more send without `rx2` reading anything costs it one more value.
    assert_ok!(tx.send("six"));

    assert_lagged!(rx2.try_recv(), 1);
}
263 
// Sending with no receiver fails; a receiver subscribed afterwards only sees
// values sent after it subscribed.
#[test]
fn send_no_rx() {
    // Binding the receiver half to `_` drops it right away.
    let (sender, _) = broadcast::channel(16);

    assert_err!(sender.send("hello"));

    let mut late_rx = sender.subscribe();

    assert_ok!(sender.send("world"));

    assert_eq!("world", assert_recv!(late_rx));
}
277 
// Creating a channel with a capacity of zero is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn zero_capacity() {
    let _ = broadcast::channel::<()>(0);
}
284 
// Creating a channel with a capacity one above `usize::MAX >> 1` is expected
// to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn capacity_too_big() {
    let too_big = (usize::MAX >> 1) + 1;
    broadcast::channel::<()>(too_big);
}
291 
// A panic raised by the value's `Clone` impl during a receive must leave the
// receiver usable, with the following value still retrievable.
#[test]
#[cfg(panic = "unwind")]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn panic_in_clone() {
    use std::panic::{self, AssertUnwindSafe};

    // Test value whose `clone` panics when the payload is zero.
    #[derive(Debug, PartialEq, Eq)]
    struct MyVal(usize);

    impl Clone for MyVal {
        fn clone(&self) -> Self {
            assert_ne!(0, self.0);
            Self(self.0)
        }
    }

    let (sender, mut receiver) = broadcast::channel(16);

    assert_ok!(sender.send(MyVal(0)));
    assert_ok!(sender.send(MyVal(1)));

    // The first receive hits the zero payload and is expected to unwind.
    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = receiver.try_recv();
    }));

    assert_err!(outcome);

    // The next receive must succeed and yield the second value.
    assert_eq!(assert_recv!(receiver), MyVal(1));
}
322 
// Pending receivers are only notified of closure once the last sender handle
// is gone; dropping one of two handles must leave them pending.
#[test]
fn dropping_tx_notifies_rx() {
    let (first_tx, mut rx_a) = broadcast::channel::<()>(16);
    let mut rx_b = first_tx.subscribe();

    let second_tx = first_tx.clone();

    let mut recv1 = task::spawn(rx_a.recv());
    let mut recv2 = task::spawn(rx_b.recv());

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    // One sender handle remains, so nothing completes yet.
    drop(first_tx);

    assert_pending!(recv1.poll());
    assert_pending!(recv2.poll());

    // Last handle gone: both receives are woken and resolve to `Closed`.
    drop(second_tx);

    assert!(recv1.is_woken());
    assert!(recv2.is_woken());

    let err = assert_ready_err!(recv1.poll());
    assert!(is_closed(err));

    let err = assert_ready_err!(recv2.poll());
    assert!(is_closed(err));
}
352 
// A buffered value that was never received is released when the only receiver
// is dropped; observed through the `Arc` strong count.
#[test]
fn unconsumed_messages_are_dropped() {
    let (sender, receiver) = broadcast::channel(16);

    let payload = Arc::new(());

    assert_ok!(sender.send(Arc::clone(&payload)));

    // One handle held here, one held by the channel.
    assert_eq!(2, Arc::strong_count(&payload));

    drop(receiver);

    // Only the local handle is left.
    assert_eq!(1, Arc::strong_count(&payload));
}
367 
// A channel created with capacity one delivers a single value and is then
// empty.
#[test]
fn single_capacity_recvs() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));

    let received = assert_recv!(receiver);
    assert_eq!(received, 1);
    assert_empty!(receiver);
}
377 
// Capacity one, sender dropped after one send: the value is still delivered,
// and only afterwards is the channel reported as closed.
#[test]
fn single_capacity_recvs_after_drop_1() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));
    drop(sender);

    let received = assert_recv!(receiver);
    assert_eq!(received, 1);
    assert_closed!(receiver.try_recv());
}
388 
// Capacity one, two sends, then the sender is dropped: the receiver reports
// one missed value, gets the second value, and then sees the closure.
#[test]
fn single_capacity_recvs_after_drop_2() {
    let (sender, mut receiver) = broadcast::channel(1);

    for value in 1..=2 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    assert_lagged!(receiver.try_recv(), 1);
    let received = assert_recv!(receiver);
    assert_eq!(received, 2);
    assert_closed!(receiver.try_recv());
}
401 
// With the channel filled to its capacity of two, dropping the sender must not
// displace either buffered value; both arrive before `Closed` is reported.
#[test]
fn dropping_sender_does_not_overwrite() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=2 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    for expected in 1..=2 {
        assert_eq!(assert_recv!(receiver), expected);
    }
    assert_closed!(receiver.try_recv());
}
414 
// Capacity two, three sends, sender dropped: the receiver lags by one, then
// reads the two surviving values, then sees the closure.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_1() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=3 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    assert_lagged!(receiver.try_recv(), 1);
    for expected in 2..=3 {
        assert_eq!(assert_recv!(receiver), expected);
    }
    assert_closed!(receiver.try_recv());
}
429 
// Capacity two, four sends, sender dropped: the receiver lags by two, then
// reads the two surviving values, then sees the closure.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_2() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=4 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    assert_lagged!(receiver.try_recv(), 2);
    for expected in 3..=4 {
        assert_eq!(assert_recv!(receiver), expected);
    }
    assert_closed!(receiver.try_recv());
}
445 
// Capacity two, three sends, sender kept alive: the receiver lags by one,
// reads the two surviving values, and then finds the channel empty (not
// closed).
#[test]
fn lagging_receiver_recovers_after_wrap_open() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=3 {
        assert_ok!(sender.send(value));
    }

    assert_lagged!(receiver.try_recv(), 1);
    for expected in 2..=3 {
        assert_eq!(assert_recv!(receiver), expected);
    }
    assert_empty!(receiver);
}
459 
// Checks the values `Receiver::len` reports before and after the receiver has
// fallen behind.
#[test]
fn receiver_len_with_lagged() {
    let (sender, mut receiver) = broadcast::channel(3);

    sender.send(10).unwrap();
    sender.send(20).unwrap();
    sender.send(30).unwrap();
    sender.send(40).unwrap();

    // All four values are still expected to be readable even though the
    // channel was requested with capacity three.
    assert_eq!(receiver.len(), 4);
    let oldest = assert_recv!(receiver);
    assert_eq!(oldest, 10);

    sender.send(50).unwrap();
    sender.send(60).unwrap();

    // Five values were sent since the last read; the next read reports that
    // one of them was missed.
    assert_eq!(receiver.len(), 5);
    assert_lagged!(receiver.try_recv(), 1);
}
478 
is_closed(err: broadcast::error::RecvError) -> bool479 fn is_closed(err: broadcast::error::RecvError) -> bool {
480     matches!(err, broadcast::error::RecvError::Closed)
481 }
482 
// A receiver created with `resubscribe` starts after everything already sent
// and then advances independently of the receiver it was created from.
#[test]
fn resubscribe_points_to_tail() {
    let (sender, mut original) = broadcast::channel(3);
    sender.send(1).unwrap();

    let mut resubscribed = original.resubscribe();

    // The value sent before resubscribing is visible to the original only.
    assert_empty!(resubscribed);
    assert_eq!(assert_recv!(original), 1);

    // Reading from the new receiver must not advance the original one.
    sender.send(2).unwrap();
    assert_eq!(assert_recv!(resubscribed), 2);
    sender.send(3).unwrap();
    assert_eq!(assert_recv!(original), 2);
    assert_eq!(assert_recv!(original), 3);
    assert_empty!(original);

    // ... and the new receiver still has its own copy of the last value.
    assert_eq!(assert_recv!(resubscribed), 3);
    assert_empty!(resubscribed);
}
505 
// Resubscribing from a receiver that has fallen behind gives a receiver that
// is neither lagged nor able to see the old values.
#[test]
fn resubscribe_lagged() {
    let (sender, mut original) = broadcast::channel(1);
    sender.send(1).unwrap();
    sender.send(2).unwrap();

    let mut resubscribed = original.resubscribe();

    // The original missed one value; the new receiver simply has nothing.
    assert_lagged!(original.try_recv(), 1);
    assert_empty!(resubscribed);

    let received = assert_recv!(original);
    assert_eq!(received, 2);
    assert_empty!(original);
    assert_empty!(resubscribed);
}
520 
// Resubscribing after the sender is gone yields a receiver that immediately
// reports `Closed`.
#[test]
fn resubscribe_to_closed_channel() {
    let (sender, receiver) = broadcast::channel::<u32>(2);
    drop(sender);

    let mut resubscribed = receiver.resubscribe();
    assert_closed!(resubscribed.try_recv());
}
529 
// Checks `Sender::len` / `Sender::is_empty` as two receivers consume values at
// different speeds.
#[test]
fn sender_len() {
    let (tx, mut fast) = broadcast::channel(4);
    let mut slow = tx.subscribe();

    assert_eq!(tx.len(), 0);
    assert!(tx.is_empty());

    for value in 1..=3 {
        tx.send(value).unwrap();
    }

    assert_eq!(tx.len(), 3);
    assert!(!tx.is_empty());

    assert_recv!(fast);
    assert_recv!(fast);

    // The slow receiver has read nothing, so the reported length is unchanged.
    assert_eq!(tx.len(), 3);
    assert!(!tx.is_empty());

    assert_recv!(slow);

    // Once the slow receiver reads a value the length drops.
    assert_eq!(tx.len(), 2);
    assert!(!tx.is_empty());

    for value in 4..=6 {
        tx.send(value).unwrap();
    }

    // Three more sends bring the expected length to four, not five.
    assert_eq!(tx.len(), 4);
    assert!(!tx.is_empty());
}
563 
// Randomised check: after every step `Sender::len` must equal the larger of
// the two receiver lengths, capped at 16.
#[test]
#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
fn sender_len_random() {
    use rand::Rng;

    let (sender, mut first) = broadcast::channel(16);
    let mut second = sender.subscribe();

    for _ in 0..1000 {
        // Two of the four outcomes send; the other two each try to read one
        // value from one of the receivers.
        let action = rand::thread_rng().gen_range(0..4);
        if action == 0 {
            let _ = first.try_recv();
        } else if action == 1 {
            let _ = second.try_recv();
        } else {
            sender.send(0).unwrap();
        }

        let expected_len = first.len().max(second.len()).min(16);
        assert_eq!(sender.len(), expected_len);
    }
}
589 
// Deadlock regression test: a waker whose `Drop` impl sends on the same
// channel must not hang the channel, either when the stored waker is replaced
// or when it is consumed by a wake-up. There are no value assertions beyond
// the initial `is_pending` checks; the test passes by returning.
#[test]
fn send_in_waker_drop() {
    use futures::task::ArcWake;
    use std::future::Future;
    use std::task::Context;

    // Waker payload that sends on the channel when it is dropped.
    struct SendOnDrop(broadcast::Sender<()>);

    impl Drop for SendOnDrop {
        fn drop(&mut self) {
            // The result is deliberately ignored.
            let _ = self.0.send(());
        }
    }

    impl ArcWake for SendOnDrop {
        // Waking does nothing; only dropping matters here.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }

    // Scenario 1: no deadlock when the stored waker is replaced.

    let (sender, mut receiver) = broadcast::channel(16);

    let mut fut = Box::pin(async {
        let _ = receiver.recv().await;
    });

    // First poll: the receive future is handed the special waker.
    let special_waker = futures::task::waker(Arc::new(SendOnDrop(sender)));
    let mut cx = Context::from_waker(&special_waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(special_waker);

    // Second poll, now with a no-op waker, shouldn't deadlock.
    let mut cx = Context::from_waker(futures::task::noop_waker_ref());
    let _ = fut.as_mut().poll(&mut cx);

    // Scenario 2: no deadlock when calling waker.wake().

    let (sender, mut receiver) = broadcast::channel(16);

    let mut fut = Box::pin(async {
        let _ = receiver.recv().await;
    });

    // Same setup, but this time a sender handle is kept outside the waker.
    let special_waker = futures::task::waker(Arc::new(SendOnDrop(sender.clone())));
    let mut cx = Context::from_waker(&special_waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(special_waker);

    // Shouldn't deadlock.
    let _ = sender.send(());
}
643 
644 #[tokio::test]
receiver_recv_is_cooperative()645 async fn receiver_recv_is_cooperative() {
646     let (tx, mut rx) = broadcast::channel(8);
647 
648     tokio::select! {
649         biased;
650         _ = async {
651             loop {
652                 assert!(tx.send(()).is_ok());
653                 assert!(rx.recv().await.is_ok());
654             }
655         } => {},
656         _ = tokio::task::yield_now() => {},
657     }
658 }
659