1 //! Channel that delivers a message at a certain moment in time.
2 //!
3 //! Messages cannot be sent into this kind of channel; they are materialized on demand.
4 
5 use std::sync::atomic::{AtomicBool, Ordering};
6 use std::thread;
7 use std::time::Instant;
8 
9 use crate::context::Context;
10 use crate::err::{RecvTimeoutError, TryRecvError};
11 use crate::select::{Operation, SelectHandle, Token};
12 use crate::utils;
13 
/// Outcome of a receive operation on this flavor: `Some(instant)` carries the delivered
/// message, while `None` means that no message was obtained.
pub(crate) type AtToken = Option<Instant>;
16 
/// Channel that delivers a single message at a certain moment in time.
///
/// The message itself is the delivery instant, and it can be received at most once.
pub(crate) struct Channel {
    /// Moment at which the message becomes available to receivers.
    delivery_time: Instant,

    /// Set to `true` once some receiver has claimed the message.
    received: AtomicBool,
}
25 
26 impl Channel {
27     /// Creates a channel that delivers a message at a certain instant in time.
28     #[inline]
new_deadline(when: Instant) -> Self29     pub(crate) fn new_deadline(when: Instant) -> Self {
30         Channel {
31             delivery_time: when,
32             received: AtomicBool::new(false),
33         }
34     }
35 
36     /// Attempts to receive a message without blocking.
37     #[inline]
try_recv(&self) -> Result<Instant, TryRecvError>38     pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39         // We use relaxed ordering because this is just an optional optimistic check.
40         if self.received.load(Ordering::Relaxed) {
41             // The message has already been received.
42             return Err(TryRecvError::Empty);
43         }
44 
45         if Instant::now() < self.delivery_time {
46             // The message was not delivered yet.
47             return Err(TryRecvError::Empty);
48         }
49 
50         // Try receiving the message if it is still available.
51         if !self.received.swap(true, Ordering::SeqCst) {
52             // Success! Return delivery time as the message.
53             Ok(self.delivery_time)
54         } else {
55             // The message was already received.
56             Err(TryRecvError::Empty)
57         }
58     }
59 
60     /// Receives a message from the channel.
61     #[inline]
recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>62     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
63         // We use relaxed ordering because this is just an optional optimistic check.
64         if self.received.load(Ordering::Relaxed) {
65             // The message has already been received.
66             utils::sleep_until(deadline);
67             return Err(RecvTimeoutError::Timeout);
68         }
69 
70         // Wait until the message is received or the deadline is reached.
71         loop {
72             let now = Instant::now();
73 
74             let deadline = match deadline {
75                 // Check if we can receive the next message.
76                 _ if now >= self.delivery_time => break,
77                 // Check if the timeout deadline has been reached.
78                 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
79 
80                 // Sleep until one of the above happens
81                 Some(d) if d < self.delivery_time => d,
82                 _ => self.delivery_time,
83             };
84 
85             thread::sleep(deadline - now);
86         }
87 
88         // Try receiving the message if it is still available.
89         if !self.received.swap(true, Ordering::SeqCst) {
90             // Success! Return the message, which is the instant at which it was delivered.
91             Ok(self.delivery_time)
92         } else {
93             // The message was already received. Block forever.
94             utils::sleep_until(None);
95             unreachable!()
96         }
97     }
98 
99     /// Reads a message from the channel.
100     #[inline]
read(&self, token: &mut Token) -> Result<Instant, ()>101     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
102         token.at.ok_or(())
103     }
104 
105     /// Returns `true` if the channel is empty.
106     #[inline]
is_empty(&self) -> bool107     pub(crate) fn is_empty(&self) -> bool {
108         // We use relaxed ordering because this is just an optional optimistic check.
109         if self.received.load(Ordering::Relaxed) {
110             return true;
111         }
112 
113         // If the delivery time hasn't been reached yet, the channel is empty.
114         if Instant::now() < self.delivery_time {
115             return true;
116         }
117 
118         // The delivery time has been reached. The channel is empty only if the message has already
119         // been received.
120         self.received.load(Ordering::SeqCst)
121     }
122 
123     /// Returns `true` if the channel is full.
124     #[inline]
is_full(&self) -> bool125     pub(crate) fn is_full(&self) -> bool {
126         !self.is_empty()
127     }
128 
129     /// Returns the number of messages in the channel.
130     #[inline]
len(&self) -> usize131     pub(crate) fn len(&self) -> usize {
132         if self.is_empty() {
133             0
134         } else {
135             1
136         }
137     }
138 
139     /// Returns the capacity of the channel.
140     #[inline]
capacity(&self) -> Option<usize>141     pub(crate) fn capacity(&self) -> Option<usize> {
142         Some(1)
143     }
144 }
145 
146 impl SelectHandle for Channel {
147     #[inline]
try_select(&self, token: &mut Token) -> bool148     fn try_select(&self, token: &mut Token) -> bool {
149         match self.try_recv() {
150             Ok(msg) => {
151                 token.at = Some(msg);
152                 true
153             }
154             Err(TryRecvError::Disconnected) => {
155                 token.at = None;
156                 true
157             }
158             Err(TryRecvError::Empty) => false,
159         }
160     }
161 
162     #[inline]
deadline(&self) -> Option<Instant>163     fn deadline(&self) -> Option<Instant> {
164         // We use relaxed ordering because this is just an optional optimistic check.
165         if self.received.load(Ordering::Relaxed) {
166             None
167         } else {
168             Some(self.delivery_time)
169         }
170     }
171 
172     #[inline]
register(&self, _oper: Operation, _cx: &Context) -> bool173     fn register(&self, _oper: Operation, _cx: &Context) -> bool {
174         self.is_ready()
175     }
176 
177     #[inline]
unregister(&self, _oper: Operation)178     fn unregister(&self, _oper: Operation) {}
179 
180     #[inline]
accept(&self, token: &mut Token, _cx: &Context) -> bool181     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
182         self.try_select(token)
183     }
184 
185     #[inline]
is_ready(&self) -> bool186     fn is_ready(&self) -> bool {
187         !self.is_empty()
188     }
189 
190     #[inline]
watch(&self, _oper: Operation, _cx: &Context) -> bool191     fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
192         self.is_ready()
193     }
194 
195     #[inline]
unwatch(&self, _oper: Operation)196     fn unwatch(&self, _oper: Operation) {}
197 }
198