1 // Copyright 2018 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::collections::BTreeMap; 6 use std::mem::drop; 7 use std::sync::Arc; 8 use std::sync::Weak; 9 use std::thread; 10 11 use base::error; 12 use base::warn; 13 use base::AsRawDescriptor; 14 use base::Descriptor; 15 use base::Event; 16 use base::EventType; 17 use base::WaitContext; 18 use sync::Mutex; 19 20 use super::error::Error; 21 use super::error::Result; 22 23 /// A fail handle will do the clean up when we cannot recover from some error. 24 pub trait FailHandle: Send + Sync { 25 /// Fail the code. fail(&self)26 fn fail(&self); 27 /// Returns true if already failed. failed(&self) -> bool28 fn failed(&self) -> bool; 29 } 30 31 impl FailHandle for Option<Arc<dyn FailHandle>> { fail(&self)32 fn fail(&self) { 33 match self { 34 Some(handle) => handle.fail(), 35 None => error!("event loop trying to fail without a fail handle"), 36 } 37 } 38 failed(&self) -> bool39 fn failed(&self) -> bool { 40 match self { 41 Some(handle) => handle.failed(), 42 None => false, 43 } 44 } 45 } 46 47 /// EventLoop is an event loop blocked on a set of fds. When a monitered events is triggered, 48 /// event loop will invoke the mapped handler. 49 pub struct EventLoop { 50 fail_handle: Option<Arc<dyn FailHandle>>, 51 poll_ctx: Arc<WaitContext<Descriptor>>, 52 handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>, 53 stop_evt: Event, 54 } 55 56 /// Interface for event handler. 57 pub trait EventHandler: Send + Sync { on_event(&self) -> anyhow::Result<()>58 fn on_event(&self) -> anyhow::Result<()>; 59 } 60 61 impl EventLoop { 62 /// Start an event loop. An optional fail handle could be passed to the event loop. start( name: String, fail_handle: Option<Arc<dyn FailHandle>>, ) -> Result<(EventLoop, thread::JoinHandle<()>)>63 pub fn start( 64 name: String, 65 fail_handle: Option<Arc<dyn FailHandle>>, 66 ) -> Result<(EventLoop, thread::JoinHandle<()>)> { 67 let (self_stop_evt, stop_evt) = Event::new() 68 .and_then(|e| Ok((e.try_clone()?, e))) 69 .map_err(Error::CreateEvent)?; 70 71 let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> = 72 Arc::new(Mutex::new(BTreeMap::new())); 73 let poll_ctx: WaitContext<Descriptor> = WaitContext::new() 74 .and_then(|pc| { 75 pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor())) 76 .and(Ok(pc)) 77 }) 78 .map_err(Error::CreateWaitContext)?; 79 80 let poll_ctx = Arc::new(poll_ctx); 81 let event_loop = EventLoop { 82 fail_handle: fail_handle.clone(), 83 poll_ctx: poll_ctx.clone(), 84 handlers: fd_callbacks.clone(), 85 stop_evt: self_stop_evt, 86 }; 87 88 let handle = thread::Builder::new() 89 .name(name) 90 .spawn(move || { 91 loop { 92 if fail_handle.failed() { 93 error!("event loop already failed"); 94 return; 95 } 96 let events = match poll_ctx.wait() { 97 Ok(events) => events, 98 Err(e) => { 99 error!("cannot wait on events {:?}", e); 100 fail_handle.fail(); 101 return; 102 } 103 }; 104 for event in &events { 105 let fd = event.token.as_raw_descriptor(); 106 if fd == stop_evt.as_raw_descriptor() { 107 return; 108 } 109 110 let mut locked = fd_callbacks.lock(); 111 let weak_handler = match locked.get(&Descriptor(fd)) { 112 Some(cb) => cb.clone(), 113 None => { 114 warn!("callback for fd {} already removed", fd); 115 continue; 116 } 117 }; 118 119 // If the file descriptor is hung up, remove it after calling the handler 120 // one final time. 121 let mut remove = event.is_hungup; 122 123 if let Some(handler) = weak_handler.upgrade() { 124 // Drop lock before triggering the event. 125 drop(locked); 126 if let Err(e) = handler.on_event() { 127 error!("removing event handler due to error: {:#}", e); 128 remove = true; 129 } 130 locked = fd_callbacks.lock(); 131 } else { 132 // If the handler is already gone, we remove the fd. 133 remove = true; 134 } 135 136 if remove { 137 let _ = poll_ctx.delete(&event.token); 138 let _ = locked.remove(&Descriptor(fd)); 139 } 140 } 141 } 142 }) 143 .map_err(Error::StartThread)?; 144 145 Ok((event_loop, handle)) 146 } 147 148 /// Add a new event to event loop. The event handler will be invoked when `event` happens on 149 /// `descriptor`. 150 /// 151 /// If the same `descriptor` is added multiple times, the old handler will be replaced. 152 /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered, 153 /// the event will be removed. add_event( &self, descriptor: &dyn AsRawDescriptor, event_type: EventType, handler: Weak<dyn EventHandler>, ) -> Result<()>154 pub fn add_event( 155 &self, 156 descriptor: &dyn AsRawDescriptor, 157 event_type: EventType, 158 handler: Weak<dyn EventHandler>, 159 ) -> Result<()> { 160 if self.fail_handle.failed() { 161 return Err(Error::EventLoopAlreadyFailed); 162 } 163 self.handlers 164 .lock() 165 .insert(Descriptor(descriptor.as_raw_descriptor()), handler); 166 self.poll_ctx 167 .add_for_event( 168 descriptor, 169 event_type, 170 Descriptor(descriptor.as_raw_descriptor()), 171 ) 172 .map_err(Error::WaitContextAddDescriptor) 173 } 174 175 /// Removes event for this `descriptor`. This function is safe to call even when the 176 /// `descriptor` is not actively being polled because it's been paused. 177 /// 178 /// EventLoop does not guarantee all events for `descriptor` is handled. remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()>179 pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> { 180 if self.fail_handle.failed() { 181 return Err(Error::EventLoopAlreadyFailed); 182 } 183 self.poll_ctx 184 .delete(descriptor) 185 .map_err(Error::WaitContextDeleteDescriptor)?; 186 self.handlers 187 .lock() 188 .remove(&Descriptor(descriptor.as_raw_descriptor())); 189 Ok(()) 190 } 191 192 /// Pauses polling on the given `descriptor`. It keeps a reference to the `descriptor` and its 193 /// handler so it can be resumed by calling `resume_event_for_descriptor()`. pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()>194 pub fn pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> { 195 if self.fail_handle.failed() { 196 return Err(Error::EventLoopAlreadyFailed); 197 } 198 self.poll_ctx 199 .delete(descriptor) 200 .map_err(Error::WaitContextDeleteDescriptor)?; 201 Ok(()) 202 } 203 204 /// Resumes polling on the given `descriptor` with the previously-provided handler. If 205 /// `descriptor` was not paused beforehand, this function does nothing. If `descriptor` does 206 /// not exist in the event loop, it returns an error. 207 /// `event_type` does not need to match the previously registered event type. resume_event_for_descriptor( &self, descriptor: &dyn AsRawDescriptor, event_type: EventType, ) -> Result<()>208 pub fn resume_event_for_descriptor( 209 &self, 210 descriptor: &dyn AsRawDescriptor, 211 event_type: EventType, 212 ) -> Result<()> { 213 let handler = self 214 .handlers 215 .lock() 216 .get(&Descriptor(descriptor.as_raw_descriptor())) 217 .ok_or(Error::EventLoopMissingHandler)? 218 .clone(); 219 self.add_event(descriptor, event_type, handler) 220 } 221 222 /// Stops this event loop asynchronously. Previous events might not be handled. stop(&self)223 pub fn stop(&self) { 224 match self.stop_evt.signal() { 225 Ok(_) => {} 226 Err(_) => { 227 error!("fail to send event loop stop event, it might already stopped"); 228 } 229 } 230 } 231 } 232 233 #[cfg(test)] 234 mod tests { 235 use std::sync::Arc; 236 use std::sync::Condvar; 237 use std::sync::Mutex; 238 239 use base::Event; 240 241 use super::*; 242 243 struct EventLoopTestHandler { 244 val: Mutex<u8>, 245 cvar: Condvar, 246 evt: Event, 247 } 248 249 impl EventHandler for EventLoopTestHandler { on_event(&self) -> anyhow::Result<()>250 fn on_event(&self) -> anyhow::Result<()> { 251 self.evt.wait().unwrap(); 252 *self.val.lock().unwrap() += 1; 253 self.cvar.notify_one(); 254 Ok(()) 255 } 256 } 257 258 #[test] event_loop_test()259 fn event_loop_test() { 260 let (l, j) = EventLoop::start("test".to_string(), None).unwrap(); 261 let (self_evt, evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) { 262 Ok(v) => v, 263 Err(e) => { 264 error!("failed creating Event pair: {:?}", e); 265 return; 266 } 267 }; 268 let h = Arc::new(EventLoopTestHandler { 269 val: Mutex::new(0), 270 cvar: Condvar::new(), 271 evt, 272 }); 273 let t: Arc<dyn EventHandler> = h.clone(); 274 l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t)) 275 .unwrap(); 276 self_evt.signal().unwrap(); 277 { 278 let mut val = h.val.lock().unwrap(); 279 while *val < 1 { 280 val = h.cvar.wait(val).unwrap(); 281 } 282 } 283 l.stop(); 284 j.join().unwrap(); 285 assert_eq!(*(h.val.lock().unwrap()), 1); 286 } 287 } 288