1 //! Channel that delivers a message at a certain moment in time. 2 //! 3 //! Messages cannot be sent into this kind of channel; they are materialized on demand. 4 5 use std::sync::atomic::{AtomicBool, Ordering}; 6 use std::thread; 7 use std::time::Instant; 8 9 use crate::context::Context; 10 use crate::err::{RecvTimeoutError, TryRecvError}; 11 use crate::select::{Operation, SelectHandle, Token}; 12 use crate::utils; 13 14 /// Result of a receive operation. 15 pub(crate) type AtToken = Option<Instant>; 16 17 /// Channel that delivers a message at a certain moment in time 18 pub(crate) struct Channel { 19 /// The instant at which the message will be delivered. 20 delivery_time: Instant, 21 22 /// `true` if the message has been received. 23 received: AtomicBool, 24 } 25 26 impl Channel { 27 /// Creates a channel that delivers a message at a certain instant in time. 28 #[inline] new_deadline(when: Instant) -> Self29 pub(crate) fn new_deadline(when: Instant) -> Self { 30 Channel { 31 delivery_time: when, 32 received: AtomicBool::new(false), 33 } 34 } 35 36 /// Attempts to receive a message without blocking. 37 #[inline] try_recv(&self) -> Result<Instant, TryRecvError>38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { 39 // We use relaxed ordering because this is just an optional optimistic check. 40 if self.received.load(Ordering::Relaxed) { 41 // The message has already been received. 42 return Err(TryRecvError::Empty); 43 } 44 45 if Instant::now() < self.delivery_time { 46 // The message was not delivered yet. 47 return Err(TryRecvError::Empty); 48 } 49 50 // Try receiving the message if it is still available. 51 if !self.received.swap(true, Ordering::SeqCst) { 52 // Success! Return delivery time as the message. 53 Ok(self.delivery_time) 54 } else { 55 // The message was already received. 56 Err(TryRecvError::Empty) 57 } 58 } 59 60 /// Receives a message from the channel. 61 #[inline] recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>62 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { 63 // We use relaxed ordering because this is just an optional optimistic check. 64 if self.received.load(Ordering::Relaxed) { 65 // The message has already been received. 66 utils::sleep_until(deadline); 67 return Err(RecvTimeoutError::Timeout); 68 } 69 70 // Wait until the message is received or the deadline is reached. 71 loop { 72 let now = Instant::now(); 73 74 let deadline = match deadline { 75 // Check if we can receive the next message. 76 _ if now >= self.delivery_time => break, 77 // Check if the timeout deadline has been reached. 78 Some(d) if now >= d => return Err(RecvTimeoutError::Timeout), 79 80 // Sleep until one of the above happens 81 Some(d) if d < self.delivery_time => d, 82 _ => self.delivery_time, 83 }; 84 85 thread::sleep(deadline - now); 86 } 87 88 // Try receiving the message if it is still available. 89 if !self.received.swap(true, Ordering::SeqCst) { 90 // Success! Return the message, which is the instant at which it was delivered. 91 Ok(self.delivery_time) 92 } else { 93 // The message was already received. Block forever. 94 utils::sleep_until(None); 95 unreachable!() 96 } 97 } 98 99 /// Reads a message from the channel. 100 #[inline] read(&self, token: &mut Token) -> Result<Instant, ()>101 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { 102 token.at.ok_or(()) 103 } 104 105 /// Returns `true` if the channel is empty. 106 #[inline] is_empty(&self) -> bool107 pub(crate) fn is_empty(&self) -> bool { 108 // We use relaxed ordering because this is just an optional optimistic check. 109 if self.received.load(Ordering::Relaxed) { 110 return true; 111 } 112 113 // If the delivery time hasn't been reached yet, the channel is empty. 114 if Instant::now() < self.delivery_time { 115 return true; 116 } 117 118 // The delivery time has been reached. The channel is empty only if the message has already 119 // been received. 120 self.received.load(Ordering::SeqCst) 121 } 122 123 /// Returns `true` if the channel is full. 124 #[inline] is_full(&self) -> bool125 pub(crate) fn is_full(&self) -> bool { 126 !self.is_empty() 127 } 128 129 /// Returns the number of messages in the channel. 130 #[inline] len(&self) -> usize131 pub(crate) fn len(&self) -> usize { 132 if self.is_empty() { 133 0 134 } else { 135 1 136 } 137 } 138 139 /// Returns the capacity of the channel. 140 #[inline] capacity(&self) -> Option<usize>141 pub(crate) fn capacity(&self) -> Option<usize> { 142 Some(1) 143 } 144 } 145 146 impl SelectHandle for Channel { 147 #[inline] try_select(&self, token: &mut Token) -> bool148 fn try_select(&self, token: &mut Token) -> bool { 149 match self.try_recv() { 150 Ok(msg) => { 151 token.at = Some(msg); 152 true 153 } 154 Err(TryRecvError::Disconnected) => { 155 token.at = None; 156 true 157 } 158 Err(TryRecvError::Empty) => false, 159 } 160 } 161 162 #[inline] deadline(&self) -> Option<Instant>163 fn deadline(&self) -> Option<Instant> { 164 // We use relaxed ordering because this is just an optional optimistic check. 165 if self.received.load(Ordering::Relaxed) { 166 None 167 } else { 168 Some(self.delivery_time) 169 } 170 } 171 172 #[inline] register(&self, _oper: Operation, _cx: &Context) -> bool173 fn register(&self, _oper: Operation, _cx: &Context) -> bool { 174 self.is_ready() 175 } 176 177 #[inline] unregister(&self, _oper: Operation)178 fn unregister(&self, _oper: Operation) {} 179 180 #[inline] accept(&self, token: &mut Token, _cx: &Context) -> bool181 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 182 self.try_select(token) 183 } 184 185 #[inline] is_ready(&self) -> bool186 fn is_ready(&self) -> bool { 187 !self.is_empty() 188 } 189 190 #[inline] watch(&self, _oper: Operation, _cx: &Context) -> bool191 fn watch(&self, _oper: Operation, _cx: &Context) -> bool { 192 self.is_ready() 193 } 194 195 #[inline] unwatch(&self, _oper: Operation)196 fn unwatch(&self, _oper: Operation) {} 197 } 198