1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7 
8 use std::sync::atomic::AtomicUsize;
9 use std::sync::atomic::Ordering::{Acquire, Release};
10 use tokio::sync::mpsc::{self, channel, unbounded_channel};
11 use tokio::sync::oneshot;
12 
13 #[tokio::test]
weak_sender()14 async fn weak_sender() {
15     let (tx, mut rx) = channel(11);
16 
17     let tx_weak = tokio::spawn(async move {
18         let tx_weak = tx.clone().downgrade();
19 
20         for i in 0..10 {
21             if tx.send(i).await.is_err() {
22                 return None;
23             }
24         }
25 
26         let tx2 = tx_weak
27             .upgrade()
28             .expect("expected to be able to upgrade tx_weak");
29         let _ = tx2.send(20).await;
30         let tx_weak = tx2.downgrade();
31 
32         Some(tx_weak)
33     })
34     .await
35     .unwrap();
36 
37     for i in 0..12 {
38         let recvd = rx.recv().await;
39 
40         match recvd {
41             Some(msg) => {
42                 if i == 10 {
43                     assert_eq!(msg, 20);
44                 }
45             }
46             None => {
47                 assert_eq!(i, 11);
48                 break;
49             }
50         }
51     }
52 
53     let tx_weak = tx_weak.unwrap();
54     let upgraded = tx_weak.upgrade();
55     assert!(upgraded.is_none());
56 }
57 
58 #[tokio::test]
actor_weak_sender()59 async fn actor_weak_sender() {
60     pub struct MyActor {
61         receiver: mpsc::Receiver<ActorMessage>,
62         sender: mpsc::WeakSender<ActorMessage>,
63         next_id: u32,
64         pub received_self_msg: bool,
65     }
66 
67     enum ActorMessage {
68         GetUniqueId { respond_to: oneshot::Sender<u32> },
69         SelfMessage {},
70     }
71 
72     impl MyActor {
73         fn new(
74             receiver: mpsc::Receiver<ActorMessage>,
75             sender: mpsc::WeakSender<ActorMessage>,
76         ) -> Self {
77             MyActor {
78                 receiver,
79                 sender,
80                 next_id: 0,
81                 received_self_msg: false,
82             }
83         }
84 
85         fn handle_message(&mut self, msg: ActorMessage) {
86             match msg {
87                 ActorMessage::GetUniqueId { respond_to } => {
88                     self.next_id += 1;
89 
90                     // The `let _ =` ignores any errors when sending.
91                     //
92                     // This can happen if the `select!` macro is used
93                     // to cancel waiting for the response.
94                     let _ = respond_to.send(self.next_id);
95                 }
96                 ActorMessage::SelfMessage { .. } => {
97                     self.received_self_msg = true;
98                 }
99             }
100         }
101 
102         async fn send_message_to_self(&mut self) {
103             let msg = ActorMessage::SelfMessage {};
104 
105             let sender = self.sender.clone();
106 
107             // cannot move self.sender here
108             if let Some(sender) = sender.upgrade() {
109                 let _ = sender.send(msg).await;
110                 self.sender = sender.downgrade();
111             }
112         }
113 
114         async fn run(&mut self) {
115             let mut i = 0;
116             while let Some(msg) = self.receiver.recv().await {
117                 self.handle_message(msg);
118 
119                 if i == 0 {
120                     self.send_message_to_self().await;
121                 }
122 
123                 i += 1
124             }
125 
126             assert!(self.received_self_msg);
127         }
128     }
129 
130     #[derive(Clone)]
131     pub struct MyActorHandle {
132         sender: mpsc::Sender<ActorMessage>,
133     }
134 
135     impl MyActorHandle {
136         pub fn new() -> (Self, MyActor) {
137             let (sender, receiver) = mpsc::channel(8);
138             let actor = MyActor::new(receiver, sender.clone().downgrade());
139 
140             (Self { sender }, actor)
141         }
142 
143         pub async fn get_unique_id(&self) -> u32 {
144             let (send, recv) = oneshot::channel();
145             let msg = ActorMessage::GetUniqueId { respond_to: send };
146 
147             // Ignore send errors. If this send fails, so does the
148             // recv.await below. There's no reason to check the
149             // failure twice.
150             let _ = self.sender.send(msg).await;
151             recv.await.expect("Actor task has been killed")
152         }
153     }
154 
155     let (handle, mut actor) = MyActorHandle::new();
156 
157     let actor_handle = tokio::spawn(async move { actor.run().await });
158 
159     let _ = tokio::spawn(async move {
160         let _ = handle.get_unique_id().await;
161         drop(handle);
162     })
163     .await;
164 
165     let _ = actor_handle.await;
166 }
167 
// Running total of `Msg` values that have been dropped; inspected by the
// bounded-channel drop test.
static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0);

// Payload type whose destructor records every drop in `NUM_DROPPED`.
#[derive(Debug)]
struct Msg;

impl Drop for Msg {
    fn drop(&mut self) {
        // Only the increment matters; the previous value is discarded.
        let _ = NUM_DROPPED.fetch_add(1, Release);
    }
}
178 
179 // Tests that no pending messages are put onto the channel after `Rx` was
180 // dropped.
181 //
182 // Note: After the introduction of `WeakSender`, which internally
183 // used `Arc` and doesn't call a drop of the channel after the last strong
184 // `Sender` was dropped while more than one `WeakSender` remains, we want to
185 // ensure that no messages are kept in the channel, which were sent after
186 // the receiver was dropped.
187 #[tokio::test]
test_msgs_dropped_on_rx_drop()188 async fn test_msgs_dropped_on_rx_drop() {
189     let (tx, mut rx) = mpsc::channel(3);
190 
191     tx.send(Msg {}).await.unwrap();
192     tx.send(Msg {}).await.unwrap();
193 
194     // This msg will be pending and should be dropped when `rx` is dropped
195     let sent_fut = tx.send(Msg {});
196 
197     let _ = rx.recv().await.unwrap();
198     let _ = rx.recv().await.unwrap();
199 
200     sent_fut.await.unwrap();
201 
202     drop(rx);
203 
204     assert_eq!(NUM_DROPPED.load(Acquire), 3);
205 
206     // This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
207     assert!(tx.send(Msg {}).await.is_err());
208 
209     assert_eq!(NUM_DROPPED.load(Acquire), 4);
210 }
211 
// A `WeakSender` must upgrade successfully while the `Sender` it was created
// from is still alive.
#[test]
fn downgrade_upgrade_sender_success() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);
    let weak = strong.downgrade();

    let upgraded = weak.upgrade();
    assert!(upgraded.is_some());
}
219 
// A `WeakSender` must fail to upgrade once the only `Sender` has been dropped.
#[test]
fn downgrade_upgrade_sender_failure() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);
    let weak = strong.downgrade();

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
228 
// A `WeakSender` created from a short-lived clone must not be upgradeable
// after the `Sender` that existed at the time of the `downgrade` call has
// been dropped as well.
#[test]
fn downgrade_drop_upgrade() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);

    // The temporary clone only lives for the duration of this statement.
    let weak = strong.clone().downgrade();

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
240 
241 // Tests that we can upgrade a weak sender with an outstanding permit
242 // but no other strong senders.
243 #[tokio::test]
downgrade_get_permit_upgrade_no_senders()244 async fn downgrade_get_permit_upgrade_no_senders() {
245     let (tx, _rx) = mpsc::channel::<i32>(1);
246     let weak_tx = tx.downgrade();
247     let _permit = tx.reserve_owned().await.unwrap();
248     assert!(weak_tx.upgrade().is_some());
249 }
250 
251 // Tests that you can downgrade and upgrade a sender with an outstanding permit
252 // but no other senders left.
253 #[tokio::test]
downgrade_upgrade_get_permit_no_senders()254 async fn downgrade_upgrade_get_permit_no_senders() {
255     let (tx, _rx) = mpsc::channel::<i32>(1);
256     let tx2 = tx.clone();
257     let _permit = tx.reserve_owned().await.unwrap();
258     let weak_tx = tx2.downgrade();
259     drop(tx2);
260     assert!(weak_tx.upgrade().is_some());
261 }
262 
// `downgrade` must leave the channel's `tx_count` untouched: after the single
// strong sender is dropped, none of the weak handles may upgrade.
#[test]
fn test_tx_count_weak_sender() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);
    let first_weak = strong.downgrade();
    let second_weak = strong.downgrade();

    drop(strong);

    assert!(first_weak.upgrade().is_none());
    assert!(second_weak.upgrade().is_none());
}
273 
274 #[tokio::test]
weak_unbounded_sender()275 async fn weak_unbounded_sender() {
276     let (tx, mut rx) = unbounded_channel();
277 
278     let tx_weak = tokio::spawn(async move {
279         let tx_weak = tx.clone().downgrade();
280 
281         for i in 0..10 {
282             if tx.send(i).is_err() {
283                 return None;
284             }
285         }
286 
287         let tx2 = tx_weak
288             .upgrade()
289             .expect("expected to be able to upgrade tx_weak");
290         let _ = tx2.send(20);
291         let tx_weak = tx2.downgrade();
292 
293         Some(tx_weak)
294     })
295     .await
296     .unwrap();
297 
298     for i in 0..12 {
299         let recvd = rx.recv().await;
300 
301         match recvd {
302             Some(msg) => {
303                 if i == 10 {
304                     assert_eq!(msg, 20);
305                 }
306             }
307             None => {
308                 assert_eq!(i, 11);
309                 break;
310             }
311         }
312     }
313 
314     let tx_weak = tx_weak.unwrap();
315     let upgraded = tx_weak.upgrade();
316     assert!(upgraded.is_none());
317 }
318 
319 #[tokio::test]
actor_weak_unbounded_sender()320 async fn actor_weak_unbounded_sender() {
321     pub struct MyActor {
322         receiver: mpsc::UnboundedReceiver<ActorMessage>,
323         sender: mpsc::WeakUnboundedSender<ActorMessage>,
324         next_id: u32,
325         pub received_self_msg: bool,
326     }
327 
328     enum ActorMessage {
329         GetUniqueId { respond_to: oneshot::Sender<u32> },
330         SelfMessage {},
331     }
332 
333     impl MyActor {
334         fn new(
335             receiver: mpsc::UnboundedReceiver<ActorMessage>,
336             sender: mpsc::WeakUnboundedSender<ActorMessage>,
337         ) -> Self {
338             MyActor {
339                 receiver,
340                 sender,
341                 next_id: 0,
342                 received_self_msg: false,
343             }
344         }
345 
346         fn handle_message(&mut self, msg: ActorMessage) {
347             match msg {
348                 ActorMessage::GetUniqueId { respond_to } => {
349                     self.next_id += 1;
350 
351                     // The `let _ =` ignores any errors when sending.
352                     //
353                     // This can happen if the `select!` macro is used
354                     // to cancel waiting for the response.
355                     let _ = respond_to.send(self.next_id);
356                 }
357                 ActorMessage::SelfMessage { .. } => {
358                     self.received_self_msg = true;
359                 }
360             }
361         }
362 
363         async fn send_message_to_self(&mut self) {
364             let msg = ActorMessage::SelfMessage {};
365 
366             let sender = self.sender.clone();
367 
368             // cannot move self.sender here
369             if let Some(sender) = sender.upgrade() {
370                 let _ = sender.send(msg);
371                 self.sender = sender.downgrade();
372             }
373         }
374 
375         async fn run(&mut self) {
376             let mut i = 0;
377             while let Some(msg) = self.receiver.recv().await {
378                 self.handle_message(msg);
379 
380                 if i == 0 {
381                     self.send_message_to_self().await;
382                 }
383 
384                 i += 1
385             }
386 
387             assert!(self.received_self_msg);
388         }
389     }
390 
391     #[derive(Clone)]
392     pub struct MyActorHandle {
393         sender: mpsc::UnboundedSender<ActorMessage>,
394     }
395 
396     impl MyActorHandle {
397         pub fn new() -> (Self, MyActor) {
398             let (sender, receiver) = mpsc::unbounded_channel();
399             let actor = MyActor::new(receiver, sender.clone().downgrade());
400 
401             (Self { sender }, actor)
402         }
403 
404         pub async fn get_unique_id(&self) -> u32 {
405             let (send, recv) = oneshot::channel();
406             let msg = ActorMessage::GetUniqueId { respond_to: send };
407 
408             // Ignore send errors. If this send fails, so does the
409             // recv.await below. There's no reason to check the
410             // failure twice.
411             let _ = self.sender.send(msg);
412             recv.await.expect("Actor task has been killed")
413         }
414     }
415 
416     let (handle, mut actor) = MyActorHandle::new();
417 
418     let actor_handle = tokio::spawn(async move { actor.run().await });
419 
420     let _ = tokio::spawn(async move {
421         let _ = handle.get_unique_id().await;
422         drop(handle);
423     })
424     .await;
425 
426     let _ = actor_handle.await;
427 }
428 
// Running total of `MsgUnbounded` values that have been dropped; inspected by
// the unbounded-channel drop test.
static NUM_DROPPED_UNBOUNDED: AtomicUsize = AtomicUsize::new(0);

// Payload type whose destructor records every drop in
// `NUM_DROPPED_UNBOUNDED`.
#[derive(Debug)]
struct MsgUnbounded;

impl Drop for MsgUnbounded {
    fn drop(&mut self) {
        // Only the increment matters; the previous value is discarded.
        let _ = NUM_DROPPED_UNBOUNDED.fetch_add(1, Release);
    }
}
439 
440 // Tests that no pending messages are put onto the channel after `Rx` was
441 // dropped.
442 //
443 // Note: After the introduction of `UnboundedWeakSender`, which internally
444 // used `Arc` and doesn't call a drop of the channel after the last strong
445 // `UnboundedSender` was dropped while more than one `UnboundedWeakSender`
446 // remains, we want to ensure that no messages are kept in the channel, which
447 // were sent after the receiver was dropped.
448 #[tokio::test]
test_msgs_dropped_on_unbounded_rx_drop()449 async fn test_msgs_dropped_on_unbounded_rx_drop() {
450     let (tx, mut rx) = mpsc::unbounded_channel();
451 
452     tx.send(MsgUnbounded {}).unwrap();
453     tx.send(MsgUnbounded {}).unwrap();
454 
455     // This msg will be pending and should be dropped when `rx` is dropped
456     let sent = tx.send(MsgUnbounded {});
457 
458     let _ = rx.recv().await.unwrap();
459     let _ = rx.recv().await.unwrap();
460 
461     sent.unwrap();
462 
463     drop(rx);
464 
465     assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 3);
466 
467     // This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
468     assert!(tx.send(MsgUnbounded {}).is_err());
469 
470     assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 4);
471 }
472 
// A `WeakUnboundedSender` must upgrade successfully while the
// `UnboundedSender` it was created from is still alive.
#[test]
fn downgrade_upgrade_unbounded_sender_success() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();
    let weak = strong.downgrade();

    let upgraded = weak.upgrade();
    assert!(upgraded.is_some());
}
481 
// A `WeakUnboundedSender` must fail to upgrade once the only
// `UnboundedSender` has been dropped.
#[test]
fn downgrade_upgrade_unbounded_sender_failure() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();
    let weak = strong.downgrade();

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
491 
// A `WeakUnboundedSender` created from a short-lived clone must not be
// upgradeable after the `UnboundedSender` that existed at the time of the
// `downgrade` call has been dropped as well.
#[test]
fn downgrade_drop_upgrade_unbounded() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();

    // The temporary clone only lives for the duration of this statement.
    let weak = strong.clone().downgrade();

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
503 
// `downgrade` must leave the channel's `tx_count` untouched: after the single
// strong sender is dropped, none of the weak handles may upgrade.
#[test]
fn test_tx_count_weak_unbounded_sender() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();
    let first_weak = strong.downgrade();
    let second_weak = strong.downgrade();

    drop(strong);

    assert!(first_weak.upgrade().is_none());
    assert!(second_weak.upgrade().is_none());
}
514 
515 #[tokio::test]
test_rx_is_closed_when_dropping_all_senders_except_weak_senders()516 async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() {
517     // is_closed should return true after dropping all senders except for a weak sender
518     let (tx, rx) = mpsc::channel::<()>(10);
519     let _weak_sender = tx.clone().downgrade();
520     drop(tx);
521     assert!(rx.is_closed());
522 }
523 
524 #[tokio::test]
test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_senders()525 async fn test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_senders() {
526     // is_closed should return true after dropping all senders except for a weak sender
527     let (tx, rx) = mpsc::unbounded_channel::<()>();
528     let _weak_sender = tx.clone().downgrade();
529     drop(tx);
530     assert!(rx.is_closed());
531 }
532 
533 #[tokio::test]
sender_strong_count_when_cloned()534 async fn sender_strong_count_when_cloned() {
535     let (tx, rx) = mpsc::channel::<()>(1);
536 
537     let tx2 = tx.clone();
538 
539     assert_eq!(tx.strong_count(), 2);
540     assert_eq!(tx2.strong_count(), 2);
541     assert_eq!(rx.sender_strong_count(), 2);
542 }
543 
544 #[tokio::test]
sender_weak_count_when_downgraded()545 async fn sender_weak_count_when_downgraded() {
546     let (tx, _rx) = mpsc::channel::<()>(1);
547 
548     let weak = tx.downgrade();
549 
550     assert_eq!(tx.weak_count(), 1);
551     assert_eq!(weak.weak_count(), 1);
552 }
553 
554 #[tokio::test]
sender_strong_count_when_dropped()555 async fn sender_strong_count_when_dropped() {
556     let (tx, rx) = mpsc::channel::<()>(1);
557 
558     let tx2 = tx.clone();
559 
560     drop(tx2);
561 
562     assert_eq!(tx.strong_count(), 1);
563     assert_eq!(rx.sender_strong_count(), 1);
564 }
565 
566 #[tokio::test]
sender_weak_count_when_dropped()567 async fn sender_weak_count_when_dropped() {
568     let (tx, rx) = mpsc::channel::<()>(1);
569 
570     let weak = tx.downgrade();
571 
572     drop(weak);
573 
574     assert_eq!(tx.weak_count(), 0);
575     assert_eq!(rx.sender_weak_count(), 0);
576 }
577 
578 #[tokio::test]
sender_strong_and_weak_conut()579 async fn sender_strong_and_weak_conut() {
580     let (tx, rx) = mpsc::channel::<()>(1);
581 
582     let tx2 = tx.clone();
583 
584     let weak = tx.downgrade();
585     let weak2 = tx2.downgrade();
586 
587     assert_eq!(tx.strong_count(), 2);
588     assert_eq!(tx2.strong_count(), 2);
589     assert_eq!(weak.strong_count(), 2);
590     assert_eq!(weak2.strong_count(), 2);
591     assert_eq!(rx.sender_strong_count(), 2);
592 
593     assert_eq!(tx.weak_count(), 2);
594     assert_eq!(tx2.weak_count(), 2);
595     assert_eq!(weak.weak_count(), 2);
596     assert_eq!(weak2.weak_count(), 2);
597     assert_eq!(rx.sender_weak_count(), 2);
598 
599     drop(tx2);
600     drop(weak2);
601 
602     assert_eq!(tx.strong_count(), 1);
603     assert_eq!(weak.strong_count(), 1);
604     assert_eq!(rx.sender_strong_count(), 1);
605 
606     assert_eq!(tx.weak_count(), 1);
607     assert_eq!(weak.weak_count(), 1);
608     assert_eq!(rx.sender_weak_count(), 1);
609 }
610 
611 #[tokio::test]
unbounded_sender_strong_count_when_cloned()612 async fn unbounded_sender_strong_count_when_cloned() {
613     let (tx, rx) = mpsc::unbounded_channel::<()>();
614 
615     let tx2 = tx.clone();
616 
617     assert_eq!(tx.strong_count(), 2);
618     assert_eq!(tx2.strong_count(), 2);
619     assert_eq!(rx.sender_strong_count(), 2);
620 }
621 
622 #[tokio::test]
unbounded_sender_weak_count_when_downgraded()623 async fn unbounded_sender_weak_count_when_downgraded() {
624     let (tx, rx) = mpsc::unbounded_channel::<()>();
625 
626     let weak = tx.downgrade();
627 
628     assert_eq!(tx.weak_count(), 1);
629     assert_eq!(weak.weak_count(), 1);
630     assert_eq!(rx.sender_weak_count(), 1);
631 }
632 
633 #[tokio::test]
unbounded_sender_strong_count_when_dropped()634 async fn unbounded_sender_strong_count_when_dropped() {
635     let (tx, rx) = mpsc::unbounded_channel::<()>();
636 
637     let tx2 = tx.clone();
638 
639     drop(tx2);
640 
641     assert_eq!(tx.strong_count(), 1);
642     assert_eq!(rx.sender_strong_count(), 1);
643 }
644 
645 #[tokio::test]
unbounded_sender_weak_count_when_dropped()646 async fn unbounded_sender_weak_count_when_dropped() {
647     let (tx, rx) = mpsc::unbounded_channel::<()>();
648 
649     let weak = tx.downgrade();
650 
651     drop(weak);
652 
653     assert_eq!(tx.weak_count(), 0);
654     assert_eq!(rx.sender_weak_count(), 0);
655 }
656 
657 #[tokio::test]
unbounded_sender_strong_and_weak_conut()658 async fn unbounded_sender_strong_and_weak_conut() {
659     let (tx, rx) = mpsc::unbounded_channel::<()>();
660 
661     let tx2 = tx.clone();
662 
663     let weak = tx.downgrade();
664     let weak2 = tx2.downgrade();
665 
666     assert_eq!(tx.strong_count(), 2);
667     assert_eq!(tx2.strong_count(), 2);
668     assert_eq!(weak.strong_count(), 2);
669     assert_eq!(weak2.strong_count(), 2);
670     assert_eq!(rx.sender_strong_count(), 2);
671 
672     assert_eq!(tx.weak_count(), 2);
673     assert_eq!(tx2.weak_count(), 2);
674     assert_eq!(weak.weak_count(), 2);
675     assert_eq!(weak2.weak_count(), 2);
676     assert_eq!(rx.sender_weak_count(), 2);
677 
678     drop(tx2);
679     drop(weak2);
680 
681     assert_eq!(tx.strong_count(), 1);
682     assert_eq!(weak.strong_count(), 1);
683     assert_eq!(rx.sender_strong_count(), 1);
684 
685     assert_eq!(tx.weak_count(), 1);
686     assert_eq!(weak.weak_count(), 1);
687     assert_eq!(rx.sender_weak_count(), 1);
688 }
689