1 #![allow(clippy::redundant_clone)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6 use wasm_bindgen_test::wasm_bindgen_test as test;
7
8 use std::sync::atomic::AtomicUsize;
9 use std::sync::atomic::Ordering::{Acquire, Release};
10 use tokio::sync::mpsc::{self, channel, unbounded_channel};
11 use tokio::sync::oneshot;
12
13 #[tokio::test]
weak_sender()14 async fn weak_sender() {
15 let (tx, mut rx) = channel(11);
16
17 let tx_weak = tokio::spawn(async move {
18 let tx_weak = tx.clone().downgrade();
19
20 for i in 0..10 {
21 if tx.send(i).await.is_err() {
22 return None;
23 }
24 }
25
26 let tx2 = tx_weak
27 .upgrade()
28 .expect("expected to be able to upgrade tx_weak");
29 let _ = tx2.send(20).await;
30 let tx_weak = tx2.downgrade();
31
32 Some(tx_weak)
33 })
34 .await
35 .unwrap();
36
37 for i in 0..12 {
38 let recvd = rx.recv().await;
39
40 match recvd {
41 Some(msg) => {
42 if i == 10 {
43 assert_eq!(msg, 20);
44 }
45 }
46 None => {
47 assert_eq!(i, 11);
48 break;
49 }
50 }
51 }
52
53 let tx_weak = tx_weak.unwrap();
54 let upgraded = tx_weak.upgrade();
55 assert!(upgraded.is_none());
56 }
57
58 #[tokio::test]
actor_weak_sender()59 async fn actor_weak_sender() {
60 pub struct MyActor {
61 receiver: mpsc::Receiver<ActorMessage>,
62 sender: mpsc::WeakSender<ActorMessage>,
63 next_id: u32,
64 pub received_self_msg: bool,
65 }
66
67 enum ActorMessage {
68 GetUniqueId { respond_to: oneshot::Sender<u32> },
69 SelfMessage {},
70 }
71
72 impl MyActor {
73 fn new(
74 receiver: mpsc::Receiver<ActorMessage>,
75 sender: mpsc::WeakSender<ActorMessage>,
76 ) -> Self {
77 MyActor {
78 receiver,
79 sender,
80 next_id: 0,
81 received_self_msg: false,
82 }
83 }
84
85 fn handle_message(&mut self, msg: ActorMessage) {
86 match msg {
87 ActorMessage::GetUniqueId { respond_to } => {
88 self.next_id += 1;
89
90 // The `let _ =` ignores any errors when sending.
91 //
92 // This can happen if the `select!` macro is used
93 // to cancel waiting for the response.
94 let _ = respond_to.send(self.next_id);
95 }
96 ActorMessage::SelfMessage { .. } => {
97 self.received_self_msg = true;
98 }
99 }
100 }
101
102 async fn send_message_to_self(&mut self) {
103 let msg = ActorMessage::SelfMessage {};
104
105 let sender = self.sender.clone();
106
107 // cannot move self.sender here
108 if let Some(sender) = sender.upgrade() {
109 let _ = sender.send(msg).await;
110 self.sender = sender.downgrade();
111 }
112 }
113
114 async fn run(&mut self) {
115 let mut i = 0;
116 while let Some(msg) = self.receiver.recv().await {
117 self.handle_message(msg);
118
119 if i == 0 {
120 self.send_message_to_self().await;
121 }
122
123 i += 1
124 }
125
126 assert!(self.received_self_msg);
127 }
128 }
129
130 #[derive(Clone)]
131 pub struct MyActorHandle {
132 sender: mpsc::Sender<ActorMessage>,
133 }
134
135 impl MyActorHandle {
136 pub fn new() -> (Self, MyActor) {
137 let (sender, receiver) = mpsc::channel(8);
138 let actor = MyActor::new(receiver, sender.clone().downgrade());
139
140 (Self { sender }, actor)
141 }
142
143 pub async fn get_unique_id(&self) -> u32 {
144 let (send, recv) = oneshot::channel();
145 let msg = ActorMessage::GetUniqueId { respond_to: send };
146
147 // Ignore send errors. If this send fails, so does the
148 // recv.await below. There's no reason to check the
149 // failure twice.
150 let _ = self.sender.send(msg).await;
151 recv.await.expect("Actor task has been killed")
152 }
153 }
154
155 let (handle, mut actor) = MyActorHandle::new();
156
157 let actor_handle = tokio::spawn(async move { actor.run().await });
158
159 let _ = tokio::spawn(async move {
160 let _ = handle.get_unique_id().await;
161 drop(handle);
162 })
163 .await;
164
165 let _ = actor_handle.await;
166 }
167
// Running tally of `Msg` values that have been dropped.
static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0);

// Payload type whose destruction is recorded in `NUM_DROPPED`.
#[derive(Debug)]
struct Msg;

impl Drop for Msg {
    // Adds one to `NUM_DROPPED` every time a `Msg` is destroyed.
    fn drop(&mut self) {
        let _previous = NUM_DROPPED.fetch_add(1, Release);
    }
}
178
179 // Tests that no pending messages are put onto the channel after `Rx` was
180 // dropped.
181 //
182 // Note: After the introduction of `WeakSender`, which internally
183 // used `Arc` and doesn't call a drop of the channel after the last strong
184 // `Sender` was dropped while more than one `WeakSender` remains, we want to
185 // ensure that no messages are kept in the channel, which were sent after
186 // the receiver was dropped.
187 #[tokio::test]
test_msgs_dropped_on_rx_drop()188 async fn test_msgs_dropped_on_rx_drop() {
189 let (tx, mut rx) = mpsc::channel(3);
190
191 tx.send(Msg {}).await.unwrap();
192 tx.send(Msg {}).await.unwrap();
193
194 // This msg will be pending and should be dropped when `rx` is dropped
195 let sent_fut = tx.send(Msg {});
196
197 let _ = rx.recv().await.unwrap();
198 let _ = rx.recv().await.unwrap();
199
200 sent_fut.await.unwrap();
201
202 drop(rx);
203
204 assert_eq!(NUM_DROPPED.load(Acquire), 3);
205
206 // This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
207 assert!(tx.send(Msg {}).await.is_err());
208
209 assert_eq!(NUM_DROPPED.load(Acquire), 4);
210 }
211
// A `WeakSender` can be upgraded while a strong `Sender` is still alive.
#[test]
fn downgrade_upgrade_sender_success() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);
    let weak = strong.downgrade();
    let upgraded = weak.upgrade();
    assert!(upgraded.is_some());
}
219
// Upgrading a `WeakSender` yields `None` once the only `Sender` is dropped.
#[test]
fn downgrade_upgrade_sender_failure() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);
    let weak = strong.downgrade();
    drop(strong);
    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
228
// A `WeakSender` created from a short-lived clone cannot be upgraded after
// the `Sender` that was alive at `downgrade` time has been dropped as well.
#[test]
fn downgrade_drop_upgrade() {
    let (original, _receiver) = mpsc::channel::<i32>(1);

    // The clone only lives for the duration of this inner scope.
    let weak = {
        let short_lived = original.clone();
        short_lived.downgrade()
    };
    drop(original);
    assert!(weak.upgrade().is_none());
}
240
241 // Tests that we can upgrade a weak sender with an outstanding permit
242 // but no other strong senders.
243 #[tokio::test]
downgrade_get_permit_upgrade_no_senders()244 async fn downgrade_get_permit_upgrade_no_senders() {
245 let (tx, _rx) = mpsc::channel::<i32>(1);
246 let weak_tx = tx.downgrade();
247 let _permit = tx.reserve_owned().await.unwrap();
248 assert!(weak_tx.upgrade().is_some());
249 }
250
251 // Tests that you can downgrade and upgrade a sender with an outstanding permit
252 // but no other senders left.
253 #[tokio::test]
downgrade_upgrade_get_permit_no_senders()254 async fn downgrade_upgrade_get_permit_no_senders() {
255 let (tx, _rx) = mpsc::channel::<i32>(1);
256 let tx2 = tx.clone();
257 let _permit = tx.reserve_owned().await.unwrap();
258 let weak_tx = tx2.downgrade();
259 drop(tx2);
260 assert!(weak_tx.upgrade().is_some());
261 }
262
// `downgrade` must not affect the channel's `tx_count`: after the single
// strong sender is dropped, neither weak sender can be upgraded.
#[test]
fn test_tx_count_weak_sender() {
    let (strong, _receiver) = mpsc::channel::<i32>(1);
    let first_weak = strong.downgrade();
    let second_weak = strong.downgrade();
    drop(strong);

    assert!(first_weak.upgrade().is_none());
    assert!(second_weak.upgrade().is_none());
}
273
274 #[tokio::test]
weak_unbounded_sender()275 async fn weak_unbounded_sender() {
276 let (tx, mut rx) = unbounded_channel();
277
278 let tx_weak = tokio::spawn(async move {
279 let tx_weak = tx.clone().downgrade();
280
281 for i in 0..10 {
282 if tx.send(i).is_err() {
283 return None;
284 }
285 }
286
287 let tx2 = tx_weak
288 .upgrade()
289 .expect("expected to be able to upgrade tx_weak");
290 let _ = tx2.send(20);
291 let tx_weak = tx2.downgrade();
292
293 Some(tx_weak)
294 })
295 .await
296 .unwrap();
297
298 for i in 0..12 {
299 let recvd = rx.recv().await;
300
301 match recvd {
302 Some(msg) => {
303 if i == 10 {
304 assert_eq!(msg, 20);
305 }
306 }
307 None => {
308 assert_eq!(i, 11);
309 break;
310 }
311 }
312 }
313
314 let tx_weak = tx_weak.unwrap();
315 let upgraded = tx_weak.upgrade();
316 assert!(upgraded.is_none());
317 }
318
319 #[tokio::test]
actor_weak_unbounded_sender()320 async fn actor_weak_unbounded_sender() {
321 pub struct MyActor {
322 receiver: mpsc::UnboundedReceiver<ActorMessage>,
323 sender: mpsc::WeakUnboundedSender<ActorMessage>,
324 next_id: u32,
325 pub received_self_msg: bool,
326 }
327
328 enum ActorMessage {
329 GetUniqueId { respond_to: oneshot::Sender<u32> },
330 SelfMessage {},
331 }
332
333 impl MyActor {
334 fn new(
335 receiver: mpsc::UnboundedReceiver<ActorMessage>,
336 sender: mpsc::WeakUnboundedSender<ActorMessage>,
337 ) -> Self {
338 MyActor {
339 receiver,
340 sender,
341 next_id: 0,
342 received_self_msg: false,
343 }
344 }
345
346 fn handle_message(&mut self, msg: ActorMessage) {
347 match msg {
348 ActorMessage::GetUniqueId { respond_to } => {
349 self.next_id += 1;
350
351 // The `let _ =` ignores any errors when sending.
352 //
353 // This can happen if the `select!` macro is used
354 // to cancel waiting for the response.
355 let _ = respond_to.send(self.next_id);
356 }
357 ActorMessage::SelfMessage { .. } => {
358 self.received_self_msg = true;
359 }
360 }
361 }
362
363 async fn send_message_to_self(&mut self) {
364 let msg = ActorMessage::SelfMessage {};
365
366 let sender = self.sender.clone();
367
368 // cannot move self.sender here
369 if let Some(sender) = sender.upgrade() {
370 let _ = sender.send(msg);
371 self.sender = sender.downgrade();
372 }
373 }
374
375 async fn run(&mut self) {
376 let mut i = 0;
377 while let Some(msg) = self.receiver.recv().await {
378 self.handle_message(msg);
379
380 if i == 0 {
381 self.send_message_to_self().await;
382 }
383
384 i += 1
385 }
386
387 assert!(self.received_self_msg);
388 }
389 }
390
391 #[derive(Clone)]
392 pub struct MyActorHandle {
393 sender: mpsc::UnboundedSender<ActorMessage>,
394 }
395
396 impl MyActorHandle {
397 pub fn new() -> (Self, MyActor) {
398 let (sender, receiver) = mpsc::unbounded_channel();
399 let actor = MyActor::new(receiver, sender.clone().downgrade());
400
401 (Self { sender }, actor)
402 }
403
404 pub async fn get_unique_id(&self) -> u32 {
405 let (send, recv) = oneshot::channel();
406 let msg = ActorMessage::GetUniqueId { respond_to: send };
407
408 // Ignore send errors. If this send fails, so does the
409 // recv.await below. There's no reason to check the
410 // failure twice.
411 let _ = self.sender.send(msg);
412 recv.await.expect("Actor task has been killed")
413 }
414 }
415
416 let (handle, mut actor) = MyActorHandle::new();
417
418 let actor_handle = tokio::spawn(async move { actor.run().await });
419
420 let _ = tokio::spawn(async move {
421 let _ = handle.get_unique_id().await;
422 drop(handle);
423 })
424 .await;
425
426 let _ = actor_handle.await;
427 }
428
// Running tally of `MsgUnbounded` values that have been dropped.
static NUM_DROPPED_UNBOUNDED: AtomicUsize = AtomicUsize::new(0);

// Payload type whose destruction is recorded in `NUM_DROPPED_UNBOUNDED`.
#[derive(Debug)]
struct MsgUnbounded;

impl Drop for MsgUnbounded {
    // Adds one to `NUM_DROPPED_UNBOUNDED` every time a value is destroyed.
    fn drop(&mut self) {
        let _previous = NUM_DROPPED_UNBOUNDED.fetch_add(1, Release);
    }
}
439
440 // Tests that no pending messages are put onto the channel after `Rx` was
441 // dropped.
442 //
443 // Note: After the introduction of `UnboundedWeakSender`, which internally
444 // used `Arc` and doesn't call a drop of the channel after the last strong
445 // `UnboundedSender` was dropped while more than one `UnboundedWeakSender`
446 // remains, we want to ensure that no messages are kept in the channel, which
447 // were sent after the receiver was dropped.
448 #[tokio::test]
test_msgs_dropped_on_unbounded_rx_drop()449 async fn test_msgs_dropped_on_unbounded_rx_drop() {
450 let (tx, mut rx) = mpsc::unbounded_channel();
451
452 tx.send(MsgUnbounded {}).unwrap();
453 tx.send(MsgUnbounded {}).unwrap();
454
455 // This msg will be pending and should be dropped when `rx` is dropped
456 let sent = tx.send(MsgUnbounded {});
457
458 let _ = rx.recv().await.unwrap();
459 let _ = rx.recv().await.unwrap();
460
461 sent.unwrap();
462
463 drop(rx);
464
465 assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 3);
466
467 // This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
468 assert!(tx.send(MsgUnbounded {}).is_err());
469
470 assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 4);
471 }
472
// A `WeakUnboundedSender` can be upgraded while a strong `UnboundedSender`
// is still alive.
#[test]
fn downgrade_upgrade_unbounded_sender_success() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();
    let weak = strong.downgrade();
    let upgraded = weak.upgrade();
    assert!(upgraded.is_some());
}
481
// Upgrading a `WeakUnboundedSender` yields `None` once the only
// `UnboundedSender` is dropped.
#[test]
fn downgrade_upgrade_unbounded_sender_failure() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();
    let weak = strong.downgrade();
    drop(strong);
    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
491
// A `WeakUnboundedSender` created from a short-lived clone cannot be upgraded
// after the `UnboundedSender` that was alive at `downgrade` time has been
// dropped as well.
#[test]
fn downgrade_drop_upgrade_unbounded() {
    let (original, _receiver) = mpsc::unbounded_channel::<i32>();

    // The clone only lives for the duration of this inner scope.
    let weak = {
        let short_lived = original.clone();
        short_lived.downgrade()
    };
    drop(original);
    assert!(weak.upgrade().is_none());
}
503
// `downgrade` must not affect the channel's `tx_count`: after the single
// strong sender is dropped, neither weak sender can be upgraded.
#[test]
fn test_tx_count_weak_unbounded_sender() {
    let (strong, _receiver) = mpsc::unbounded_channel::<i32>();
    let first_weak = strong.downgrade();
    let second_weak = strong.downgrade();
    drop(strong);

    assert!(first_weak.upgrade().is_none());
    assert!(second_weak.upgrade().is_none());
}
514
515 #[tokio::test]
test_rx_is_closed_when_dropping_all_senders_except_weak_senders()516 async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() {
517 // is_closed should return true after dropping all senders except for a weak sender
518 let (tx, rx) = mpsc::channel::<()>(10);
519 let _weak_sender = tx.clone().downgrade();
520 drop(tx);
521 assert!(rx.is_closed());
522 }
523
524 #[tokio::test]
test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_senders()525 async fn test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_senders() {
526 // is_closed should return true after dropping all senders except for a weak sender
527 let (tx, rx) = mpsc::unbounded_channel::<()>();
528 let _weak_sender = tx.clone().downgrade();
529 drop(tx);
530 assert!(rx.is_closed());
531 }
532
533 #[tokio::test]
sender_strong_count_when_cloned()534 async fn sender_strong_count_when_cloned() {
535 let (tx, rx) = mpsc::channel::<()>(1);
536
537 let tx2 = tx.clone();
538
539 assert_eq!(tx.strong_count(), 2);
540 assert_eq!(tx2.strong_count(), 2);
541 assert_eq!(rx.sender_strong_count(), 2);
542 }
543
544 #[tokio::test]
sender_weak_count_when_downgraded()545 async fn sender_weak_count_when_downgraded() {
546 let (tx, _rx) = mpsc::channel::<()>(1);
547
548 let weak = tx.downgrade();
549
550 assert_eq!(tx.weak_count(), 1);
551 assert_eq!(weak.weak_count(), 1);
552 }
553
554 #[tokio::test]
sender_strong_count_when_dropped()555 async fn sender_strong_count_when_dropped() {
556 let (tx, rx) = mpsc::channel::<()>(1);
557
558 let tx2 = tx.clone();
559
560 drop(tx2);
561
562 assert_eq!(tx.strong_count(), 1);
563 assert_eq!(rx.sender_strong_count(), 1);
564 }
565
566 #[tokio::test]
sender_weak_count_when_dropped()567 async fn sender_weak_count_when_dropped() {
568 let (tx, rx) = mpsc::channel::<()>(1);
569
570 let weak = tx.downgrade();
571
572 drop(weak);
573
574 assert_eq!(tx.weak_count(), 0);
575 assert_eq!(rx.sender_weak_count(), 0);
576 }
577
578 #[tokio::test]
sender_strong_and_weak_conut()579 async fn sender_strong_and_weak_conut() {
580 let (tx, rx) = mpsc::channel::<()>(1);
581
582 let tx2 = tx.clone();
583
584 let weak = tx.downgrade();
585 let weak2 = tx2.downgrade();
586
587 assert_eq!(tx.strong_count(), 2);
588 assert_eq!(tx2.strong_count(), 2);
589 assert_eq!(weak.strong_count(), 2);
590 assert_eq!(weak2.strong_count(), 2);
591 assert_eq!(rx.sender_strong_count(), 2);
592
593 assert_eq!(tx.weak_count(), 2);
594 assert_eq!(tx2.weak_count(), 2);
595 assert_eq!(weak.weak_count(), 2);
596 assert_eq!(weak2.weak_count(), 2);
597 assert_eq!(rx.sender_weak_count(), 2);
598
599 drop(tx2);
600 drop(weak2);
601
602 assert_eq!(tx.strong_count(), 1);
603 assert_eq!(weak.strong_count(), 1);
604 assert_eq!(rx.sender_strong_count(), 1);
605
606 assert_eq!(tx.weak_count(), 1);
607 assert_eq!(weak.weak_count(), 1);
608 assert_eq!(rx.sender_weak_count(), 1);
609 }
610
611 #[tokio::test]
unbounded_sender_strong_count_when_cloned()612 async fn unbounded_sender_strong_count_when_cloned() {
613 let (tx, rx) = mpsc::unbounded_channel::<()>();
614
615 let tx2 = tx.clone();
616
617 assert_eq!(tx.strong_count(), 2);
618 assert_eq!(tx2.strong_count(), 2);
619 assert_eq!(rx.sender_strong_count(), 2);
620 }
621
622 #[tokio::test]
unbounded_sender_weak_count_when_downgraded()623 async fn unbounded_sender_weak_count_when_downgraded() {
624 let (tx, rx) = mpsc::unbounded_channel::<()>();
625
626 let weak = tx.downgrade();
627
628 assert_eq!(tx.weak_count(), 1);
629 assert_eq!(weak.weak_count(), 1);
630 assert_eq!(rx.sender_weak_count(), 1);
631 }
632
633 #[tokio::test]
unbounded_sender_strong_count_when_dropped()634 async fn unbounded_sender_strong_count_when_dropped() {
635 let (tx, rx) = mpsc::unbounded_channel::<()>();
636
637 let tx2 = tx.clone();
638
639 drop(tx2);
640
641 assert_eq!(tx.strong_count(), 1);
642 assert_eq!(rx.sender_strong_count(), 1);
643 }
644
645 #[tokio::test]
unbounded_sender_weak_count_when_dropped()646 async fn unbounded_sender_weak_count_when_dropped() {
647 let (tx, rx) = mpsc::unbounded_channel::<()>();
648
649 let weak = tx.downgrade();
650
651 drop(weak);
652
653 assert_eq!(tx.weak_count(), 0);
654 assert_eq!(rx.sender_weak_count(), 0);
655 }
656
657 #[tokio::test]
unbounded_sender_strong_and_weak_conut()658 async fn unbounded_sender_strong_and_weak_conut() {
659 let (tx, rx) = mpsc::unbounded_channel::<()>();
660
661 let tx2 = tx.clone();
662
663 let weak = tx.downgrade();
664 let weak2 = tx2.downgrade();
665
666 assert_eq!(tx.strong_count(), 2);
667 assert_eq!(tx2.strong_count(), 2);
668 assert_eq!(weak.strong_count(), 2);
669 assert_eq!(weak2.strong_count(), 2);
670 assert_eq!(rx.sender_strong_count(), 2);
671
672 assert_eq!(tx.weak_count(), 2);
673 assert_eq!(tx2.weak_count(), 2);
674 assert_eq!(weak.weak_count(), 2);
675 assert_eq!(weak2.weak_count(), 2);
676 assert_eq!(rx.sender_weak_count(), 2);
677
678 drop(tx2);
679 drop(weak2);
680
681 assert_eq!(tx.strong_count(), 1);
682 assert_eq!(weak.strong_count(), 1);
683 assert_eq!(rx.sender_strong_count(), 1);
684
685 assert_eq!(tx.weak_count(), 1);
686 assert_eq!(weak.weak_count(), 1);
687 assert_eq!(rx.sender_weak_count(), 1);
688 }
689