1 //! Channel that delivers messages periodically.
2 //!
3 //! Messages cannot be sent into this kind of channel; they are materialized on demand.
4 
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_utils::atomic::AtomicCell;
9 
10 use crate::context::Context;
11 use crate::err::{RecvTimeoutError, TryRecvError};
12 use crate::select::{Operation, SelectHandle, Token};
13 
14 /// Result of a receive operation.
15 pub(crate) type TickToken = Option<Instant>;
16 
17 /// Channel that delivers messages periodically.
18 pub(crate) struct Channel {
19     /// The instant at which the next message will be delivered.
20     delivery_time: AtomicCell<Instant>,
21 
22     /// The time interval in which messages get delivered.
23     duration: Duration,
24 }
25 
26 impl Channel {
27     /// Creates a channel that delivers messages periodically.
28     #[inline]
new(delivery_time: Instant, dur: Duration) -> Self29     pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
30         Channel {
31             delivery_time: AtomicCell::new(delivery_time),
32             duration: dur,
33         }
34     }
35 
36     /// Attempts to receive a message without blocking.
37     #[inline]
try_recv(&self) -> Result<Instant, TryRecvError>38     pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39         loop {
40             let now = Instant::now();
41             let delivery_time = self.delivery_time.load();
42 
43             if now < delivery_time {
44                 return Err(TryRecvError::Empty);
45             }
46 
47             if self
48                 .delivery_time
49                 .compare_exchange(delivery_time, now + self.duration)
50                 .is_ok()
51             {
52                 return Ok(delivery_time);
53             }
54         }
55     }
56 
57     /// Receives a message from the channel.
58     #[inline]
recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>59     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60         loop {
61             let delivery_time = self.delivery_time.load();
62             let now = Instant::now();
63 
64             if let Some(d) = deadline {
65                 if d < delivery_time {
66                     if now < d {
67                         thread::sleep(d - now);
68                     }
69                     return Err(RecvTimeoutError::Timeout);
70                 }
71             }
72 
73             if self
74                 .delivery_time
75                 .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76                 .is_ok()
77             {
78                 if now < delivery_time {
79                     thread::sleep(delivery_time - now);
80                 }
81                 return Ok(delivery_time);
82             }
83         }
84     }
85 
86     /// Reads a message from the channel.
87     #[inline]
read(&self, token: &mut Token) -> Result<Instant, ()>88     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89         token.tick.ok_or(())
90     }
91 
92     /// Returns `true` if the channel is empty.
93     #[inline]
is_empty(&self) -> bool94     pub(crate) fn is_empty(&self) -> bool {
95         Instant::now() < self.delivery_time.load()
96     }
97 
98     /// Returns `true` if the channel is full.
99     #[inline]
is_full(&self) -> bool100     pub(crate) fn is_full(&self) -> bool {
101         !self.is_empty()
102     }
103 
104     /// Returns the number of messages in the channel.
105     #[inline]
len(&self) -> usize106     pub(crate) fn len(&self) -> usize {
107         if self.is_empty() {
108             0
109         } else {
110             1
111         }
112     }
113 
114     /// Returns the capacity of the channel.
115     #[inline]
capacity(&self) -> Option<usize>116     pub(crate) fn capacity(&self) -> Option<usize> {
117         Some(1)
118     }
119 }
120 
121 impl SelectHandle for Channel {
122     #[inline]
try_select(&self, token: &mut Token) -> bool123     fn try_select(&self, token: &mut Token) -> bool {
124         match self.try_recv() {
125             Ok(msg) => {
126                 token.tick = Some(msg);
127                 true
128             }
129             Err(TryRecvError::Disconnected) => {
130                 token.tick = None;
131                 true
132             }
133             Err(TryRecvError::Empty) => false,
134         }
135     }
136 
137     #[inline]
deadline(&self) -> Option<Instant>138     fn deadline(&self) -> Option<Instant> {
139         Some(self.delivery_time.load())
140     }
141 
142     #[inline]
register(&self, _oper: Operation, _cx: &Context) -> bool143     fn register(&self, _oper: Operation, _cx: &Context) -> bool {
144         self.is_ready()
145     }
146 
147     #[inline]
unregister(&self, _oper: Operation)148     fn unregister(&self, _oper: Operation) {}
149 
150     #[inline]
accept(&self, token: &mut Token, _cx: &Context) -> bool151     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
152         self.try_select(token)
153     }
154 
155     #[inline]
is_ready(&self) -> bool156     fn is_ready(&self) -> bool {
157         !self.is_empty()
158     }
159 
160     #[inline]
watch(&self, _oper: Operation, _cx: &Context) -> bool161     fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
162         self.is_ready()
163     }
164 
165     #[inline]
unwatch(&self, _oper: Operation)166     fn unwatch(&self, _oper: Operation) {}
167 }
168