1 #![allow(clippy::disallowed_names)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4 
5 use futures::StreamExt;
6 use tokio::time::{self, sleep, sleep_until, Duration, Instant};
7 use tokio_test::{assert_pending, assert_ready, task};
8 use tokio_util::time::DelayQueue;
9 
10 macro_rules! poll {
11     ($queue:ident) => {
12         $queue.enter(|cx, mut queue| queue.poll_expired(cx))
13     };
14 }
15 
16 macro_rules! assert_ready_some {
17     ($e:expr) => {{
18         match assert_ready!($e) {
19             Some(v) => v,
20             None => panic!("None"),
21         }
22     }};
23 }
24 
25 #[tokio::test]
single_immediate_delay()26 async fn single_immediate_delay() {
27     time::pause();
28 
29     let mut queue = task::spawn(DelayQueue::new());
30     let _key = queue.insert_at("foo", Instant::now());
31 
32     // Advance time by 1ms to handle thee rounding
33     sleep(ms(1)).await;
34 
35     assert_ready_some!(poll!(queue));
36 
37     let entry = assert_ready!(poll!(queue));
38     assert!(entry.is_none())
39 }
40 
41 #[tokio::test]
multi_immediate_delays()42 async fn multi_immediate_delays() {
43     time::pause();
44 
45     let mut queue = task::spawn(DelayQueue::new());
46 
47     let _k = queue.insert_at("1", Instant::now());
48     let _k = queue.insert_at("2", Instant::now());
49     let _k = queue.insert_at("3", Instant::now());
50 
51     sleep(ms(1)).await;
52 
53     let mut res = vec![];
54 
55     while res.len() < 3 {
56         let entry = assert_ready_some!(poll!(queue));
57         res.push(entry.into_inner());
58     }
59 
60     let entry = assert_ready!(poll!(queue));
61     assert!(entry.is_none());
62 
63     res.sort_unstable();
64 
65     assert_eq!("1", res[0]);
66     assert_eq!("2", res[1]);
67     assert_eq!("3", res[2]);
68 }
69 
70 #[tokio::test]
single_short_delay()71 async fn single_short_delay() {
72     time::pause();
73 
74     let mut queue = task::spawn(DelayQueue::new());
75     let _key = queue.insert_at("foo", Instant::now() + ms(5));
76 
77     assert_pending!(poll!(queue));
78 
79     sleep(ms(1)).await;
80 
81     assert!(!queue.is_woken());
82 
83     sleep(ms(5)).await;
84 
85     assert!(queue.is_woken());
86 
87     let entry = assert_ready_some!(poll!(queue));
88     assert_eq!(*entry.get_ref(), "foo");
89 
90     let entry = assert_ready!(poll!(queue));
91     assert!(entry.is_none());
92 }
93 
94 #[tokio::test]
95 #[cfg_attr(miri, ignore)] // Too slow on miri.
multi_delay_at_start()96 async fn multi_delay_at_start() {
97     time::pause();
98 
99     let long = 262_144 + 9 * 4096;
100     let delays = &[1000, 2, 234, long, 60, 10];
101 
102     let mut queue = task::spawn(DelayQueue::new());
103 
104     // Setup the delays
105     for &i in delays {
106         let _key = queue.insert_at(i, Instant::now() + ms(i));
107     }
108 
109     assert_pending!(poll!(queue));
110     assert!(!queue.is_woken());
111 
112     let start = Instant::now();
113     for elapsed in 0..1200 {
114         println!("elapsed: {elapsed:?}");
115         let elapsed = elapsed + 1;
116         tokio::time::sleep_until(start + ms(elapsed)).await;
117 
118         if delays.contains(&elapsed) {
119             assert!(queue.is_woken());
120             assert_ready!(poll!(queue));
121             assert_pending!(poll!(queue));
122         } else if queue.is_woken() {
123             let cascade = &[192, 960];
124             assert!(
125                 cascade.contains(&elapsed),
126                 "elapsed={} dt={:?}",
127                 elapsed,
128                 Instant::now() - start
129             );
130 
131             assert_pending!(poll!(queue));
132         }
133     }
134     println!("finished multi_delay_start");
135 }
136 
137 #[tokio::test]
insert_in_past_fires_immediately()138 async fn insert_in_past_fires_immediately() {
139     println!("running insert_in_past_fires_immediately");
140     time::pause();
141 
142     let mut queue = task::spawn(DelayQueue::new());
143     let now = Instant::now();
144 
145     sleep(ms(10)).await;
146 
147     queue.insert_at("foo", now);
148 
149     assert_ready!(poll!(queue));
150     println!("finished insert_in_past_fires_immediately");
151 }
152 
153 #[tokio::test]
remove_entry()154 async fn remove_entry() {
155     time::pause();
156 
157     let mut queue = task::spawn(DelayQueue::new());
158 
159     let key = queue.insert_at("foo", Instant::now() + ms(5));
160 
161     assert_pending!(poll!(queue));
162 
163     let entry = queue.remove(&key);
164     assert_eq!(entry.into_inner(), "foo");
165 
166     sleep(ms(10)).await;
167 
168     let entry = assert_ready!(poll!(queue));
169     assert!(entry.is_none());
170 }
171 
172 #[tokio::test]
reset_entry()173 async fn reset_entry() {
174     time::pause();
175 
176     let mut queue = task::spawn(DelayQueue::new());
177 
178     let now = Instant::now();
179     let key = queue.insert_at("foo", now + ms(5));
180 
181     assert_pending!(poll!(queue));
182     sleep(ms(1)).await;
183 
184     queue.reset_at(&key, now + ms(10));
185 
186     assert_pending!(poll!(queue));
187 
188     sleep(ms(7)).await;
189 
190     assert!(!queue.is_woken());
191 
192     assert_pending!(poll!(queue));
193 
194     sleep(ms(3)).await;
195 
196     assert!(queue.is_woken());
197 
198     let entry = assert_ready_some!(poll!(queue));
199     assert_eq!(*entry.get_ref(), "foo");
200 
201     let entry = assert_ready!(poll!(queue));
202     assert!(entry.is_none())
203 }
204 
205 // Reproduces tokio-rs/tokio#849.
206 #[tokio::test]
reset_much_later()207 async fn reset_much_later() {
208     time::pause();
209 
210     let mut queue = task::spawn(DelayQueue::new());
211 
212     let now = Instant::now();
213     sleep(ms(1)).await;
214 
215     let key = queue.insert_at("foo", now + ms(200));
216     assert_pending!(poll!(queue));
217 
218     sleep(ms(3)).await;
219 
220     queue.reset_at(&key, now + ms(10));
221 
222     sleep(ms(20)).await;
223 
224     assert!(queue.is_woken());
225 }
226 
227 // Reproduces tokio-rs/tokio#849.
228 #[tokio::test]
reset_twice()229 async fn reset_twice() {
230     time::pause();
231 
232     let mut queue = task::spawn(DelayQueue::new());
233     let now = Instant::now();
234 
235     sleep(ms(1)).await;
236 
237     let key = queue.insert_at("foo", now + ms(200));
238 
239     assert_pending!(poll!(queue));
240 
241     sleep(ms(3)).await;
242 
243     queue.reset_at(&key, now + ms(50));
244 
245     sleep(ms(20)).await;
246 
247     queue.reset_at(&key, now + ms(40));
248 
249     sleep(ms(20)).await;
250 
251     assert!(queue.is_woken());
252 }
253 
254 /// Regression test: Given an entry inserted with a deadline in the past, so
255 /// that it is placed directly on the expired queue, reset the entry to a
256 /// deadline in the future. Validate that this leaves the entry and queue in an
257 /// internally consistent state by running an additional reset on the entry
258 /// before polling it to completion.
259 #[tokio::test]
repeatedly_reset_entry_inserted_as_expired()260 async fn repeatedly_reset_entry_inserted_as_expired() {
261     time::pause();
262 
263     // Instants before the start of the test seem to break in wasm.
264     time::sleep(ms(1000)).await;
265 
266     let mut queue = task::spawn(DelayQueue::new());
267     let now = Instant::now();
268 
269     let key = queue.insert_at("foo", now - ms(100));
270 
271     queue.reset_at(&key, now + ms(100));
272     queue.reset_at(&key, now + ms(50));
273 
274     assert_pending!(poll!(queue));
275 
276     time::sleep_until(now + ms(60)).await;
277 
278     assert!(queue.is_woken());
279 
280     let entry = assert_ready_some!(poll!(queue)).into_inner();
281     assert_eq!(entry, "foo");
282 
283     let entry = assert_ready!(poll!(queue));
284     assert!(entry.is_none());
285 }
286 
287 #[tokio::test]
remove_expired_item()288 async fn remove_expired_item() {
289     time::pause();
290 
291     let mut queue = DelayQueue::new();
292 
293     let now = Instant::now();
294 
295     sleep(ms(10)).await;
296 
297     let key = queue.insert_at("foo", now);
298 
299     let entry = queue.remove(&key);
300     assert_eq!(entry.into_inner(), "foo");
301 }
302 
303 /// Regression test: it should be possible to remove entries which fall in the
304 /// 0th slot of the internal timer wheel — that is, entries whose expiration
305 /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
306 /// is equal to the wheel's current elapsed time.
307 #[tokio::test]
remove_at_timer_wheel_threshold()308 async fn remove_at_timer_wheel_threshold() {
309     time::pause();
310 
311     let mut queue = task::spawn(DelayQueue::new());
312 
313     let now = Instant::now();
314 
315     let key1 = queue.insert_at("foo", now + ms(64));
316     let key2 = queue.insert_at("bar", now + ms(64));
317 
318     sleep(ms(80)).await;
319 
320     let entry = assert_ready_some!(poll!(queue)).into_inner();
321 
322     match entry {
323         "foo" => {
324             let entry = queue.remove(&key2).into_inner();
325             assert_eq!(entry, "bar");
326         }
327         "bar" => {
328             let entry = queue.remove(&key1).into_inner();
329             assert_eq!(entry, "foo");
330         }
331         other => panic!("other: {other:?}"),
332     }
333 }
334 
335 #[tokio::test]
expires_before_last_insert()336 async fn expires_before_last_insert() {
337     time::pause();
338 
339     let mut queue = task::spawn(DelayQueue::new());
340 
341     let now = Instant::now();
342 
343     queue.insert_at("foo", now + ms(10_000));
344 
345     // Delay should be set to 8.192s here.
346     assert_pending!(poll!(queue));
347 
348     // Delay should be set to the delay of the new item here
349     queue.insert_at("bar", now + ms(600));
350 
351     assert_pending!(poll!(queue));
352 
353     sleep(ms(600)).await;
354 
355     assert!(queue.is_woken());
356 
357     let entry = assert_ready_some!(poll!(queue)).into_inner();
358     assert_eq!(entry, "bar");
359 }
360 
361 #[tokio::test]
multi_reset()362 async fn multi_reset() {
363     time::pause();
364 
365     let mut queue = task::spawn(DelayQueue::new());
366 
367     let now = Instant::now();
368 
369     let one = queue.insert_at("one", now + ms(200));
370     let two = queue.insert_at("two", now + ms(250));
371 
372     assert_pending!(poll!(queue));
373 
374     queue.reset_at(&one, now + ms(300));
375     queue.reset_at(&two, now + ms(350));
376     queue.reset_at(&one, now + ms(400));
377 
378     sleep(ms(310)).await;
379 
380     assert_pending!(poll!(queue));
381 
382     sleep(ms(50)).await;
383 
384     let entry = assert_ready_some!(poll!(queue));
385     assert_eq!(*entry.get_ref(), "two");
386 
387     assert_pending!(poll!(queue));
388 
389     sleep(ms(50)).await;
390 
391     let entry = assert_ready_some!(poll!(queue));
392     assert_eq!(*entry.get_ref(), "one");
393 
394     let entry = assert_ready!(poll!(queue));
395     assert!(entry.is_none())
396 }
397 
398 #[tokio::test]
expire_first_key_when_reset_to_expire_earlier()399 async fn expire_first_key_when_reset_to_expire_earlier() {
400     time::pause();
401 
402     let mut queue = task::spawn(DelayQueue::new());
403 
404     let now = Instant::now();
405 
406     let one = queue.insert_at("one", now + ms(200));
407     queue.insert_at("two", now + ms(250));
408 
409     assert_pending!(poll!(queue));
410 
411     queue.reset_at(&one, now + ms(100));
412 
413     sleep(ms(100)).await;
414 
415     assert!(queue.is_woken());
416 
417     let entry = assert_ready_some!(poll!(queue)).into_inner();
418     assert_eq!(entry, "one");
419 }
420 
421 #[tokio::test]
expire_second_key_when_reset_to_expire_earlier()422 async fn expire_second_key_when_reset_to_expire_earlier() {
423     time::pause();
424 
425     let mut queue = task::spawn(DelayQueue::new());
426 
427     let now = Instant::now();
428 
429     queue.insert_at("one", now + ms(200));
430     let two = queue.insert_at("two", now + ms(250));
431 
432     assert_pending!(poll!(queue));
433 
434     queue.reset_at(&two, now + ms(100));
435 
436     sleep(ms(100)).await;
437 
438     assert!(queue.is_woken());
439 
440     let entry = assert_ready_some!(poll!(queue)).into_inner();
441     assert_eq!(entry, "two");
442 }
443 
444 #[tokio::test]
reset_first_expiring_item_to_expire_later()445 async fn reset_first_expiring_item_to_expire_later() {
446     time::pause();
447 
448     let mut queue = task::spawn(DelayQueue::new());
449 
450     let now = Instant::now();
451 
452     let one = queue.insert_at("one", now + ms(200));
453     let _two = queue.insert_at("two", now + ms(250));
454 
455     assert_pending!(poll!(queue));
456 
457     queue.reset_at(&one, now + ms(300));
458     sleep(ms(250)).await;
459 
460     assert!(queue.is_woken());
461 
462     let entry = assert_ready_some!(poll!(queue)).into_inner();
463     assert_eq!(entry, "two");
464 }
465 
466 #[tokio::test]
insert_before_first_after_poll()467 async fn insert_before_first_after_poll() {
468     time::pause();
469 
470     let mut queue = task::spawn(DelayQueue::new());
471 
472     let now = Instant::now();
473 
474     let _one = queue.insert_at("one", now + ms(200));
475 
476     assert_pending!(poll!(queue));
477 
478     let _two = queue.insert_at("two", now + ms(100));
479 
480     sleep(ms(99)).await;
481 
482     assert_pending!(poll!(queue));
483 
484     sleep(ms(1)).await;
485 
486     assert!(queue.is_woken());
487 
488     let entry = assert_ready_some!(poll!(queue)).into_inner();
489     assert_eq!(entry, "two");
490 }
491 
492 #[tokio::test]
insert_after_ready_poll()493 async fn insert_after_ready_poll() {
494     time::pause();
495 
496     let mut queue = task::spawn(DelayQueue::new());
497 
498     let now = Instant::now();
499 
500     queue.insert_at("1", now + ms(100));
501     queue.insert_at("2", now + ms(100));
502     queue.insert_at("3", now + ms(100));
503 
504     assert_pending!(poll!(queue));
505 
506     sleep(ms(100)).await;
507 
508     assert!(queue.is_woken());
509 
510     let mut res = vec![];
511 
512     while res.len() < 3 {
513         let entry = assert_ready_some!(poll!(queue));
514         res.push(entry.into_inner());
515         queue.insert_at("foo", now + ms(500));
516     }
517 
518     res.sort_unstable();
519 
520     assert_eq!("1", res[0]);
521     assert_eq!("2", res[1]);
522     assert_eq!("3", res[2]);
523 }
524 
525 #[tokio::test]
reset_later_after_slot_starts()526 async fn reset_later_after_slot_starts() {
527     time::pause();
528 
529     let mut queue = task::spawn(DelayQueue::new());
530 
531     let now = Instant::now();
532 
533     let foo = queue.insert_at("foo", now + ms(100));
534 
535     assert_pending!(poll!(queue));
536 
537     sleep_until(now + Duration::from_millis(80)).await;
538 
539     assert!(!queue.is_woken());
540 
541     // At this point the queue hasn't been polled, so `elapsed` on the wheel
542     // for the queue is still at 0 and hence the 1ms resolution slots cover
543     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
544     // the [64-128) slot.  As the queue knows that the first entry is within
545     // that slot, but doesn't know when, it must wake immediately to advance
546     // the wheel.
547     queue.reset_at(&foo, now + ms(120));
548     assert!(queue.is_woken());
549 
550     assert_pending!(poll!(queue));
551 
552     sleep_until(now + Duration::from_millis(119)).await;
553     assert!(!queue.is_woken());
554 
555     sleep(ms(1)).await;
556     assert!(queue.is_woken());
557 
558     let entry = assert_ready_some!(poll!(queue)).into_inner();
559     assert_eq!(entry, "foo");
560 }
561 
562 #[tokio::test]
reset_inserted_expired()563 async fn reset_inserted_expired() {
564     time::pause();
565 
566     // Instants before the start of the test seem to break in wasm.
567     time::sleep(ms(1000)).await;
568 
569     let mut queue = task::spawn(DelayQueue::new());
570     let now = Instant::now();
571 
572     let key = queue.insert_at("foo", now - ms(100));
573 
574     // this causes the panic described in #2473
575     queue.reset_at(&key, now + ms(100));
576 
577     assert_eq!(1, queue.len());
578 
579     sleep(ms(200)).await;
580 
581     let entry = assert_ready_some!(poll!(queue)).into_inner();
582     assert_eq!(entry, "foo");
583 
584     assert_eq!(queue.len(), 0);
585 }
586 
587 #[tokio::test]
reset_earlier_after_slot_starts()588 async fn reset_earlier_after_slot_starts() {
589     time::pause();
590 
591     let mut queue = task::spawn(DelayQueue::new());
592 
593     let now = Instant::now();
594 
595     let foo = queue.insert_at("foo", now + ms(200));
596 
597     assert_pending!(poll!(queue));
598 
599     sleep_until(now + Duration::from_millis(80)).await;
600 
601     assert!(!queue.is_woken());
602 
603     // At this point the queue hasn't been polled, so `elapsed` on the wheel
604     // for the queue is still at 0 and hence the 1ms resolution slots cover
605     // [0-64).  Resetting the time on the entry to 120 causes it to get put in
606     // the [64-128) slot.  As the queue knows that the first entry is within
607     // that slot, but doesn't know when, it must wake immediately to advance
608     // the wheel.
609     queue.reset_at(&foo, now + ms(120));
610     assert!(queue.is_woken());
611 
612     assert_pending!(poll!(queue));
613 
614     sleep_until(now + Duration::from_millis(119)).await;
615     assert!(!queue.is_woken());
616 
617     sleep(ms(1)).await;
618     assert!(queue.is_woken());
619 
620     let entry = assert_ready_some!(poll!(queue)).into_inner();
621     assert_eq!(entry, "foo");
622 }
623 
624 #[tokio::test]
insert_in_past_after_poll_fires_immediately()625 async fn insert_in_past_after_poll_fires_immediately() {
626     time::pause();
627 
628     let mut queue = task::spawn(DelayQueue::new());
629 
630     let now = Instant::now();
631 
632     queue.insert_at("foo", now + ms(200));
633 
634     assert_pending!(poll!(queue));
635 
636     sleep(ms(80)).await;
637 
638     assert!(!queue.is_woken());
639     queue.insert_at("bar", now + ms(40));
640 
641     assert!(queue.is_woken());
642 
643     let entry = assert_ready_some!(poll!(queue)).into_inner();
644     assert_eq!(entry, "bar");
645 }
646 
647 #[tokio::test]
delay_queue_poll_expired_when_empty()648 async fn delay_queue_poll_expired_when_empty() {
649     let mut delay_queue = task::spawn(DelayQueue::new());
650     let key = delay_queue.insert(0, std::time::Duration::from_secs(10));
651     assert_pending!(poll!(delay_queue));
652 
653     delay_queue.remove(&key);
654     assert!(assert_ready!(poll!(delay_queue)).is_none());
655 }
656 
657 #[tokio::test(start_paused = true)]
compact_expire_empty()658 async fn compact_expire_empty() {
659     let mut queue = task::spawn(DelayQueue::new());
660 
661     let now = Instant::now();
662 
663     queue.insert_at("foo1", now + ms(10));
664     queue.insert_at("foo2", now + ms(10));
665 
666     sleep(ms(10)).await;
667 
668     let mut res = vec![];
669     while res.len() < 2 {
670         let entry = assert_ready_some!(poll!(queue));
671         res.push(entry.into_inner());
672     }
673 
674     queue.compact();
675 
676     assert_eq!(queue.len(), 0);
677     assert_eq!(queue.capacity(), 0);
678 }
679 
680 #[tokio::test(start_paused = true)]
compact_remove_empty()681 async fn compact_remove_empty() {
682     let mut queue = task::spawn(DelayQueue::new());
683 
684     let now = Instant::now();
685 
686     let key1 = queue.insert_at("foo1", now + ms(10));
687     let key2 = queue.insert_at("foo2", now + ms(10));
688 
689     queue.remove(&key1);
690     queue.remove(&key2);
691 
692     queue.compact();
693 
694     assert_eq!(queue.len(), 0);
695     assert_eq!(queue.capacity(), 0);
696 }
697 
698 #[tokio::test(start_paused = true)]
699 // Trigger a re-mapping of keys in the slab due to a `compact` call and
700 // test removal of re-mapped keys
compact_remove_remapped_keys()701 async fn compact_remove_remapped_keys() {
702     let mut queue = task::spawn(DelayQueue::new());
703 
704     let now = Instant::now();
705 
706     queue.insert_at("foo1", now + ms(10));
707     queue.insert_at("foo2", now + ms(10));
708 
709     // should be assigned indices 3 and 4
710     let key3 = queue.insert_at("foo3", now + ms(20));
711     let key4 = queue.insert_at("foo4", now + ms(20));
712 
713     sleep(ms(10)).await;
714 
715     let mut res = vec![];
716     while res.len() < 2 {
717         let entry = assert_ready_some!(poll!(queue));
718         res.push(entry.into_inner());
719     }
720 
721     // items corresponding to `foo3` and `foo4` will be assigned
722     // new indices here
723     queue.compact();
724 
725     queue.insert_at("foo5", now + ms(10));
726 
727     // test removal of re-mapped keys
728     let expired3 = queue.remove(&key3);
729     let expired4 = queue.remove(&key4);
730 
731     assert_eq!(expired3.into_inner(), "foo3");
732     assert_eq!(expired4.into_inner(), "foo4");
733 
734     queue.compact();
735     assert_eq!(queue.len(), 1);
736     assert_eq!(queue.capacity(), 1);
737 }
738 
739 #[tokio::test(start_paused = true)]
compact_change_deadline()740 async fn compact_change_deadline() {
741     let mut queue = task::spawn(DelayQueue::new());
742 
743     let mut now = Instant::now();
744 
745     queue.insert_at("foo1", now + ms(10));
746     queue.insert_at("foo2", now + ms(10));
747 
748     // should be assigned indices 3 and 4
749     queue.insert_at("foo3", now + ms(20));
750     let key4 = queue.insert_at("foo4", now + ms(20));
751 
752     sleep(ms(10)).await;
753 
754     let mut res = vec![];
755     while res.len() < 2 {
756         let entry = assert_ready_some!(poll!(queue));
757         res.push(entry.into_inner());
758     }
759 
760     // items corresponding to `foo3` and `foo4` should be assigned
761     // new indices
762     queue.compact();
763 
764     now = Instant::now();
765 
766     queue.insert_at("foo5", now + ms(10));
767     let key6 = queue.insert_at("foo6", now + ms(10));
768 
769     queue.reset_at(&key4, now + ms(20));
770     queue.reset_at(&key6, now + ms(20));
771 
772     // foo3 and foo5 will expire
773     sleep(ms(10)).await;
774 
775     while res.len() < 4 {
776         let entry = assert_ready_some!(poll!(queue));
777         res.push(entry.into_inner());
778     }
779 
780     sleep(ms(10)).await;
781 
782     while res.len() < 6 {
783         let entry = assert_ready_some!(poll!(queue));
784         res.push(entry.into_inner());
785     }
786 
787     let entry = assert_ready!(poll!(queue));
788     assert!(entry.is_none());
789 }
790 
791 #[tokio::test(start_paused = true)]
item_expiry_greater_than_wheel()792 async fn item_expiry_greater_than_wheel() {
793     // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted.
794     let mut queue = DelayQueue::new();
795     for _ in 0..2 {
796         tokio::time::advance(Duration::from_millis(1 << 35)).await;
797         queue.insert(0, Duration::from_millis(0));
798         queue.next().await;
799     }
800     // This should not panic
801     let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
802         queue.insert(1, Duration::from_millis(1));
803     }));
804     assert!(no_panic.is_ok());
805 }
806 
807 #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
808 #[tokio::test(start_paused = true)]
809 #[cfg(panic = "unwind")]
remove_after_compact()810 async fn remove_after_compact() {
811     let now = Instant::now();
812     let mut queue = DelayQueue::new();
813 
814     let foo_key = queue.insert_at("foo", now + ms(10));
815     queue.insert_at("bar", now + ms(20));
816     queue.remove(&foo_key);
817     queue.compact();
818 
819     let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
820         queue.remove(&foo_key);
821     }));
822     assert!(panic.is_err());
823 }
824 
825 #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
826 #[tokio::test(start_paused = true)]
827 #[cfg(panic = "unwind")]
remove_after_compact_poll()828 async fn remove_after_compact_poll() {
829     let now = Instant::now();
830     let mut queue = task::spawn(DelayQueue::new());
831 
832     let foo_key = queue.insert_at("foo", now + ms(10));
833     queue.insert_at("bar", now + ms(20));
834 
835     sleep(ms(10)).await;
836     assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);
837 
838     queue.compact();
839 
840     let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
841         queue.remove(&foo_key);
842     }));
843     assert!(panic.is_err());
844 }
845 
846 #[tokio::test(start_paused = true)]
peek()847 async fn peek() {
848     let mut queue = task::spawn(DelayQueue::new());
849 
850     let now = Instant::now();
851 
852     let key = queue.insert_at("foo", now + ms(5));
853     let key2 = queue.insert_at("bar", now);
854     let key3 = queue.insert_at("baz", now + ms(10));
855 
856     assert_eq!(queue.peek(), Some(key2));
857 
858     sleep(ms(6)).await;
859 
860     assert_eq!(queue.peek(), Some(key2));
861 
862     let entry = assert_ready_some!(poll!(queue));
863     assert_eq!(entry.get_ref(), &"bar");
864 
865     assert_eq!(queue.peek(), Some(key));
866 
867     let entry = assert_ready_some!(poll!(queue));
868     assert_eq!(entry.get_ref(), &"foo");
869 
870     assert_eq!(queue.peek(), Some(key3));
871 
872     assert_pending!(poll!(queue));
873 
874     sleep(ms(5)).await;
875 
876     assert_eq!(queue.peek(), Some(key3));
877 
878     let entry = assert_ready_some!(poll!(queue));
879     assert_eq!(entry.get_ref(), &"baz");
880 
881     assert!(queue.peek().is_none());
882 }
883 
884 #[tokio::test(start_paused = true)]
wake_after_remove_last()885 async fn wake_after_remove_last() {
886     let mut queue = task::spawn(DelayQueue::new());
887     let key = queue.insert("foo", ms(1000));
888 
889     assert_pending!(poll!(queue));
890 
891     queue.remove(&key);
892 
893     assert!(queue.is_woken());
894     assert!(assert_ready!(poll!(queue)).is_none());
895 }
896 
ms(n: u64) -> Duration897 fn ms(n: u64) -> Duration {
898     Duration::from_millis(n)
899 }
900