1 //! Channel that delivers messages periodically. 2 //! 3 //! Messages cannot be sent into this kind of channel; they are materialized on demand. 4 5 use std::thread; 6 use std::time::{Duration, Instant}; 7 8 use crossbeam_utils::atomic::AtomicCell; 9 10 use crate::context::Context; 11 use crate::err::{RecvTimeoutError, TryRecvError}; 12 use crate::select::{Operation, SelectHandle, Token}; 13 14 /// Result of a receive operation. 15 pub(crate) type TickToken = Option<Instant>; 16 17 /// Channel that delivers messages periodically. 18 pub(crate) struct Channel { 19 /// The instant at which the next message will be delivered. 20 delivery_time: AtomicCell<Instant>, 21 22 /// The time interval in which messages get delivered. 23 duration: Duration, 24 } 25 26 impl Channel { 27 /// Creates a channel that delivers messages periodically. 28 #[inline] new(delivery_time: Instant, dur: Duration) -> Self29 pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self { 30 Channel { 31 delivery_time: AtomicCell::new(delivery_time), 32 duration: dur, 33 } 34 } 35 36 /// Attempts to receive a message without blocking. 37 #[inline] try_recv(&self) -> Result<Instant, TryRecvError>38 pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { 39 loop { 40 let now = Instant::now(); 41 let delivery_time = self.delivery_time.load(); 42 43 if now < delivery_time { 44 return Err(TryRecvError::Empty); 45 } 46 47 if self 48 .delivery_time 49 .compare_exchange(delivery_time, now + self.duration) 50 .is_ok() 51 { 52 return Ok(delivery_time); 53 } 54 } 55 } 56 57 /// Receives a message from the channel. 58 #[inline] recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError>59 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { 60 loop { 61 let delivery_time = self.delivery_time.load(); 62 let now = Instant::now(); 63 64 if let Some(d) = deadline { 65 if d < delivery_time { 66 if now < d { 67 thread::sleep(d - now); 68 } 69 return Err(RecvTimeoutError::Timeout); 70 } 71 } 72 73 if self 74 .delivery_time 75 .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) 76 .is_ok() 77 { 78 if now < delivery_time { 79 thread::sleep(delivery_time - now); 80 } 81 return Ok(delivery_time); 82 } 83 } 84 } 85 86 /// Reads a message from the channel. 87 #[inline] read(&self, token: &mut Token) -> Result<Instant, ()>88 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { 89 token.tick.ok_or(()) 90 } 91 92 /// Returns `true` if the channel is empty. 93 #[inline] is_empty(&self) -> bool94 pub(crate) fn is_empty(&self) -> bool { 95 Instant::now() < self.delivery_time.load() 96 } 97 98 /// Returns `true` if the channel is full. 99 #[inline] is_full(&self) -> bool100 pub(crate) fn is_full(&self) -> bool { 101 !self.is_empty() 102 } 103 104 /// Returns the number of messages in the channel. 105 #[inline] len(&self) -> usize106 pub(crate) fn len(&self) -> usize { 107 if self.is_empty() { 108 0 109 } else { 110 1 111 } 112 } 113 114 /// Returns the capacity of the channel. 115 #[inline] capacity(&self) -> Option<usize>116 pub(crate) fn capacity(&self) -> Option<usize> { 117 Some(1) 118 } 119 } 120 121 impl SelectHandle for Channel { 122 #[inline] try_select(&self, token: &mut Token) -> bool123 fn try_select(&self, token: &mut Token) -> bool { 124 match self.try_recv() { 125 Ok(msg) => { 126 token.tick = Some(msg); 127 true 128 } 129 Err(TryRecvError::Disconnected) => { 130 token.tick = None; 131 true 132 } 133 Err(TryRecvError::Empty) => false, 134 } 135 } 136 137 #[inline] deadline(&self) -> Option<Instant>138 fn deadline(&self) -> Option<Instant> { 139 Some(self.delivery_time.load()) 140 } 141 142 #[inline] register(&self, _oper: Operation, _cx: &Context) -> bool143 fn register(&self, _oper: Operation, _cx: &Context) -> bool { 144 self.is_ready() 145 } 146 147 #[inline] unregister(&self, _oper: Operation)148 fn unregister(&self, _oper: Operation) {} 149 150 #[inline] accept(&self, token: &mut Token, _cx: &Context) -> bool151 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 152 self.try_select(token) 153 } 154 155 #[inline] is_ready(&self) -> bool156 fn is_ready(&self) -> bool { 157 !self.is_empty() 158 } 159 160 #[inline] watch(&self, _oper: Operation, _cx: &Context) -> bool161 fn watch(&self, _oper: Operation, _cx: &Context) -> bool { 162 self.is_ready() 163 } 164 165 #[inline] unwatch(&self, _oper: Operation)166 fn unwatch(&self, _oper: Operation) {} 167 } 168