1 #![allow(clippy::disallowed_names)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use futures::StreamExt;
6 use tokio::time::{self, sleep, sleep_until, Duration, Instant};
7 use tokio_test::{assert_pending, assert_ready, task};
8 use tokio_util::time::DelayQueue;
9
10 macro_rules! poll {
11 ($queue:ident) => {
12 $queue.enter(|cx, mut queue| queue.poll_expired(cx))
13 };
14 }
15
16 macro_rules! assert_ready_some {
17 ($e:expr) => {{
18 match assert_ready!($e) {
19 Some(v) => v,
20 None => panic!("None"),
21 }
22 }};
23 }
24
25 #[tokio::test]
single_immediate_delay()26 async fn single_immediate_delay() {
27 time::pause();
28
29 let mut queue = task::spawn(DelayQueue::new());
30 let _key = queue.insert_at("foo", Instant::now());
31
32 // Advance time by 1ms to handle thee rounding
33 sleep(ms(1)).await;
34
35 assert_ready_some!(poll!(queue));
36
37 let entry = assert_ready!(poll!(queue));
38 assert!(entry.is_none())
39 }
40
41 #[tokio::test]
multi_immediate_delays()42 async fn multi_immediate_delays() {
43 time::pause();
44
45 let mut queue = task::spawn(DelayQueue::new());
46
47 let _k = queue.insert_at("1", Instant::now());
48 let _k = queue.insert_at("2", Instant::now());
49 let _k = queue.insert_at("3", Instant::now());
50
51 sleep(ms(1)).await;
52
53 let mut res = vec![];
54
55 while res.len() < 3 {
56 let entry = assert_ready_some!(poll!(queue));
57 res.push(entry.into_inner());
58 }
59
60 let entry = assert_ready!(poll!(queue));
61 assert!(entry.is_none());
62
63 res.sort_unstable();
64
65 assert_eq!("1", res[0]);
66 assert_eq!("2", res[1]);
67 assert_eq!("3", res[2]);
68 }
69
70 #[tokio::test]
single_short_delay()71 async fn single_short_delay() {
72 time::pause();
73
74 let mut queue = task::spawn(DelayQueue::new());
75 let _key = queue.insert_at("foo", Instant::now() + ms(5));
76
77 assert_pending!(poll!(queue));
78
79 sleep(ms(1)).await;
80
81 assert!(!queue.is_woken());
82
83 sleep(ms(5)).await;
84
85 assert!(queue.is_woken());
86
87 let entry = assert_ready_some!(poll!(queue));
88 assert_eq!(*entry.get_ref(), "foo");
89
90 let entry = assert_ready!(poll!(queue));
91 assert!(entry.is_none());
92 }
93
94 #[tokio::test]
95 #[cfg_attr(miri, ignore)] // Too slow on miri.
multi_delay_at_start()96 async fn multi_delay_at_start() {
97 time::pause();
98
99 let long = 262_144 + 9 * 4096;
100 let delays = &[1000, 2, 234, long, 60, 10];
101
102 let mut queue = task::spawn(DelayQueue::new());
103
104 // Setup the delays
105 for &i in delays {
106 let _key = queue.insert_at(i, Instant::now() + ms(i));
107 }
108
109 assert_pending!(poll!(queue));
110 assert!(!queue.is_woken());
111
112 let start = Instant::now();
113 for elapsed in 0..1200 {
114 println!("elapsed: {elapsed:?}");
115 let elapsed = elapsed + 1;
116 tokio::time::sleep_until(start + ms(elapsed)).await;
117
118 if delays.contains(&elapsed) {
119 assert!(queue.is_woken());
120 assert_ready!(poll!(queue));
121 assert_pending!(poll!(queue));
122 } else if queue.is_woken() {
123 let cascade = &[192, 960];
124 assert!(
125 cascade.contains(&elapsed),
126 "elapsed={} dt={:?}",
127 elapsed,
128 Instant::now() - start
129 );
130
131 assert_pending!(poll!(queue));
132 }
133 }
134 println!("finished multi_delay_start");
135 }
136
137 #[tokio::test]
insert_in_past_fires_immediately()138 async fn insert_in_past_fires_immediately() {
139 println!("running insert_in_past_fires_immediately");
140 time::pause();
141
142 let mut queue = task::spawn(DelayQueue::new());
143 let now = Instant::now();
144
145 sleep(ms(10)).await;
146
147 queue.insert_at("foo", now);
148
149 assert_ready!(poll!(queue));
150 println!("finished insert_in_past_fires_immediately");
151 }
152
153 #[tokio::test]
remove_entry()154 async fn remove_entry() {
155 time::pause();
156
157 let mut queue = task::spawn(DelayQueue::new());
158
159 let key = queue.insert_at("foo", Instant::now() + ms(5));
160
161 assert_pending!(poll!(queue));
162
163 let entry = queue.remove(&key);
164 assert_eq!(entry.into_inner(), "foo");
165
166 sleep(ms(10)).await;
167
168 let entry = assert_ready!(poll!(queue));
169 assert!(entry.is_none());
170 }
171
172 #[tokio::test]
reset_entry()173 async fn reset_entry() {
174 time::pause();
175
176 let mut queue = task::spawn(DelayQueue::new());
177
178 let now = Instant::now();
179 let key = queue.insert_at("foo", now + ms(5));
180
181 assert_pending!(poll!(queue));
182 sleep(ms(1)).await;
183
184 queue.reset_at(&key, now + ms(10));
185
186 assert_pending!(poll!(queue));
187
188 sleep(ms(7)).await;
189
190 assert!(!queue.is_woken());
191
192 assert_pending!(poll!(queue));
193
194 sleep(ms(3)).await;
195
196 assert!(queue.is_woken());
197
198 let entry = assert_ready_some!(poll!(queue));
199 assert_eq!(*entry.get_ref(), "foo");
200
201 let entry = assert_ready!(poll!(queue));
202 assert!(entry.is_none())
203 }
204
205 // Reproduces tokio-rs/tokio#849.
206 #[tokio::test]
reset_much_later()207 async fn reset_much_later() {
208 time::pause();
209
210 let mut queue = task::spawn(DelayQueue::new());
211
212 let now = Instant::now();
213 sleep(ms(1)).await;
214
215 let key = queue.insert_at("foo", now + ms(200));
216 assert_pending!(poll!(queue));
217
218 sleep(ms(3)).await;
219
220 queue.reset_at(&key, now + ms(10));
221
222 sleep(ms(20)).await;
223
224 assert!(queue.is_woken());
225 }
226
227 // Reproduces tokio-rs/tokio#849.
228 #[tokio::test]
reset_twice()229 async fn reset_twice() {
230 time::pause();
231
232 let mut queue = task::spawn(DelayQueue::new());
233 let now = Instant::now();
234
235 sleep(ms(1)).await;
236
237 let key = queue.insert_at("foo", now + ms(200));
238
239 assert_pending!(poll!(queue));
240
241 sleep(ms(3)).await;
242
243 queue.reset_at(&key, now + ms(50));
244
245 sleep(ms(20)).await;
246
247 queue.reset_at(&key, now + ms(40));
248
249 sleep(ms(20)).await;
250
251 assert!(queue.is_woken());
252 }
253
254 /// Regression test: Given an entry inserted with a deadline in the past, so
255 /// that it is placed directly on the expired queue, reset the entry to a
256 /// deadline in the future. Validate that this leaves the entry and queue in an
257 /// internally consistent state by running an additional reset on the entry
258 /// before polling it to completion.
259 #[tokio::test]
repeatedly_reset_entry_inserted_as_expired()260 async fn repeatedly_reset_entry_inserted_as_expired() {
261 time::pause();
262
263 // Instants before the start of the test seem to break in wasm.
264 time::sleep(ms(1000)).await;
265
266 let mut queue = task::spawn(DelayQueue::new());
267 let now = Instant::now();
268
269 let key = queue.insert_at("foo", now - ms(100));
270
271 queue.reset_at(&key, now + ms(100));
272 queue.reset_at(&key, now + ms(50));
273
274 assert_pending!(poll!(queue));
275
276 time::sleep_until(now + ms(60)).await;
277
278 assert!(queue.is_woken());
279
280 let entry = assert_ready_some!(poll!(queue)).into_inner();
281 assert_eq!(entry, "foo");
282
283 let entry = assert_ready!(poll!(queue));
284 assert!(entry.is_none());
285 }
286
287 #[tokio::test]
remove_expired_item()288 async fn remove_expired_item() {
289 time::pause();
290
291 let mut queue = DelayQueue::new();
292
293 let now = Instant::now();
294
295 sleep(ms(10)).await;
296
297 let key = queue.insert_at("foo", now);
298
299 let entry = queue.remove(&key);
300 assert_eq!(entry.into_inner(), "foo");
301 }
302
303 /// Regression test: it should be possible to remove entries which fall in the
304 /// 0th slot of the internal timer wheel — that is, entries whose expiration
305 /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
306 /// is equal to the wheel's current elapsed time.
307 #[tokio::test]
remove_at_timer_wheel_threshold()308 async fn remove_at_timer_wheel_threshold() {
309 time::pause();
310
311 let mut queue = task::spawn(DelayQueue::new());
312
313 let now = Instant::now();
314
315 let key1 = queue.insert_at("foo", now + ms(64));
316 let key2 = queue.insert_at("bar", now + ms(64));
317
318 sleep(ms(80)).await;
319
320 let entry = assert_ready_some!(poll!(queue)).into_inner();
321
322 match entry {
323 "foo" => {
324 let entry = queue.remove(&key2).into_inner();
325 assert_eq!(entry, "bar");
326 }
327 "bar" => {
328 let entry = queue.remove(&key1).into_inner();
329 assert_eq!(entry, "foo");
330 }
331 other => panic!("other: {other:?}"),
332 }
333 }
334
335 #[tokio::test]
expires_before_last_insert()336 async fn expires_before_last_insert() {
337 time::pause();
338
339 let mut queue = task::spawn(DelayQueue::new());
340
341 let now = Instant::now();
342
343 queue.insert_at("foo", now + ms(10_000));
344
345 // Delay should be set to 8.192s here.
346 assert_pending!(poll!(queue));
347
348 // Delay should be set to the delay of the new item here
349 queue.insert_at("bar", now + ms(600));
350
351 assert_pending!(poll!(queue));
352
353 sleep(ms(600)).await;
354
355 assert!(queue.is_woken());
356
357 let entry = assert_ready_some!(poll!(queue)).into_inner();
358 assert_eq!(entry, "bar");
359 }
360
361 #[tokio::test]
multi_reset()362 async fn multi_reset() {
363 time::pause();
364
365 let mut queue = task::spawn(DelayQueue::new());
366
367 let now = Instant::now();
368
369 let one = queue.insert_at("one", now + ms(200));
370 let two = queue.insert_at("two", now + ms(250));
371
372 assert_pending!(poll!(queue));
373
374 queue.reset_at(&one, now + ms(300));
375 queue.reset_at(&two, now + ms(350));
376 queue.reset_at(&one, now + ms(400));
377
378 sleep(ms(310)).await;
379
380 assert_pending!(poll!(queue));
381
382 sleep(ms(50)).await;
383
384 let entry = assert_ready_some!(poll!(queue));
385 assert_eq!(*entry.get_ref(), "two");
386
387 assert_pending!(poll!(queue));
388
389 sleep(ms(50)).await;
390
391 let entry = assert_ready_some!(poll!(queue));
392 assert_eq!(*entry.get_ref(), "one");
393
394 let entry = assert_ready!(poll!(queue));
395 assert!(entry.is_none())
396 }
397
398 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()399 async fn expire_first_key_when_reset_to_expire_earlier() {
400 time::pause();
401
402 let mut queue = task::spawn(DelayQueue::new());
403
404 let now = Instant::now();
405
406 let one = queue.insert_at("one", now + ms(200));
407 queue.insert_at("two", now + ms(250));
408
409 assert_pending!(poll!(queue));
410
411 queue.reset_at(&one, now + ms(100));
412
413 sleep(ms(100)).await;
414
415 assert!(queue.is_woken());
416
417 let entry = assert_ready_some!(poll!(queue)).into_inner();
418 assert_eq!(entry, "one");
419 }
420
421 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()422 async fn expire_second_key_when_reset_to_expire_earlier() {
423 time::pause();
424
425 let mut queue = task::spawn(DelayQueue::new());
426
427 let now = Instant::now();
428
429 queue.insert_at("one", now + ms(200));
430 let two = queue.insert_at("two", now + ms(250));
431
432 assert_pending!(poll!(queue));
433
434 queue.reset_at(&two, now + ms(100));
435
436 sleep(ms(100)).await;
437
438 assert!(queue.is_woken());
439
440 let entry = assert_ready_some!(poll!(queue)).into_inner();
441 assert_eq!(entry, "two");
442 }
443
444 #[tokio::test]
reset_first_expiring_item_to_expire_later()445 async fn reset_first_expiring_item_to_expire_later() {
446 time::pause();
447
448 let mut queue = task::spawn(DelayQueue::new());
449
450 let now = Instant::now();
451
452 let one = queue.insert_at("one", now + ms(200));
453 let _two = queue.insert_at("two", now + ms(250));
454
455 assert_pending!(poll!(queue));
456
457 queue.reset_at(&one, now + ms(300));
458 sleep(ms(250)).await;
459
460 assert!(queue.is_woken());
461
462 let entry = assert_ready_some!(poll!(queue)).into_inner();
463 assert_eq!(entry, "two");
464 }
465
466 #[tokio::test]
insert_before_first_after_poll()467 async fn insert_before_first_after_poll() {
468 time::pause();
469
470 let mut queue = task::spawn(DelayQueue::new());
471
472 let now = Instant::now();
473
474 let _one = queue.insert_at("one", now + ms(200));
475
476 assert_pending!(poll!(queue));
477
478 let _two = queue.insert_at("two", now + ms(100));
479
480 sleep(ms(99)).await;
481
482 assert_pending!(poll!(queue));
483
484 sleep(ms(1)).await;
485
486 assert!(queue.is_woken());
487
488 let entry = assert_ready_some!(poll!(queue)).into_inner();
489 assert_eq!(entry, "two");
490 }
491
492 #[tokio::test]
insert_after_ready_poll()493 async fn insert_after_ready_poll() {
494 time::pause();
495
496 let mut queue = task::spawn(DelayQueue::new());
497
498 let now = Instant::now();
499
500 queue.insert_at("1", now + ms(100));
501 queue.insert_at("2", now + ms(100));
502 queue.insert_at("3", now + ms(100));
503
504 assert_pending!(poll!(queue));
505
506 sleep(ms(100)).await;
507
508 assert!(queue.is_woken());
509
510 let mut res = vec![];
511
512 while res.len() < 3 {
513 let entry = assert_ready_some!(poll!(queue));
514 res.push(entry.into_inner());
515 queue.insert_at("foo", now + ms(500));
516 }
517
518 res.sort_unstable();
519
520 assert_eq!("1", res[0]);
521 assert_eq!("2", res[1]);
522 assert_eq!("3", res[2]);
523 }
524
525 #[tokio::test]
reset_later_after_slot_starts()526 async fn reset_later_after_slot_starts() {
527 time::pause();
528
529 let mut queue = task::spawn(DelayQueue::new());
530
531 let now = Instant::now();
532
533 let foo = queue.insert_at("foo", now + ms(100));
534
535 assert_pending!(poll!(queue));
536
537 sleep_until(now + Duration::from_millis(80)).await;
538
539 assert!(!queue.is_woken());
540
541 // At this point the queue hasn't been polled, so `elapsed` on the wheel
542 // for the queue is still at 0 and hence the 1ms resolution slots cover
543 // [0-64). Resetting the time on the entry to 120 causes it to get put in
544 // the [64-128) slot. As the queue knows that the first entry is within
545 // that slot, but doesn't know when, it must wake immediately to advance
546 // the wheel.
547 queue.reset_at(&foo, now + ms(120));
548 assert!(queue.is_woken());
549
550 assert_pending!(poll!(queue));
551
552 sleep_until(now + Duration::from_millis(119)).await;
553 assert!(!queue.is_woken());
554
555 sleep(ms(1)).await;
556 assert!(queue.is_woken());
557
558 let entry = assert_ready_some!(poll!(queue)).into_inner();
559 assert_eq!(entry, "foo");
560 }
561
562 #[tokio::test]
reset_inserted_expired()563 async fn reset_inserted_expired() {
564 time::pause();
565
566 // Instants before the start of the test seem to break in wasm.
567 time::sleep(ms(1000)).await;
568
569 let mut queue = task::spawn(DelayQueue::new());
570 let now = Instant::now();
571
572 let key = queue.insert_at("foo", now - ms(100));
573
574 // this causes the panic described in #2473
575 queue.reset_at(&key, now + ms(100));
576
577 assert_eq!(1, queue.len());
578
579 sleep(ms(200)).await;
580
581 let entry = assert_ready_some!(poll!(queue)).into_inner();
582 assert_eq!(entry, "foo");
583
584 assert_eq!(queue.len(), 0);
585 }
586
587 #[tokio::test]
reset_earlier_after_slot_starts()588 async fn reset_earlier_after_slot_starts() {
589 time::pause();
590
591 let mut queue = task::spawn(DelayQueue::new());
592
593 let now = Instant::now();
594
595 let foo = queue.insert_at("foo", now + ms(200));
596
597 assert_pending!(poll!(queue));
598
599 sleep_until(now + Duration::from_millis(80)).await;
600
601 assert!(!queue.is_woken());
602
603 // At this point the queue hasn't been polled, so `elapsed` on the wheel
604 // for the queue is still at 0 and hence the 1ms resolution slots cover
605 // [0-64). Resetting the time on the entry to 120 causes it to get put in
606 // the [64-128) slot. As the queue knows that the first entry is within
607 // that slot, but doesn't know when, it must wake immediately to advance
608 // the wheel.
609 queue.reset_at(&foo, now + ms(120));
610 assert!(queue.is_woken());
611
612 assert_pending!(poll!(queue));
613
614 sleep_until(now + Duration::from_millis(119)).await;
615 assert!(!queue.is_woken());
616
617 sleep(ms(1)).await;
618 assert!(queue.is_woken());
619
620 let entry = assert_ready_some!(poll!(queue)).into_inner();
621 assert_eq!(entry, "foo");
622 }
623
624 #[tokio::test]
insert_in_past_after_poll_fires_immediately()625 async fn insert_in_past_after_poll_fires_immediately() {
626 time::pause();
627
628 let mut queue = task::spawn(DelayQueue::new());
629
630 let now = Instant::now();
631
632 queue.insert_at("foo", now + ms(200));
633
634 assert_pending!(poll!(queue));
635
636 sleep(ms(80)).await;
637
638 assert!(!queue.is_woken());
639 queue.insert_at("bar", now + ms(40));
640
641 assert!(queue.is_woken());
642
643 let entry = assert_ready_some!(poll!(queue)).into_inner();
644 assert_eq!(entry, "bar");
645 }
646
647 #[tokio::test]
delay_queue_poll_expired_when_empty()648 async fn delay_queue_poll_expired_when_empty() {
649 let mut delay_queue = task::spawn(DelayQueue::new());
650 let key = delay_queue.insert(0, std::time::Duration::from_secs(10));
651 assert_pending!(poll!(delay_queue));
652
653 delay_queue.remove(&key);
654 assert!(assert_ready!(poll!(delay_queue)).is_none());
655 }
656
657 #[tokio::test(start_paused = true)]
compact_expire_empty()658 async fn compact_expire_empty() {
659 let mut queue = task::spawn(DelayQueue::new());
660
661 let now = Instant::now();
662
663 queue.insert_at("foo1", now + ms(10));
664 queue.insert_at("foo2", now + ms(10));
665
666 sleep(ms(10)).await;
667
668 let mut res = vec![];
669 while res.len() < 2 {
670 let entry = assert_ready_some!(poll!(queue));
671 res.push(entry.into_inner());
672 }
673
674 queue.compact();
675
676 assert_eq!(queue.len(), 0);
677 assert_eq!(queue.capacity(), 0);
678 }
679
680 #[tokio::test(start_paused = true)]
compact_remove_empty()681 async fn compact_remove_empty() {
682 let mut queue = task::spawn(DelayQueue::new());
683
684 let now = Instant::now();
685
686 let key1 = queue.insert_at("foo1", now + ms(10));
687 let key2 = queue.insert_at("foo2", now + ms(10));
688
689 queue.remove(&key1);
690 queue.remove(&key2);
691
692 queue.compact();
693
694 assert_eq!(queue.len(), 0);
695 assert_eq!(queue.capacity(), 0);
696 }
697
698 #[tokio::test(start_paused = true)]
699 // Trigger a re-mapping of keys in the slab due to a `compact` call and
700 // test removal of re-mapped keys
compact_remove_remapped_keys()701 async fn compact_remove_remapped_keys() {
702 let mut queue = task::spawn(DelayQueue::new());
703
704 let now = Instant::now();
705
706 queue.insert_at("foo1", now + ms(10));
707 queue.insert_at("foo2", now + ms(10));
708
709 // should be assigned indices 3 and 4
710 let key3 = queue.insert_at("foo3", now + ms(20));
711 let key4 = queue.insert_at("foo4", now + ms(20));
712
713 sleep(ms(10)).await;
714
715 let mut res = vec![];
716 while res.len() < 2 {
717 let entry = assert_ready_some!(poll!(queue));
718 res.push(entry.into_inner());
719 }
720
721 // items corresponding to `foo3` and `foo4` will be assigned
722 // new indices here
723 queue.compact();
724
725 queue.insert_at("foo5", now + ms(10));
726
727 // test removal of re-mapped keys
728 let expired3 = queue.remove(&key3);
729 let expired4 = queue.remove(&key4);
730
731 assert_eq!(expired3.into_inner(), "foo3");
732 assert_eq!(expired4.into_inner(), "foo4");
733
734 queue.compact();
735 assert_eq!(queue.len(), 1);
736 assert_eq!(queue.capacity(), 1);
737 }
738
739 #[tokio::test(start_paused = true)]
compact_change_deadline()740 async fn compact_change_deadline() {
741 let mut queue = task::spawn(DelayQueue::new());
742
743 let mut now = Instant::now();
744
745 queue.insert_at("foo1", now + ms(10));
746 queue.insert_at("foo2", now + ms(10));
747
748 // should be assigned indices 3 and 4
749 queue.insert_at("foo3", now + ms(20));
750 let key4 = queue.insert_at("foo4", now + ms(20));
751
752 sleep(ms(10)).await;
753
754 let mut res = vec![];
755 while res.len() < 2 {
756 let entry = assert_ready_some!(poll!(queue));
757 res.push(entry.into_inner());
758 }
759
760 // items corresponding to `foo3` and `foo4` should be assigned
761 // new indices
762 queue.compact();
763
764 now = Instant::now();
765
766 queue.insert_at("foo5", now + ms(10));
767 let key6 = queue.insert_at("foo6", now + ms(10));
768
769 queue.reset_at(&key4, now + ms(20));
770 queue.reset_at(&key6, now + ms(20));
771
772 // foo3 and foo5 will expire
773 sleep(ms(10)).await;
774
775 while res.len() < 4 {
776 let entry = assert_ready_some!(poll!(queue));
777 res.push(entry.into_inner());
778 }
779
780 sleep(ms(10)).await;
781
782 while res.len() < 6 {
783 let entry = assert_ready_some!(poll!(queue));
784 res.push(entry.into_inner());
785 }
786
787 let entry = assert_ready!(poll!(queue));
788 assert!(entry.is_none());
789 }
790
791 #[tokio::test(start_paused = true)]
item_expiry_greater_than_wheel()792 async fn item_expiry_greater_than_wheel() {
793 // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted.
794 let mut queue = DelayQueue::new();
795 for _ in 0..2 {
796 tokio::time::advance(Duration::from_millis(1 << 35)).await;
797 queue.insert(0, Duration::from_millis(0));
798 queue.next().await;
799 }
800 // This should not panic
801 let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
802 queue.insert(1, Duration::from_millis(1));
803 }));
804 assert!(no_panic.is_ok());
805 }
806
807 #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
808 #[tokio::test(start_paused = true)]
809 #[cfg(panic = "unwind")]
remove_after_compact()810 async fn remove_after_compact() {
811 let now = Instant::now();
812 let mut queue = DelayQueue::new();
813
814 let foo_key = queue.insert_at("foo", now + ms(10));
815 queue.insert_at("bar", now + ms(20));
816 queue.remove(&foo_key);
817 queue.compact();
818
819 let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
820 queue.remove(&foo_key);
821 }));
822 assert!(panic.is_err());
823 }
824
825 #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
826 #[tokio::test(start_paused = true)]
827 #[cfg(panic = "unwind")]
remove_after_compact_poll()828 async fn remove_after_compact_poll() {
829 let now = Instant::now();
830 let mut queue = task::spawn(DelayQueue::new());
831
832 let foo_key = queue.insert_at("foo", now + ms(10));
833 queue.insert_at("bar", now + ms(20));
834
835 sleep(ms(10)).await;
836 assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);
837
838 queue.compact();
839
840 let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
841 queue.remove(&foo_key);
842 }));
843 assert!(panic.is_err());
844 }
845
846 #[tokio::test(start_paused = true)]
peek()847 async fn peek() {
848 let mut queue = task::spawn(DelayQueue::new());
849
850 let now = Instant::now();
851
852 let key = queue.insert_at("foo", now + ms(5));
853 let key2 = queue.insert_at("bar", now);
854 let key3 = queue.insert_at("baz", now + ms(10));
855
856 assert_eq!(queue.peek(), Some(key2));
857
858 sleep(ms(6)).await;
859
860 assert_eq!(queue.peek(), Some(key2));
861
862 let entry = assert_ready_some!(poll!(queue));
863 assert_eq!(entry.get_ref(), &"bar");
864
865 assert_eq!(queue.peek(), Some(key));
866
867 let entry = assert_ready_some!(poll!(queue));
868 assert_eq!(entry.get_ref(), &"foo");
869
870 assert_eq!(queue.peek(), Some(key3));
871
872 assert_pending!(poll!(queue));
873
874 sleep(ms(5)).await;
875
876 assert_eq!(queue.peek(), Some(key3));
877
878 let entry = assert_ready_some!(poll!(queue));
879 assert_eq!(entry.get_ref(), &"baz");
880
881 assert!(queue.peek().is_none());
882 }
883
884 #[tokio::test(start_paused = true)]
wake_after_remove_last()885 async fn wake_after_remove_last() {
886 let mut queue = task::spawn(DelayQueue::new());
887 let key = queue.insert("foo", ms(1000));
888
889 assert_pending!(poll!(queue));
890
891 queue.remove(&key);
892
893 assert!(queue.is_woken());
894 assert!(assert_ready!(poll!(queue)).is_none());
895 }
896
ms(n: u64) -> Duration897 fn ms(n: u64) -> Duration {
898 Duration::from_millis(n)
899 }
900