// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;

use async_task::Task;
use base::warn;
use base::AsRawDescriptor;
use base::AsRawDescriptors;
use base::RawDescriptor;
use futures::task::noop_waker;
use pin_utils::pin_mut;
use sync::Mutex;

use crate::queue::RunnableQueue;
use crate::waker::WeakWake;
use crate::AsyncError;
use crate::AsyncResult;
use crate::BlockingPool;
use crate::DetachedTasks;
use crate::ExecutorTrait;
use crate::IntoAsync;
use crate::IoSource;
use crate::TaskHandle;

/// Abstraction for IO backends.
pub trait Reactor: Send + Sync + Sized {
    fn new() -> Result<Self>;

    /// Called when the executor is being dropped to allow orderly shutdown (e.g. cancelling IO
    /// work). The returned future will be run to completion.
    ///
    /// Note that, since this is called from `RawExecutor::drop`, there will not be any
    /// `Arc<Executor>` left, so weak references to the executor will always fail to upgrade at
    /// this point. Reactors can potentially make use of this fact to keep more IO work from being
    /// submitted.
    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;

    /// Called when an executor run loop starts on a thread.
    fn on_thread_start(&self) {}

    /// Block until an event occurs (e.g. IO work is ready) or until `wake` is called.
    ///
    /// As an optimization, `set_processing` should be called immediately after wake up (i.e.
    /// before any bookkeeping is done) so that concurrent calls to wakers can safely skip making
    /// redundant calls to `Reactor::wake`.
    fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;

    /// Wake up any pending `wait_for_work` calls. If there are none pending, then wake up the next
    /// `wait_for_work` call (necessary to avoid race conditions).
    fn wake(&self);

    /// Create an `IoSource` for the backend.
    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>>;

    fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
}
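
// A minimal sketch (not part of crosvm) of a no-IO `Reactor` that simply parks the calling thread
// until `wake` is called, using std::sync primitives for brevity. `new_source` and
// `wrap_task_handle` are elided because they depend on the backend's `IoSource` and `TaskHandle`
// plumbing, so this is illustrative only:
//
//     struct ParkReactor {
//         woken: std::sync::Mutex<bool>,
//         cv: std::sync::Condvar,
//     }
//
//     impl Reactor for ParkReactor {
//         fn new() -> Result<Self> {
//             Ok(ParkReactor {
//                 woken: std::sync::Mutex::new(false),
//                 cv: std::sync::Condvar::new(),
//             })
//         }
//
//         fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
//             // Nothing to cancel, so resolve immediately.
//             Box::pin(async {})
//         }
//
//         fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()> {
//             let mut woken = self.woken.lock().unwrap();
//             while !*woken {
//                 woken = self.cv.wait(woken).unwrap();
//             }
//             // Tell the executor we are processing again before consuming the wakeup.
//             set_processing();
//             *woken = false;
//             Ok(())
//         }
//
//         fn wake(&self) {
//             *self.woken.lock().unwrap() = true;
//             self.cv.notify_one();
//         }
//
//         // new_source() and wrap_task_handle() omitted.
//     }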

// Indicates the executor is either within or about to make a `Reactor::wait_for_work` call. When a
// waker sees this value, it will call `Reactor::wake`.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

// Indicates the executor is processing futures.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

// Indicates one or more futures may be ready to make progress (i.e. causes the main loop to return
// directly to PROCESSING instead of WAITING).
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
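
// State transitions, as implemented below:
//   * `run_internal` stores PROCESSING at the top of each loop iteration and attempts a
//     PROCESSING -> WAITING transition just before blocking in `Reactor::wait_for_work`.
//   * `RawExecutor::wake` swaps the state to WOKEN and only calls `Reactor::wake` if the previous
//     state was WAITING (i.e. the run loop is blocked, or about to block, in the reactor).
//   * If the PROCESSING -> WAITING compare_exchange fails because a waker already stored WOKEN,
//     the loop skips the wait and goes straight back to processing.
//   * The `set_processing` callback passed to `wait_for_work` stores PROCESSING as soon as the
//     reactor wakes up, so subsequent wakers can skip redundant `Reactor::wake` calls.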
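
/// An executor that drives spawned futures on the thread that calls `run_until`, delegating IO
/// readiness to its [`Reactor`] and blocking work to a [`BlockingPool`].
///
/// A usage sketch (not a doctest); `MyReactor` stands in for a concrete `Reactor` implementation
/// and is not defined in this file:
///
/// ```ignore
/// let ex = RawExecutor::<MyReactor>::new()?;
/// // The task only makes progress while the executor is being run.
/// let task = ex.spawn(async { 2 + 2 });
/// // Drive the executor on the current thread until the spawned task completes.
/// assert_eq!(ex.run_until(task)?, 4);
/// ```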
pub struct RawExecutor<Re: Reactor + 'static> {
    pub reactor: Re,
    queue: RunnableQueue,
    blocking_pool: BlockingPool,
    state: AtomicI32,
    detached_tasks: Mutex<DetachedTasks>,
}

impl<Re: Reactor> RawExecutor<Re> {
    pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> {
        Ok(Arc::new(RawExecutor {
            reactor,
            queue: RunnableQueue::new(),
            blocking_pool: Default::default(),
            state: AtomicI32::new(PROCESSING),
            detached_tasks: Mutex::new(DetachedTasks::new()),
        }))
    }

    pub fn new() -> AsyncResult<Arc<Self>> {
        Self::new_with(Re::new().map_err(AsyncError::Io)?)
    }

    fn wake(&self) {
        // Mark the executor as woken; only poke the reactor if the run loop is blocked (or about
        // to block) in `wait_for_work`.
        let oldstate = self.state.swap(WOKEN, Ordering::AcqRel);
        if oldstate == WAITING {
            self.reactor.wake();
        }
    }

    fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> {
        self.reactor.on_thread_start();

        pin_mut!(done);

        loop {
            self.state.store(PROCESSING, Ordering::Release);
            // Run all of the runnables that are currently queued.
            for runnable in self.queue.iter() {
                runnable.run();
            }

            // Poll detached tasks so they keep making progress.
            if let Ok(mut tasks) = self.detached_tasks.try_lock() {
                tasks.poll(cx);
            }

            if let Poll::Ready(val) = done.as_mut().poll(cx) {
                return Ok(val);
            }

            // Try to transition to WAITING. If a waker raced us and stored WOKEN, skip the wait
            // and go straight back to processing.
            let oldstate = self.state.compare_exchange(
                PROCESSING,
                WAITING,
                Ordering::AcqRel,
                Ordering::Acquire,
            );
            if let Err(oldstate) = oldstate {
                debug_assert_eq!(oldstate, WOKEN);
                // One or more futures have become runnable.
                continue;
            }

            // Block until the reactor has IO work ready or `wake` is called.
            self.reactor
                .wait_for_work(|| self.state.store(PROCESSING, Ordering::Release))
                .map_err(AsyncError::Io)?;
        }
    }
}

impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> {
    fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
        self.reactor.new_source(self, f)
    }

    fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        Re::wrap_task_handle(RawTaskHandle {
            task,
            raw: Arc::downgrade(self),
        })
    }

    fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn_local(f, schedule);
        runnable.schedule();
        Re::wrap_task_handle(RawTaskHandle {
            task,
            raw: Arc::downgrade(self),
        })
    }

    fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.spawn(self.blocking_pool.spawn(f))
    }

    fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
        let waker = super::waker::new_waker(Arc::downgrade(self));
        let mut ctx = Context::from_waker(&waker);

        self.run_internal(&mut ctx, f)
    }
}

impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> {
    fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
        self.reactor.as_raw_descriptors()
    }
}

impl<Re: Reactor> WeakWake for RawExecutor<Re> {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            RawExecutor::wake(&arc_self);
        }
    }
}

impl<Re: Reactor> Drop for RawExecutor<Re> {
    fn drop(&mut self) {
        let final_future = self.reactor.on_executor_drop();

        // No `Arc<RawExecutor>` remains at this point, so drive the reactor's shutdown future to
        // completion on the current thread with a no-op waker.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        if let Err(e) = self.run_internal(&mut cx, final_future) {
            warn!("Failed to drive RawExecutor to completion: {}", e);
        }
    }
}

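/// Handle to a task spawned on a [`RawExecutor`].
///
/// Dropping the handle cancels the task (the drop behavior of the wrapped [`Task`]); use
/// [`RawTaskHandle::detach`] to let it keep running on the executor instead.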
pub struct RawTaskHandle<Re: Reactor + 'static, R> {
    task: Task<R>,
    raw: Weak<RawExecutor<Re>>,
}

impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> {
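    /// Move the task onto the executor's detached-task list so it keeps being polled by the
    /// executor's run loop after this handle is gone. If the executor has already been dropped,
    /// the task is dropped (and therefore cancelled) instead.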
    pub fn detach(self) {
        if let Some(raw) = self.raw.upgrade() {
            raw.detached_tasks.lock().push(self.task);
        }
    }

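    /// Cancel the task, returning its output if it had already completed before the cancellation
    /// took effect.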
    pub async fn cancel(self) -> Option<R> {
        self.task.cancel().await
    }
}

impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> {
    type Output = R;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context,
    ) -> std::task::Poll<Self::Output> {
        Pin::new(&mut self.task).poll(cx)
    }
}