1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7
8 use tokio::sync::broadcast;
9 use tokio_test::task;
10 use tokio_test::{
11 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12 };
13
14 use std::sync::Arc;
15
16 macro_rules! assert_recv {
17 ($e:expr) => {
18 match $e.try_recv() {
19 Ok(value) => value,
20 Err(e) => panic!("expected recv; got = {:?}", e),
21 }
22 };
23 }
24
25 macro_rules! assert_empty {
26 ($e:expr) => {
27 match $e.try_recv() {
28 Ok(value) => panic!("expected empty; got = {:?}", value),
29 Err(broadcast::error::TryRecvError::Empty) => {}
30 Err(e) => panic!("expected empty; got = {:?}", e),
31 }
32 };
33 }
34
35 macro_rules! assert_lagged {
36 ($e:expr, $n:expr) => {
37 match assert_err!($e) {
38 broadcast::error::TryRecvError::Lagged(n) => {
39 assert_eq!(n, $n);
40 }
41 _ => panic!("did not lag"),
42 }
43 };
44 }
45
46 macro_rules! assert_closed {
47 ($e:expr) => {
48 match assert_err!($e) {
49 broadcast::error::TryRecvError::Closed => {}
50 _ => panic!("is not closed"),
51 }
52 };
53 }
54
55 #[allow(unused)]
56 trait AssertSend: Send + Sync {}
57 impl AssertSend for broadcast::Sender<i32> {}
58 impl AssertSend for broadcast::Receiver<i32> {}
59
60 #[test]
send_try_recv_bounded()61 fn send_try_recv_bounded() {
62 let (tx, mut rx) = broadcast::channel(16);
63
64 assert_empty!(rx);
65
66 let n = assert_ok!(tx.send("hello"));
67 assert_eq!(n, 1);
68
69 let val = assert_recv!(rx);
70 assert_eq!(val, "hello");
71
72 assert_empty!(rx);
73 }
74
75 #[test]
send_two_recv()76 fn send_two_recv() {
77 let (tx, mut rx1) = broadcast::channel(16);
78 let mut rx2 = tx.subscribe();
79
80 assert_empty!(rx1);
81 assert_empty!(rx2);
82
83 let n = assert_ok!(tx.send("hello"));
84 assert_eq!(n, 2);
85
86 let val = assert_recv!(rx1);
87 assert_eq!(val, "hello");
88
89 let val = assert_recv!(rx2);
90 assert_eq!(val, "hello");
91
92 assert_empty!(rx1);
93 assert_empty!(rx2);
94 }
95
96 #[test]
send_recv_bounded()97 fn send_recv_bounded() {
98 let (tx, mut rx) = broadcast::channel(16);
99
100 let mut recv = task::spawn(rx.recv());
101
102 assert_pending!(recv.poll());
103
104 assert_ok!(tx.send("hello"));
105
106 assert!(recv.is_woken());
107 let val = assert_ready_ok!(recv.poll());
108 assert_eq!(val, "hello");
109 }
110
111 #[test]
send_two_recv_bounded()112 fn send_two_recv_bounded() {
113 let (tx, mut rx1) = broadcast::channel(16);
114 let mut rx2 = tx.subscribe();
115
116 let mut recv1 = task::spawn(rx1.recv());
117 let mut recv2 = task::spawn(rx2.recv());
118
119 assert_pending!(recv1.poll());
120 assert_pending!(recv2.poll());
121
122 assert_ok!(tx.send("hello"));
123
124 assert!(recv1.is_woken());
125 assert!(recv2.is_woken());
126
127 let val1 = assert_ready_ok!(recv1.poll());
128 let val2 = assert_ready_ok!(recv2.poll());
129 assert_eq!(val1, "hello");
130 assert_eq!(val2, "hello");
131
132 drop((recv1, recv2));
133
134 let mut recv1 = task::spawn(rx1.recv());
135 let mut recv2 = task::spawn(rx2.recv());
136
137 assert_pending!(recv1.poll());
138
139 assert_ok!(tx.send("world"));
140
141 assert!(recv1.is_woken());
142 assert!(!recv2.is_woken());
143
144 let val1 = assert_ready_ok!(recv1.poll());
145 let val2 = assert_ready_ok!(recv2.poll());
146 assert_eq!(val1, "world");
147 assert_eq!(val2, "world");
148 }
149
150 #[test]
change_tasks()151 fn change_tasks() {
152 let (tx, mut rx) = broadcast::channel(1);
153
154 let mut recv = Box::pin(rx.recv());
155
156 let mut task1 = task::spawn(&mut recv);
157 assert_pending!(task1.poll());
158
159 let mut task2 = task::spawn(&mut recv);
160 assert_pending!(task2.poll());
161
162 tx.send("hello").unwrap();
163
164 assert!(task2.is_woken());
165 }
166
167 #[test]
send_slow_rx()168 fn send_slow_rx() {
169 let (tx, mut rx1) = broadcast::channel(16);
170 let mut rx2 = tx.subscribe();
171
172 {
173 let mut recv2 = task::spawn(rx2.recv());
174
175 {
176 let mut recv1 = task::spawn(rx1.recv());
177
178 assert_pending!(recv1.poll());
179 assert_pending!(recv2.poll());
180
181 assert_ok!(tx.send("one"));
182
183 assert!(recv1.is_woken());
184 assert!(recv2.is_woken());
185
186 assert_ok!(tx.send("two"));
187
188 let val = assert_ready_ok!(recv1.poll());
189 assert_eq!(val, "one");
190 }
191
192 let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
193 assert_eq!(val, "two");
194
195 let mut recv1 = task::spawn(rx1.recv());
196
197 assert_pending!(recv1.poll());
198
199 assert_ok!(tx.send("three"));
200
201 assert!(recv1.is_woken());
202
203 let val = assert_ready_ok!(recv1.poll());
204 assert_eq!(val, "three");
205
206 let val = assert_ready_ok!(recv2.poll());
207 assert_eq!(val, "one");
208 }
209
210 let val = assert_recv!(rx2);
211 assert_eq!(val, "two");
212
213 let val = assert_recv!(rx2);
214 assert_eq!(val, "three");
215 }
216
217 #[test]
drop_rx_while_values_remain()218 fn drop_rx_while_values_remain() {
219 let (tx, mut rx1) = broadcast::channel(16);
220 let mut rx2 = tx.subscribe();
221
222 assert_ok!(tx.send("one"));
223 assert_ok!(tx.send("two"));
224
225 assert_recv!(rx1);
226 assert_recv!(rx2);
227
228 drop(rx2);
229 drop(rx1);
230 }
231
232 #[test]
lagging_rx()233 fn lagging_rx() {
234 let (tx, mut rx1) = broadcast::channel(2);
235 let mut rx2 = tx.subscribe();
236
237 assert_ok!(tx.send("one"));
238 assert_ok!(tx.send("two"));
239
240 assert_eq!("one", assert_recv!(rx1));
241
242 assert_ok!(tx.send("three"));
243
244 // Lagged too far
245 let x = dbg!(rx2.try_recv());
246 assert_lagged!(x, 1);
247
248 // Calling again gets the next value
249 assert_eq!("two", assert_recv!(rx2));
250
251 assert_eq!("two", assert_recv!(rx1));
252 assert_eq!("three", assert_recv!(rx1));
253
254 assert_ok!(tx.send("four"));
255 assert_ok!(tx.send("five"));
256
257 assert_lagged!(rx2.try_recv(), 1);
258
259 assert_ok!(tx.send("six"));
260
261 assert_lagged!(rx2.try_recv(), 1);
262 }
263
264 #[test]
send_no_rx()265 fn send_no_rx() {
266 let (tx, _) = broadcast::channel(16);
267
268 assert_err!(tx.send("hello"));
269
270 let mut rx = tx.subscribe();
271
272 assert_ok!(tx.send("world"));
273
274 let val = assert_recv!(rx);
275 assert_eq!("world", val);
276 }
277
278 #[test]
279 #[should_panic]
280 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
zero_capacity()281 fn zero_capacity() {
282 broadcast::channel::<()>(0);
283 }
284
285 #[test]
286 #[should_panic]
287 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
capacity_too_big()288 fn capacity_too_big() {
289 broadcast::channel::<()>(1 + (usize::MAX >> 1));
290 }
291
292 #[test]
293 #[cfg(panic = "unwind")]
294 #[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
panic_in_clone()295 fn panic_in_clone() {
296 use std::panic::{self, AssertUnwindSafe};
297
298 #[derive(Eq, PartialEq, Debug)]
299 struct MyVal(usize);
300
301 impl Clone for MyVal {
302 fn clone(&self) -> MyVal {
303 assert_ne!(0, self.0);
304 MyVal(self.0)
305 }
306 }
307
308 let (tx, mut rx) = broadcast::channel(16);
309
310 assert_ok!(tx.send(MyVal(0)));
311 assert_ok!(tx.send(MyVal(1)));
312
313 let res = panic::catch_unwind(AssertUnwindSafe(|| {
314 let _ = rx.try_recv();
315 }));
316
317 assert_err!(res);
318
319 let val = assert_recv!(rx);
320 assert_eq!(val, MyVal(1));
321 }
322
323 #[test]
dropping_tx_notifies_rx()324 fn dropping_tx_notifies_rx() {
325 let (tx, mut rx1) = broadcast::channel::<()>(16);
326 let mut rx2 = tx.subscribe();
327
328 let tx2 = tx.clone();
329
330 let mut recv1 = task::spawn(rx1.recv());
331 let mut recv2 = task::spawn(rx2.recv());
332
333 assert_pending!(recv1.poll());
334 assert_pending!(recv2.poll());
335
336 drop(tx);
337
338 assert_pending!(recv1.poll());
339 assert_pending!(recv2.poll());
340
341 drop(tx2);
342
343 assert!(recv1.is_woken());
344 assert!(recv2.is_woken());
345
346 let err = assert_ready_err!(recv1.poll());
347 assert!(is_closed(err));
348
349 let err = assert_ready_err!(recv2.poll());
350 assert!(is_closed(err));
351 }
352
353 #[test]
unconsumed_messages_are_dropped()354 fn unconsumed_messages_are_dropped() {
355 let (tx, rx) = broadcast::channel(16);
356
357 let msg = Arc::new(());
358
359 assert_ok!(tx.send(msg.clone()));
360
361 assert_eq!(2, Arc::strong_count(&msg));
362
363 drop(rx);
364
365 assert_eq!(1, Arc::strong_count(&msg));
366 }
367
368 #[test]
single_capacity_recvs()369 fn single_capacity_recvs() {
370 let (tx, mut rx) = broadcast::channel(1);
371
372 assert_ok!(tx.send(1));
373
374 assert_eq!(assert_recv!(rx), 1);
375 assert_empty!(rx);
376 }
377
378 #[test]
single_capacity_recvs_after_drop_1()379 fn single_capacity_recvs_after_drop_1() {
380 let (tx, mut rx) = broadcast::channel(1);
381
382 assert_ok!(tx.send(1));
383 drop(tx);
384
385 assert_eq!(assert_recv!(rx), 1);
386 assert_closed!(rx.try_recv());
387 }
388
389 #[test]
single_capacity_recvs_after_drop_2()390 fn single_capacity_recvs_after_drop_2() {
391 let (tx, mut rx) = broadcast::channel(1);
392
393 assert_ok!(tx.send(1));
394 assert_ok!(tx.send(2));
395 drop(tx);
396
397 assert_lagged!(rx.try_recv(), 1);
398 assert_eq!(assert_recv!(rx), 2);
399 assert_closed!(rx.try_recv());
400 }
401
402 #[test]
dropping_sender_does_not_overwrite()403 fn dropping_sender_does_not_overwrite() {
404 let (tx, mut rx) = broadcast::channel(2);
405
406 assert_ok!(tx.send(1));
407 assert_ok!(tx.send(2));
408 drop(tx);
409
410 assert_eq!(assert_recv!(rx), 1);
411 assert_eq!(assert_recv!(rx), 2);
412 assert_closed!(rx.try_recv());
413 }
414
415 #[test]
lagging_receiver_recovers_after_wrap_closed_1()416 fn lagging_receiver_recovers_after_wrap_closed_1() {
417 let (tx, mut rx) = broadcast::channel(2);
418
419 assert_ok!(tx.send(1));
420 assert_ok!(tx.send(2));
421 assert_ok!(tx.send(3));
422 drop(tx);
423
424 assert_lagged!(rx.try_recv(), 1);
425 assert_eq!(assert_recv!(rx), 2);
426 assert_eq!(assert_recv!(rx), 3);
427 assert_closed!(rx.try_recv());
428 }
429
430 #[test]
lagging_receiver_recovers_after_wrap_closed_2()431 fn lagging_receiver_recovers_after_wrap_closed_2() {
432 let (tx, mut rx) = broadcast::channel(2);
433
434 assert_ok!(tx.send(1));
435 assert_ok!(tx.send(2));
436 assert_ok!(tx.send(3));
437 assert_ok!(tx.send(4));
438 drop(tx);
439
440 assert_lagged!(rx.try_recv(), 2);
441 assert_eq!(assert_recv!(rx), 3);
442 assert_eq!(assert_recv!(rx), 4);
443 assert_closed!(rx.try_recv());
444 }
445
446 #[test]
lagging_receiver_recovers_after_wrap_open()447 fn lagging_receiver_recovers_after_wrap_open() {
448 let (tx, mut rx) = broadcast::channel(2);
449
450 assert_ok!(tx.send(1));
451 assert_ok!(tx.send(2));
452 assert_ok!(tx.send(3));
453
454 assert_lagged!(rx.try_recv(), 1);
455 assert_eq!(assert_recv!(rx), 2);
456 assert_eq!(assert_recv!(rx), 3);
457 assert_empty!(rx);
458 }
459
460 #[test]
receiver_len_with_lagged()461 fn receiver_len_with_lagged() {
462 let (tx, mut rx) = broadcast::channel(3);
463
464 tx.send(10).unwrap();
465 tx.send(20).unwrap();
466 tx.send(30).unwrap();
467 tx.send(40).unwrap();
468
469 assert_eq!(rx.len(), 4);
470 assert_eq!(assert_recv!(rx), 10);
471
472 tx.send(50).unwrap();
473 tx.send(60).unwrap();
474
475 assert_eq!(rx.len(), 5);
476 assert_lagged!(rx.try_recv(), 1);
477 }
478
is_closed(err: broadcast::error::RecvError) -> bool479 fn is_closed(err: broadcast::error::RecvError) -> bool {
480 matches!(err, broadcast::error::RecvError::Closed)
481 }
482
483 #[test]
resubscribe_points_to_tail()484 fn resubscribe_points_to_tail() {
485 let (tx, mut rx) = broadcast::channel(3);
486 tx.send(1).unwrap();
487
488 let mut rx_resub = rx.resubscribe();
489
490 // verify we're one behind at the start
491 assert_empty!(rx_resub);
492 assert_eq!(assert_recv!(rx), 1);
493
494 // verify we do not affect rx
495 tx.send(2).unwrap();
496 assert_eq!(assert_recv!(rx_resub), 2);
497 tx.send(3).unwrap();
498 assert_eq!(assert_recv!(rx), 2);
499 assert_eq!(assert_recv!(rx), 3);
500 assert_empty!(rx);
501
502 assert_eq!(assert_recv!(rx_resub), 3);
503 assert_empty!(rx_resub);
504 }
505
506 #[test]
resubscribe_lagged()507 fn resubscribe_lagged() {
508 let (tx, mut rx) = broadcast::channel(1);
509 tx.send(1).unwrap();
510 tx.send(2).unwrap();
511
512 let mut rx_resub = rx.resubscribe();
513 assert_lagged!(rx.try_recv(), 1);
514 assert_empty!(rx_resub);
515
516 assert_eq!(assert_recv!(rx), 2);
517 assert_empty!(rx);
518 assert_empty!(rx_resub);
519 }
520
521 #[test]
resubscribe_to_closed_channel()522 fn resubscribe_to_closed_channel() {
523 let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2);
524 drop(tx);
525
526 let mut rx_resub = rx.resubscribe();
527 assert_closed!(rx_resub.try_recv());
528 }
529
530 #[test]
sender_len()531 fn sender_len() {
532 let (tx, mut rx1) = broadcast::channel(4);
533 let mut rx2 = tx.subscribe();
534
535 assert_eq!(tx.len(), 0);
536 assert!(tx.is_empty());
537
538 tx.send(1).unwrap();
539 tx.send(2).unwrap();
540 tx.send(3).unwrap();
541
542 assert_eq!(tx.len(), 3);
543 assert!(!tx.is_empty());
544
545 assert_recv!(rx1);
546 assert_recv!(rx1);
547
548 assert_eq!(tx.len(), 3);
549 assert!(!tx.is_empty());
550
551 assert_recv!(rx2);
552
553 assert_eq!(tx.len(), 2);
554 assert!(!tx.is_empty());
555
556 tx.send(4).unwrap();
557 tx.send(5).unwrap();
558 tx.send(6).unwrap();
559
560 assert_eq!(tx.len(), 4);
561 assert!(!tx.is_empty());
562 }
563
564 #[test]
565 #[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
sender_len_random()566 fn sender_len_random() {
567 use rand::Rng;
568
569 let (tx, mut rx1) = broadcast::channel(16);
570 let mut rx2 = tx.subscribe();
571
572 for _ in 0..1000 {
573 match rand::thread_rng().gen_range(0..4) {
574 0 => {
575 let _ = rx1.try_recv();
576 }
577 1 => {
578 let _ = rx2.try_recv();
579 }
580 _ => {
581 tx.send(0).unwrap();
582 }
583 }
584
585 let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
586 assert_eq!(tx.len(), expected_len);
587 }
588 }
589
590 #[test]
send_in_waker_drop()591 fn send_in_waker_drop() {
592 use futures::task::ArcWake;
593 use std::future::Future;
594 use std::task::Context;
595
596 struct SendOnDrop(broadcast::Sender<()>);
597
598 impl Drop for SendOnDrop {
599 fn drop(&mut self) {
600 let _ = self.0.send(());
601 }
602 }
603
604 impl ArcWake for SendOnDrop {
605 fn wake_by_ref(_arc_self: &Arc<Self>) {}
606 }
607
608 // Test if there is no deadlock when replacing the old waker.
609
610 let (tx, mut rx) = broadcast::channel(16);
611
612 let mut fut = Box::pin(async {
613 let _ = rx.recv().await;
614 });
615
616 // Store our special waker in the receiving future.
617 let waker = futures::task::waker(Arc::new(SendOnDrop(tx)));
618 let mut cx = Context::from_waker(&waker);
619 assert!(fut.as_mut().poll(&mut cx).is_pending());
620 drop(waker);
621
622 // Second poll shouldn't deadlock.
623 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
624 let _ = fut.as_mut().poll(&mut cx);
625
626 // Test if there is no deadlock when calling waker.wake().
627
628 let (tx, mut rx) = broadcast::channel(16);
629
630 let mut fut = Box::pin(async {
631 let _ = rx.recv().await;
632 });
633
634 // Store our special waker in the receiving future.
635 let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone())));
636 let mut cx = Context::from_waker(&waker);
637 assert!(fut.as_mut().poll(&mut cx).is_pending());
638 drop(waker);
639
640 // Shouldn't deadlock.
641 let _ = tx.send(());
642 }
643
644 #[tokio::test]
receiver_recv_is_cooperative()645 async fn receiver_recv_is_cooperative() {
646 let (tx, mut rx) = broadcast::channel(8);
647
648 tokio::select! {
649 biased;
650 _ = async {
651 loop {
652 assert!(tx.send(()).is_ok());
653 assert!(rx.recv().await.is_ok());
654 }
655 } => {},
656 _ = tokio::task::yield_now() => {},
657 }
658 }
659