xref: /aosp_15_r20/external/crosvm/devices/src/utils/event_loop.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2018 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::BTreeMap;
6 use std::mem::drop;
7 use std::sync::Arc;
8 use std::sync::Weak;
9 use std::thread;
10 
11 use base::error;
12 use base::warn;
13 use base::AsRawDescriptor;
14 use base::Descriptor;
15 use base::Event;
16 use base::EventType;
17 use base::WaitContext;
18 use sync::Mutex;
19 
20 use super::error::Error;
21 use super::error::Result;
22 
/// A fail handle will do the clean up when we cannot recover from some error.
///
/// The event loop thread checks `failed()` before every wait and calls `fail()` when waiting on
/// its descriptors returns an error. Implementations must be `Send + Sync` because the handle is
/// shared between the `EventLoop` object and the thread it spawns.
pub trait FailHandle: Send + Sync {
    /// Fail the code.
    fn fail(&self);
    /// Returns true if already failed.
    fn failed(&self) -> bool;
}
30 
31 impl FailHandle for Option<Arc<dyn FailHandle>> {
fail(&self)32     fn fail(&self) {
33         match self {
34             Some(handle) => handle.fail(),
35             None => error!("event loop trying to fail without a fail handle"),
36         }
37     }
38 
failed(&self) -> bool39     fn failed(&self) -> bool {
40         match self {
41             Some(handle) => handle.failed(),
42             None => false,
43         }
44     }
45 }
46 
/// EventLoop is an event loop blocked on a set of fds. When a monitored event is triggered,
/// the event loop will invoke the mapped handler.
pub struct EventLoop {
    // Optional handle consulted before operations and notified when waiting fails.
    fail_handle: Option<Arc<dyn FailHandle>>,
    // Wait context shared with the loop thread; descriptors are added to / deleted from it.
    poll_ctx: Arc<WaitContext<Descriptor>>,
    // Handlers keyed by descriptor, shared with the loop thread. Only weak references are kept,
    // so the event loop does not keep a handler alive.
    handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>,
    // Clone of the event the loop thread watches for shutdown; signaled by `stop()`.
    stop_evt: Event,
}
55 
/// Interface for event handler.
///
/// Handlers must be `Send + Sync` because they are invoked from the event loop thread.
pub trait EventHandler: Send + Sync {
    /// Called by the event loop thread when the descriptor this handler is registered for is
    /// triggered. Returning an error makes the event loop log it and remove the handler.
    fn on_event(&self) -> anyhow::Result<()>;
}
60 
61 impl EventLoop {
62     /// Start an event loop. An optional fail handle could be passed to the event loop.
start( name: String, fail_handle: Option<Arc<dyn FailHandle>>, ) -> Result<(EventLoop, thread::JoinHandle<()>)>63     pub fn start(
64         name: String,
65         fail_handle: Option<Arc<dyn FailHandle>>,
66     ) -> Result<(EventLoop, thread::JoinHandle<()>)> {
67         let (self_stop_evt, stop_evt) = Event::new()
68             .and_then(|e| Ok((e.try_clone()?, e)))
69             .map_err(Error::CreateEvent)?;
70 
71         let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> =
72             Arc::new(Mutex::new(BTreeMap::new()));
73         let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
74             .and_then(|pc| {
75                 pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
76                     .and(Ok(pc))
77             })
78             .map_err(Error::CreateWaitContext)?;
79 
80         let poll_ctx = Arc::new(poll_ctx);
81         let event_loop = EventLoop {
82             fail_handle: fail_handle.clone(),
83             poll_ctx: poll_ctx.clone(),
84             handlers: fd_callbacks.clone(),
85             stop_evt: self_stop_evt,
86         };
87 
88         let handle = thread::Builder::new()
89             .name(name)
90             .spawn(move || {
91                 loop {
92                     if fail_handle.failed() {
93                         error!("event loop already failed");
94                         return;
95                     }
96                     let events = match poll_ctx.wait() {
97                         Ok(events) => events,
98                         Err(e) => {
99                             error!("cannot wait on events {:?}", e);
100                             fail_handle.fail();
101                             return;
102                         }
103                     };
104                     for event in &events {
105                         let fd = event.token.as_raw_descriptor();
106                         if fd == stop_evt.as_raw_descriptor() {
107                             return;
108                         }
109 
110                         let mut locked = fd_callbacks.lock();
111                         let weak_handler = match locked.get(&Descriptor(fd)) {
112                             Some(cb) => cb.clone(),
113                             None => {
114                                 warn!("callback for fd {} already removed", fd);
115                                 continue;
116                             }
117                         };
118 
119                         // If the file descriptor is hung up, remove it after calling the handler
120                         // one final time.
121                         let mut remove = event.is_hungup;
122 
123                         if let Some(handler) = weak_handler.upgrade() {
124                             // Drop lock before triggering the event.
125                             drop(locked);
126                             if let Err(e) = handler.on_event() {
127                                 error!("removing event handler due to error: {:#}", e);
128                                 remove = true;
129                             }
130                             locked = fd_callbacks.lock();
131                         } else {
132                             // If the handler is already gone, we remove the fd.
133                             remove = true;
134                         }
135 
136                         if remove {
137                             let _ = poll_ctx.delete(&event.token);
138                             let _ = locked.remove(&Descriptor(fd));
139                         }
140                     }
141                 }
142             })
143             .map_err(Error::StartThread)?;
144 
145         Ok((event_loop, handle))
146     }
147 
148     /// Add a new event to event loop. The event handler will be invoked when `event` happens on
149     /// `descriptor`.
150     ///
151     /// If the same `descriptor` is added multiple times, the old handler will be replaced.
152     /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered,
153     /// the event will be removed.
add_event( &self, descriptor: &dyn AsRawDescriptor, event_type: EventType, handler: Weak<dyn EventHandler>, ) -> Result<()>154     pub fn add_event(
155         &self,
156         descriptor: &dyn AsRawDescriptor,
157         event_type: EventType,
158         handler: Weak<dyn EventHandler>,
159     ) -> Result<()> {
160         if self.fail_handle.failed() {
161             return Err(Error::EventLoopAlreadyFailed);
162         }
163         self.handlers
164             .lock()
165             .insert(Descriptor(descriptor.as_raw_descriptor()), handler);
166         self.poll_ctx
167             .add_for_event(
168                 descriptor,
169                 event_type,
170                 Descriptor(descriptor.as_raw_descriptor()),
171             )
172             .map_err(Error::WaitContextAddDescriptor)
173     }
174 
175     /// Removes event for this `descriptor`. This function is safe to call even when the
176     /// `descriptor` is not actively being polled because it's been paused.
177     ///
178     /// EventLoop does not guarantee all events for `descriptor` is handled.
remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()>179     pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
180         if self.fail_handle.failed() {
181             return Err(Error::EventLoopAlreadyFailed);
182         }
183         self.poll_ctx
184             .delete(descriptor)
185             .map_err(Error::WaitContextDeleteDescriptor)?;
186         self.handlers
187             .lock()
188             .remove(&Descriptor(descriptor.as_raw_descriptor()));
189         Ok(())
190     }
191 
192     /// Pauses polling on the given `descriptor`. It keeps a reference to the `descriptor` and its
193     /// handler so it can be resumed by calling `resume_event_for_descriptor()`.
pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()>194     pub fn pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
195         if self.fail_handle.failed() {
196             return Err(Error::EventLoopAlreadyFailed);
197         }
198         self.poll_ctx
199             .delete(descriptor)
200             .map_err(Error::WaitContextDeleteDescriptor)?;
201         Ok(())
202     }
203 
204     /// Resumes polling on the given `descriptor` with the previously-provided handler. If
205     /// `descriptor` was not paused beforehand, this function does nothing. If `descriptor` does
206     /// not exist in the event loop, it returns an error.
207     /// `event_type` does not need to match the previously registered event type.
resume_event_for_descriptor( &self, descriptor: &dyn AsRawDescriptor, event_type: EventType, ) -> Result<()>208     pub fn resume_event_for_descriptor(
209         &self,
210         descriptor: &dyn AsRawDescriptor,
211         event_type: EventType,
212     ) -> Result<()> {
213         let handler = self
214             .handlers
215             .lock()
216             .get(&Descriptor(descriptor.as_raw_descriptor()))
217             .ok_or(Error::EventLoopMissingHandler)?
218             .clone();
219         self.add_event(descriptor, event_type, handler)
220     }
221 
222     /// Stops this event loop asynchronously. Previous events might not be handled.
stop(&self)223     pub fn stop(&self) {
224         match self.stop_evt.signal() {
225             Ok(_) => {}
226             Err(_) => {
227                 error!("fail to send event loop stop event, it might already stopped");
228             }
229         }
230     }
231 }
232 
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Condvar;
    use std::sync::Mutex;

    use base::Event;

    use super::*;

    /// Handler that counts how many times it was invoked and wakes up the waiting test.
    struct EventLoopTestHandler {
        // Number of handled events.
        val: Mutex<u8>,
        // Notified after every increment of `val`.
        cvar: Condvar,
        // Event registered with the event loop; waited on inside the handler.
        evt: Event,
    }

    impl EventHandler for EventLoopTestHandler {
        fn on_event(&self) -> anyhow::Result<()> {
            self.evt.wait().unwrap();
            *self.val.lock().unwrap() += 1;
            self.cvar.notify_one();
            Ok(())
        }
    }

    /// Registers one handler, signals its event once and checks that the handler ran exactly
    /// once before the loop is stopped.
    #[test]
    fn event_loop_test() {
        let (l, j) = EventLoop::start("test".to_string(), None).unwrap();
        // A setup failure must fail the test instead of letting it pass without checking
        // anything.
        let evt = Event::new().expect("failed to create Event");
        let self_evt = evt.try_clone().expect("failed to clone Event");
        let h = Arc::new(EventLoopTestHandler {
            val: Mutex::new(0),
            cvar: Condvar::new(),
            evt,
        });
        let t: Arc<dyn EventHandler> = h.clone();
        l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
            .unwrap();
        self_evt.signal().unwrap();
        {
            // Block until the handler has run at least once.
            let mut val = h.val.lock().unwrap();
            while *val < 1 {
                val = h.cvar.wait(val).unwrap();
            }
        }
        l.stop();
        j.join().unwrap();
        assert_eq!(*(h.val.lock().unwrap()), 1);
    }
}
288