xref: /aosp_15_r20/external/crosvm/cros_async/src/executor.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::future::Future;
6 use std::pin::Pin;
7 use std::sync::Arc;
8 
9 #[cfg(any(target_os = "android", target_os = "linux"))]
10 use base::warn;
11 #[cfg(any(target_os = "android", target_os = "linux"))]
12 use base::AsRawDescriptors;
13 #[cfg(any(target_os = "android", target_os = "linux"))]
14 use base::RawDescriptor;
15 use once_cell::sync::OnceCell;
16 use serde::Deserialize;
17 use serde_keyvalue::argh::FromArgValue;
18 use serde_keyvalue::ErrorKind;
19 use serde_keyvalue::KeyValueDeserializer;
20 
21 use crate::common_executor;
22 use crate::common_executor::RawExecutor;
23 #[cfg(any(target_os = "android", target_os = "linux"))]
24 use crate::sys::linux;
25 #[cfg(windows)]
26 use crate::sys::windows;
27 use crate::sys::ExecutorKindSys;
28 use crate::AsyncResult;
29 use crate::IntoAsync;
30 use crate::IoSource;
31 
32 cfg_if::cfg_if! {
33     if #[cfg(feature = "tokio")] {
34         use crate::tokio_executor::TokioExecutor;
35         use crate::tokio_executor::TokioTaskHandle;
36     }
37 }
38 
/// Selects which backend an [`Executor`] is built on.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutorKind {
    /// One of the platform-specific backends described by [`ExecutorKindSys`].
    SysVariants(ExecutorKindSys),
    /// The tokio-based backend. Only compiled in when the `tokio` feature is enabled.
    #[cfg(feature = "tokio")]
    Tokio,
}
45 
46 impl From<ExecutorKindSys> for ExecutorKind {
from(e: ExecutorKindSys) -> ExecutorKind47     fn from(e: ExecutorKindSys) -> ExecutorKind {
48         ExecutorKind::SysVariants(e)
49     }
50 }
51 
/// Process-wide default [`ExecutorKind`]; the cell can be written at most once.
///
/// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
/// If not set, [`ExecutorKind::default()`] picks a statically-chosen default value, stores it in
/// `DEFAULT_EXECUTOR_KIND`, and returns it.
static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new();
56 
57 impl Default for ExecutorKind {
default() -> Self58     fn default() -> Self {
59         #[cfg(any(target_os = "android", target_os = "linux"))]
60         let default_fn = || ExecutorKindSys::Fd.into();
61         #[cfg(windows)]
62         let default_fn = || ExecutorKindSys::Handle.into();
63         *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)
64     }
65 }
66 
/// The error type for [`Executor::set_default_executor_kind()`].
#[derive(thiserror::Error, Debug)]
pub enum SetDefaultExecutorKindError {
    /// The default executor kind is set more than once. Carries the kind that is already set.
    #[error("The default executor kind is already set to {0:?}")]
    SetMoreThanOnce(ExecutorKind),

    #[cfg(any(target_os = "android", target_os = "linux"))]
    /// io_uring is unavailable. The reason might be the lack of the kernel support,
    /// but is not limited to that. Carries the error reported by the availability check.
    #[error("io_uring is unavailable: {0}")]
    UringUnavailable(linux::uring_executor::Error),
}
80 
81 impl FromArgValue for ExecutorKind {
from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String>82     fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> {
83         // `from_arg_value` returns a `String` as error, but our deserializer API defines its own
84         // error type. Perform parsing from a closure so we can easily map returned errors.
85         let builder = move || {
86             let mut des = KeyValueDeserializer::from(value);
87 
88             let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) {
89                 #[cfg(any(target_os = "android", target_os = "linux"))]
90                 ("epoll", None) => ExecutorKindSys::Fd.into(),
91                 #[cfg(any(target_os = "android", target_os = "linux"))]
92                 ("uring", None) => ExecutorKindSys::Uring.into(),
93                 #[cfg(windows)]
94                 ("handle", None) => ExecutorKindSys::Handle.into(),
95                 #[cfg(windows)]
96                 ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(),
97                 #[cfg(windows)]
98                 ("overlapped", Some(',')) => {
99                     if des.parse_identifier()? != "concurrency" {
100                         let kind = ErrorKind::SerdeError("expected `concurrency`".to_string());
101                         return Err(des.error_here(kind));
102                     }
103                     if des.next_char() != Some('=') {
104                         return Err(des.error_here(ErrorKind::ExpectedEqual));
105                     }
106                     let concurrency = des.parse_number()?;
107                     ExecutorKindSys::Overlapped {
108                         concurrency: Some(concurrency),
109                     }
110                     .into()
111                 }
112                 #[cfg(feature = "tokio")]
113                 ("tokio", None) => ExecutorKind::Tokio,
114                 (_identifier, _next) => {
115                     let kind = ErrorKind::SerdeError("unexpected kind".to_string());
116                     return Err(des.error_here(kind));
117                 }
118             };
119             des.finish()?;
120             Ok(kind)
121         };
122 
123         builder().map_err(|e| e.to_string())
124     }
125 }
126 
127 impl serde::Serialize for ExecutorKind {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer,128     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129     where
130         S: serde::Serializer,
131     {
132         match self {
133             ExecutorKind::SysVariants(sv) => sv.serialize(serializer),
134             #[cfg(feature = "tokio")]
135             ExecutorKind::Tokio => "tokio".serialize(serializer),
136         }
137     }
138 }
139 
140 impl<'de> Deserialize<'de> for ExecutorKind {
deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> where D: serde::Deserializer<'de>,141     fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error>
142     where
143         D: serde::Deserializer<'de>,
144     {
145         let string = String::deserialize(deserializer)?;
146         ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom)
147     }
148 }
149 
/// Reference to a task managed by the executor.
///
/// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to
/// continue running in the background.
///
/// `await`ing the `TaskHandle` waits for the task to finish and yields its result.
pub enum TaskHandle<R> {
    /// Handle to a task on an executor using the Linux `EpollReactor`.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>),
    /// Handle to a task on an executor using the Linux `UringReactor`.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    Uring(common_executor::RawTaskHandle<linux::UringReactor, R>),
    /// Handle to a task on an executor using the Windows `HandleReactor`.
    #[cfg(windows)]
    Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>),
    /// Handle to a task on the tokio-based executor (`tokio` feature only).
    #[cfg(feature = "tokio")]
    Tokio(TokioTaskHandle<R>),
}
166 
167 impl<R: Send + 'static> TaskHandle<R> {
detach(self)168     pub fn detach(self) {
169         match self {
170             #[cfg(any(target_os = "android", target_os = "linux"))]
171             TaskHandle::Fd(f) => f.detach(),
172             #[cfg(any(target_os = "android", target_os = "linux"))]
173             TaskHandle::Uring(u) => u.detach(),
174             #[cfg(windows)]
175             TaskHandle::Handle(h) => h.detach(),
176             #[cfg(feature = "tokio")]
177             TaskHandle::Tokio(t) => t.detach(),
178         }
179     }
180 
181     // Cancel the task and wait for it to stop. Returns the result of the task if it was already
182     // finished.
cancel(self) -> Option<R>183     pub async fn cancel(self) -> Option<R> {
184         match self {
185             #[cfg(any(target_os = "android", target_os = "linux"))]
186             TaskHandle::Fd(f) => f.cancel().await,
187             #[cfg(any(target_os = "android", target_os = "linux"))]
188             TaskHandle::Uring(u) => u.cancel().await,
189             #[cfg(windows)]
190             TaskHandle::Handle(h) => h.cancel().await,
191             #[cfg(feature = "tokio")]
192             TaskHandle::Tokio(t) => t.cancel().await,
193         }
194     }
195 }
196 
197 impl<R: 'static> Future for TaskHandle<R> {
198     type Output = R;
199 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>200     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
201         match self.get_mut() {
202             #[cfg(any(target_os = "android", target_os = "linux"))]
203             TaskHandle::Fd(f) => Pin::new(f).poll(cx),
204             #[cfg(any(target_os = "android", target_os = "linux"))]
205             TaskHandle::Uring(u) => Pin::new(u).poll(cx),
206             #[cfg(windows)]
207             TaskHandle::Handle(h) => Pin::new(h).poll(cx),
208             #[cfg(feature = "tokio")]
209             TaskHandle::Tokio(t) => Pin::new(t).poll(cx),
210         }
211     }
212 }
213 
/// Crate-internal interface with the same operation set that [`Executor`] exposes publicly:
/// wrapping I/O objects, spawning tasks, and driving a future to completion.
pub(crate) trait ExecutorTrait {
    /// Creates an `IoSource<F>` for `f`.
    fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;

    /// Spawns a `Send` future and returns a handle to its eventual output.
    fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static;

    /// Spawns a closure (not a future) and returns a handle to its return value.
    fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static;

    /// Spawns a future that is not required to be `Send`.
    fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static;

    /// Runs the executor until `f` completes and returns its output.
    fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
}
234 
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
///
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
/// interface duplication, but as far as we understand that is unavoidable.
///
/// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
/// for further details.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
///
/// ```
/// use std::cmp::min;
/// use std::error::Error;
/// use std::fs::{File, OpenOptions};
///
/// use cros_async::{AsyncResult, Executor, IoSource, complete3};
/// const CHUNK_SIZE: usize = 32;
///
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
///     while data.len() > 0 {
///         let (count, mut buf) = f.write_from_vec(None, data).await?;
///
///         data = buf.split_off(count);
///     }
///
///     Ok(())
/// }
///
/// // Transfer `len` bytes of data from `from` to `to`.
/// async fn transfer_data(
///     from: IoSource<File>,
///     to: IoSource<File>,
///     len: usize,
/// ) -> AsyncResult<usize> {
///     let mut rem = len;
///
///     while rem > 0 {
///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
///         let (count, mut data) = from.read_to_vec(None, buf).await?;
///
///         if count == 0 {
///             // End of file. Return the number of bytes transferred.
///             return Ok(len - rem);
///         }
///
///         data.truncate(count);
///         write_file(&to, data).await?;
///
///         rem = rem.saturating_sub(count);
///     }
///
///     Ok(len)
/// }
///
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # fn do_it() -> Result<(), Box<dyn Error>> {
///     let ex = Executor::new()?;
///
///     let (rx, tx) = base::linux::pipe()?;
///     let zero = File::open("/dev/zero")?;
///     let zero_bytes = CHUNK_SIZE * 7;
///     let zero_to_pipe = transfer_data(
///         ex.async_from(zero)?,
///         ex.async_from(tx.try_clone()?)?,
///         zero_bytes,
///     );
///
///     let rand = File::open("/dev/urandom")?;
///     let rand_bytes = CHUNK_SIZE * 19;
///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
///
///     let null = OpenOptions::new().write(true).open("/dev/null")?;
///     let null_bytes = zero_bytes + rand_bytes;
///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
///
///     ex.run_until(complete3(
///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
///     ))?;
///
/// #     Ok(())
/// # }
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # do_it().unwrap();
/// ```
#[derive(Clone)]
pub enum Executor {
    /// Executor backed by the Linux `EpollReactor`.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    Fd(Arc<RawExecutor<linux::EpollReactor>>),
    /// Executor backed by the Linux `UringReactor`.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    Uring(Arc<RawExecutor<linux::UringReactor>>),
    /// Executor backed by the Windows `HandleReactor`.
    #[cfg(windows)]
    Handle(Arc<RawExecutor<windows::HandleReactor>>),
    /// Windows executor for overlapped I/O. It is also backed by a `HandleReactor`; the variant
    /// is what lets `async_overlapped_from` hand out overlapped sources.
    #[cfg(windows)]
    Overlapped(Arc<RawExecutor<windows::HandleReactor>>),
    /// Executor backed by tokio (`tokio` feature only).
    #[cfg(feature = "tokio")]
    Tokio(TokioExecutor),
}
344 
345 impl Executor {
346     /// Create a new `Executor`.
new() -> AsyncResult<Self>347     pub fn new() -> AsyncResult<Self> {
348         Executor::with_executor_kind(ExecutorKind::default())
349     }
350 
351     /// Create a new `Executor` of the given `ExecutorKind`.
with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>352     pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
353         Ok(match kind {
354             #[cfg(any(target_os = "android", target_os = "linux"))]
355             ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),
356             #[cfg(any(target_os = "android", target_os = "linux"))]
357             ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {
358                 Executor::Uring(RawExecutor::new()?)
359             }
360             #[cfg(windows)]
361             ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {
362                 Executor::Handle(RawExecutor::new()?)
363             }
364             #[cfg(windows)]
365             ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => {
366                 let reactor = match concurrency {
367                     Some(concurrency) => windows::HandleReactor::new_with(concurrency)?,
368                     None => windows::HandleReactor::new()?,
369                 };
370                 Executor::Overlapped(RawExecutor::new_with(reactor)?)
371             }
372             #[cfg(feature = "tokio")]
373             ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?),
374         })
375     }
376 
377     /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>378     pub fn set_default_executor_kind(
379         executor_kind: ExecutorKind,
380     ) -> Result<(), SetDefaultExecutorKindError> {
381         #[cfg(any(target_os = "android", target_os = "linux"))]
382         if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {
383             linux::uring_executor::check_uring_availability()
384                 .map_err(SetDefaultExecutorKindError::UringUnavailable)?;
385             if !crate::is_uring_stable() {
386                 warn!(
387                     "Enabling io_uring executor on the kernel version where io_uring is unstable"
388                 );
389             }
390         }
391         DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
392             // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
393             SetDefaultExecutorKindError::SetMoreThanOnce(
394                 *DEFAULT_EXECUTOR_KIND
395                     .get()
396                     .expect("Failed to get DEFAULT_EXECUTOR_KIND"),
397             ))
398     }
399 
400     /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
401     /// `IoSource` to directly start async operations without needing a separate reference to the
402     /// executor.
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>403     pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
404         match self {
405             #[cfg(any(target_os = "android", target_os = "linux"))]
406             Executor::Fd(ex) => ex.async_from(f),
407             #[cfg(any(target_os = "android", target_os = "linux"))]
408             Executor::Uring(ex) => ex.async_from(f),
409             #[cfg(windows)]
410             Executor::Handle(ex) => ex.async_from(f),
411             #[cfg(windows)]
412             Executor::Overlapped(ex) => ex.async_from(f),
413             #[cfg(feature = "tokio")]
414             Executor::Tokio(ex) => ex.async_from(f),
415         }
416     }
417 
418     /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
419     /// If the executor is not overlapped, then Handle source is returned.
420     /// returned `IoSource` to directly start async operations without needing a separate reference
421     /// to the executor.
422     #[cfg(windows)]
async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>423     pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
424         match self {
425             Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(
426                 f, ex, false,
427             )?)),
428             _ => self.async_from(f),
429         }
430     }
431 
432     /// Spawn a new future for this executor to run to completion. Callers may use the returned
433     /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
434     /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
435     /// future associated with it use `TaskHandle::detach`.
436     ///
437     /// # Examples
438     ///
439     /// ```
440     /// # use cros_async::AsyncResult;
441     /// # fn example_spawn() -> AsyncResult<()> {
442     /// #      use std::thread;
443     ///
444     /// #      use cros_async::Executor;
445     ///        use futures::executor::block_on;
446     ///
447     /// #      let ex = Executor::new()?;
448     ///
449     /// #      // Spawn a thread that runs the executor.
450     /// #      let ex2 = ex.clone();
451     /// #      thread::spawn(move || ex2.run());
452     ///
453     ///       let task = ex.spawn(async { 7 + 13 });
454     ///
455     ///       let result = block_on(task);
456     ///       assert_eq!(result, 20);
457     /// #     Ok(())
458     /// # }
459     ///
460     /// # example_spawn().unwrap();
461     /// ```
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,462     pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
463     where
464         F: Future + Send + 'static,
465         F::Output: Send + 'static,
466     {
467         match self {
468             #[cfg(any(target_os = "android", target_os = "linux"))]
469             Executor::Fd(ex) => ex.spawn(f),
470             #[cfg(any(target_os = "android", target_os = "linux"))]
471             Executor::Uring(ex) => ex.spawn(f),
472             #[cfg(windows)]
473             Executor::Handle(ex) => ex.spawn(f),
474             #[cfg(windows)]
475             Executor::Overlapped(ex) => ex.spawn(f),
476             #[cfg(feature = "tokio")]
477             Executor::Tokio(ex) => ex.spawn(f),
478         }
479     }
480 
481     /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
482     /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
483     /// thread where `run()` or `run_until()` is called.
484     ///
485     /// # Panics
486     ///
487     /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
488     /// added by calling `spawn_local` from a different thread.
489     ///
490     /// # Examples
491     ///
492     /// ```
493     /// # use cros_async::AsyncResult;
494     /// # fn example_spawn_local() -> AsyncResult<()> {
495     /// #      use cros_async::Executor;
496     ///
497     /// #      let ex = Executor::new()?;
498     ///
499     ///        let task = ex.spawn_local(async { 7 + 13 });
500     ///
501     ///        let result = ex.run_until(task)?;
502     ///        assert_eq!(result, 20);
503     ///        Ok(())
504     /// # }
505     ///
506     /// # example_spawn_local().unwrap();
507     /// ```
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,508     pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
509     where
510         F: Future + 'static,
511         F::Output: 'static,
512     {
513         match self {
514             #[cfg(any(target_os = "android", target_os = "linux"))]
515             Executor::Fd(ex) => ex.spawn_local(f),
516             #[cfg(any(target_os = "android", target_os = "linux"))]
517             Executor::Uring(ex) => ex.spawn_local(f),
518             #[cfg(windows)]
519             Executor::Handle(ex) => ex.spawn_local(f),
520             #[cfg(windows)]
521             Executor::Overlapped(ex) => ex.spawn_local(f),
522             #[cfg(feature = "tokio")]
523             Executor::Tokio(ex) => ex.spawn_local(f),
524         }
525     }
526 
527     /// Run the provided closure on a dedicated thread where blocking is allowed.
528     ///
529     /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
530     /// the returned `TaskHandle` may not cancel the operation if it was already started on a
531     /// worker thread.
532     ///
533     /// # Panics
534     ///
535     /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
536     /// already completed.
537     ///
538     /// # Examples
539     ///
540     /// ```edition2018
541     /// # use cros_async::Executor;
542     ///
543     /// # async fn do_it(ex: &Executor) {
544     ///     let res = ex.spawn_blocking(move || {
545     ///         // Do some CPU-intensive or blocking work here.
546     ///
547     ///         42
548     ///     }).await;
549     ///
550     ///     assert_eq!(res, 42);
551     /// # }
552     ///
553     /// # let ex = Executor::new().unwrap();
554     /// # ex.run_until(do_it(&ex)).unwrap();
555     /// ```
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,556     pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
557     where
558         F: FnOnce() -> R + Send + 'static,
559         R: Send + 'static,
560     {
561         match self {
562             #[cfg(any(target_os = "android", target_os = "linux"))]
563             Executor::Fd(ex) => ex.spawn_blocking(f),
564             #[cfg(any(target_os = "android", target_os = "linux"))]
565             Executor::Uring(ex) => ex.spawn_blocking(f),
566             #[cfg(windows)]
567             Executor::Handle(ex) => ex.spawn_blocking(f),
568             #[cfg(windows)]
569             Executor::Overlapped(ex) => ex.spawn_blocking(f),
570             #[cfg(feature = "tokio")]
571             Executor::Tokio(ex) => ex.spawn_blocking(f),
572         }
573     }
574 
575     /// Run the executor indefinitely, driving all spawned futures to completion. This method will
576     /// block the current thread and only return in the case of an error.
577     ///
578     /// # Panics
579     ///
580     /// Once this method has been called on a thread, it may only be called on that thread from that
581     /// point on. Attempting to call it from another thread will panic.
582     ///
583     /// # Examples
584     ///
585     /// ```
586     /// # use cros_async::AsyncResult;
587     /// # fn example_run() -> AsyncResult<()> {
588     ///       use std::thread;
589     ///
590     ///       use cros_async::Executor;
591     ///       use futures::executor::block_on;
592     ///
593     ///       let ex = Executor::new()?;
594     ///
595     ///       // Spawn a thread that runs the executor.
596     ///       let ex2 = ex.clone();
597     ///       thread::spawn(move || ex2.run());
598     ///
599     ///       let task = ex.spawn(async { 7 + 13 });
600     ///
601     ///       let result = block_on(task);
602     ///       assert_eq!(result, 20);
603     /// #     Ok(())
604     /// # }
605     ///
606     /// # example_run().unwrap();
607     /// ```
run(&self) -> AsyncResult<()>608     pub fn run(&self) -> AsyncResult<()> {
609         self.run_until(std::future::pending())
610     }
611 
612     /// Drive all futures spawned in this executor until `f` completes. This method will block the
613     /// current thread only until `f` is complete and there may still be unfinished futures in the
614     /// executor.
615     ///
616     /// # Panics
617     ///
618     /// Once this method has been called on a thread, from then onwards it may only be called on
619     /// that thread. Attempting to call it from another thread will panic.
620     ///
621     /// # Examples
622     ///
623     /// ```
624     /// # use cros_async::AsyncResult;
625     /// # fn example_run_until() -> AsyncResult<()> {
626     ///       use cros_async::Executor;
627     ///
628     ///       let ex = Executor::new()?;
629     ///
630     ///       let task = ex.spawn_local(async { 7 + 13 });
631     ///
632     ///       let result = ex.run_until(task)?;
633     ///       assert_eq!(result, 20);
634     /// #     Ok(())
635     /// # }
636     ///
637     /// # example_run_until().unwrap();
638     /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>639     pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
640         match self {
641             #[cfg(any(target_os = "android", target_os = "linux"))]
642             Executor::Fd(ex) => ex.run_until(f),
643             #[cfg(any(target_os = "android", target_os = "linux"))]
644             Executor::Uring(ex) => ex.run_until(f),
645             #[cfg(windows)]
646             Executor::Handle(ex) => ex.run_until(f),
647             #[cfg(windows)]
648             Executor::Overlapped(ex) => ex.run_until(f),
649             #[cfg(feature = "tokio")]
650             Executor::Tokio(ex) => ex.run_until(f),
651         }
652     }
653 }
654 
655 #[cfg(any(target_os = "android", target_os = "linux"))]
656 impl AsRawDescriptors for Executor {
as_raw_descriptors(&self) -> Vec<RawDescriptor>657     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
658         match self {
659             Executor::Fd(ex) => ex.as_raw_descriptors(),
660             Executor::Uring(ex) => ex.as_raw_descriptors(),
661             #[cfg(feature = "tokio")]
662             Executor::Tokio(ex) => ex.as_raw_descriptors(),
663         }
664     }
665 }
666