xref: /aosp_15_r20/external/crosvm/cros_async/src/executor.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2024 The ChromiumOS Authors
2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker 
5*bb4ee6a4SAndroid Build Coastguard Worker use std::future::Future;
6*bb4ee6a4SAndroid Build Coastguard Worker use std::pin::Pin;
7*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc;
8*bb4ee6a4SAndroid Build Coastguard Worker 
9*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))]
10*bb4ee6a4SAndroid Build Coastguard Worker use base::warn;
11*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))]
12*bb4ee6a4SAndroid Build Coastguard Worker use base::AsRawDescriptors;
13*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))]
14*bb4ee6a4SAndroid Build Coastguard Worker use base::RawDescriptor;
15*bb4ee6a4SAndroid Build Coastguard Worker use once_cell::sync::OnceCell;
16*bb4ee6a4SAndroid Build Coastguard Worker use serde::Deserialize;
17*bb4ee6a4SAndroid Build Coastguard Worker use serde_keyvalue::argh::FromArgValue;
18*bb4ee6a4SAndroid Build Coastguard Worker use serde_keyvalue::ErrorKind;
19*bb4ee6a4SAndroid Build Coastguard Worker use serde_keyvalue::KeyValueDeserializer;
20*bb4ee6a4SAndroid Build Coastguard Worker 
21*bb4ee6a4SAndroid Build Coastguard Worker use crate::common_executor;
22*bb4ee6a4SAndroid Build Coastguard Worker use crate::common_executor::RawExecutor;
23*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))]
24*bb4ee6a4SAndroid Build Coastguard Worker use crate::sys::linux;
25*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)]
26*bb4ee6a4SAndroid Build Coastguard Worker use crate::sys::windows;
27*bb4ee6a4SAndroid Build Coastguard Worker use crate::sys::ExecutorKindSys;
28*bb4ee6a4SAndroid Build Coastguard Worker use crate::AsyncResult;
29*bb4ee6a4SAndroid Build Coastguard Worker use crate::IntoAsync;
30*bb4ee6a4SAndroid Build Coastguard Worker use crate::IoSource;
31*bb4ee6a4SAndroid Build Coastguard Worker 
32*bb4ee6a4SAndroid Build Coastguard Worker cfg_if::cfg_if! {
33*bb4ee6a4SAndroid Build Coastguard Worker     if #[cfg(feature = "tokio")] {
34*bb4ee6a4SAndroid Build Coastguard Worker         use crate::tokio_executor::TokioExecutor;
35*bb4ee6a4SAndroid Build Coastguard Worker         use crate::tokio_executor::TokioTaskHandle;
36*bb4ee6a4SAndroid Build Coastguard Worker     }
37*bb4ee6a4SAndroid Build Coastguard Worker }
38*bb4ee6a4SAndroid Build Coastguard Worker 
39*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Clone, Copy, Debug, PartialEq, Eq)]
40*bb4ee6a4SAndroid Build Coastguard Worker pub enum ExecutorKind {
41*bb4ee6a4SAndroid Build Coastguard Worker     SysVariants(ExecutorKindSys),
42*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(feature = "tokio")]
43*bb4ee6a4SAndroid Build Coastguard Worker     Tokio,
44*bb4ee6a4SAndroid Build Coastguard Worker }
45*bb4ee6a4SAndroid Build Coastguard Worker 
46*bb4ee6a4SAndroid Build Coastguard Worker impl From<ExecutorKindSys> for ExecutorKind {
from(e: ExecutorKindSys) -> ExecutorKind47*bb4ee6a4SAndroid Build Coastguard Worker     fn from(e: ExecutorKindSys) -> ExecutorKind {
48*bb4ee6a4SAndroid Build Coastguard Worker         ExecutorKind::SysVariants(e)
49*bb4ee6a4SAndroid Build Coastguard Worker     }
50*bb4ee6a4SAndroid Build Coastguard Worker }
51*bb4ee6a4SAndroid Build Coastguard Worker 
52*bb4ee6a4SAndroid Build Coastguard Worker /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
53*bb4ee6a4SAndroid Build Coastguard Worker /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and
54*bb4ee6a4SAndroid Build Coastguard Worker /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
55*bb4ee6a4SAndroid Build Coastguard Worker static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new();
56*bb4ee6a4SAndroid Build Coastguard Worker 
57*bb4ee6a4SAndroid Build Coastguard Worker impl Default for ExecutorKind {
default() -> Self58*bb4ee6a4SAndroid Build Coastguard Worker     fn default() -> Self {
59*bb4ee6a4SAndroid Build Coastguard Worker         #[cfg(any(target_os = "android", target_os = "linux"))]
60*bb4ee6a4SAndroid Build Coastguard Worker         let default_fn = || ExecutorKindSys::Fd.into();
61*bb4ee6a4SAndroid Build Coastguard Worker         #[cfg(windows)]
62*bb4ee6a4SAndroid Build Coastguard Worker         let default_fn = || ExecutorKindSys::Handle.into();
63*bb4ee6a4SAndroid Build Coastguard Worker         *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)
64*bb4ee6a4SAndroid Build Coastguard Worker     }
65*bb4ee6a4SAndroid Build Coastguard Worker }
66*bb4ee6a4SAndroid Build Coastguard Worker 
67*bb4ee6a4SAndroid Build Coastguard Worker /// The error type for [`Executor::set_default_executor_kind()`].
68*bb4ee6a4SAndroid Build Coastguard Worker #[derive(thiserror::Error, Debug)]
69*bb4ee6a4SAndroid Build Coastguard Worker pub enum SetDefaultExecutorKindError {
70*bb4ee6a4SAndroid Build Coastguard Worker     /// The default executor kind is set more than once.
71*bb4ee6a4SAndroid Build Coastguard Worker     #[error("The default executor kind is already set to {0:?}")]
72*bb4ee6a4SAndroid Build Coastguard Worker     SetMoreThanOnce(ExecutorKind),
73*bb4ee6a4SAndroid Build Coastguard Worker 
74*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(any(target_os = "android", target_os = "linux"))]
75*bb4ee6a4SAndroid Build Coastguard Worker     /// io_uring is unavailable. The reason might be the lack of the kernel support,
76*bb4ee6a4SAndroid Build Coastguard Worker     /// but is not limited to that.
77*bb4ee6a4SAndroid Build Coastguard Worker     #[error("io_uring is unavailable: {0}")]
78*bb4ee6a4SAndroid Build Coastguard Worker     UringUnavailable(linux::uring_executor::Error),
79*bb4ee6a4SAndroid Build Coastguard Worker }
80*bb4ee6a4SAndroid Build Coastguard Worker 
81*bb4ee6a4SAndroid Build Coastguard Worker impl FromArgValue for ExecutorKind {
from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String>82*bb4ee6a4SAndroid Build Coastguard Worker     fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> {
83*bb4ee6a4SAndroid Build Coastguard Worker         // `from_arg_value` returns a `String` as error, but our deserializer API defines its own
84*bb4ee6a4SAndroid Build Coastguard Worker         // error type. Perform parsing from a closure so we can easily map returned errors.
85*bb4ee6a4SAndroid Build Coastguard Worker         let builder = move || {
86*bb4ee6a4SAndroid Build Coastguard Worker             let mut des = KeyValueDeserializer::from(value);
87*bb4ee6a4SAndroid Build Coastguard Worker 
88*bb4ee6a4SAndroid Build Coastguard Worker             let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) {
89*bb4ee6a4SAndroid Build Coastguard Worker                 #[cfg(any(target_os = "android", target_os = "linux"))]
90*bb4ee6a4SAndroid Build Coastguard Worker                 ("epoll", None) => ExecutorKindSys::Fd.into(),
91*bb4ee6a4SAndroid Build Coastguard Worker                 #[cfg(any(target_os = "android", target_os = "linux"))]
92*bb4ee6a4SAndroid Build Coastguard Worker                 ("uring", None) => ExecutorKindSys::Uring.into(),
93*bb4ee6a4SAndroid Build Coastguard Worker                 #[cfg(windows)]
94*bb4ee6a4SAndroid Build Coastguard Worker                 ("handle", None) => ExecutorKindSys::Handle.into(),
95*bb4ee6a4SAndroid Build Coastguard Worker                 #[cfg(windows)]
96*bb4ee6a4SAndroid Build Coastguard Worker                 ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(),
97*bb4ee6a4SAndroid Build Coastguard Worker                 #[cfg(windows)]
98*bb4ee6a4SAndroid Build Coastguard Worker                 ("overlapped", Some(',')) => {
99*bb4ee6a4SAndroid Build Coastguard Worker                     if des.parse_identifier()? != "concurrency" {
100*bb4ee6a4SAndroid Build Coastguard Worker                         let kind = ErrorKind::SerdeError("expected `concurrency`".to_string());
101*bb4ee6a4SAndroid Build Coastguard Worker                         return Err(des.error_here(kind));
102*bb4ee6a4SAndroid Build Coastguard Worker                     }
103*bb4ee6a4SAndroid Build Coastguard Worker                     if des.next_char() != Some('=') {
104*bb4ee6a4SAndroid Build Coastguard Worker                         return Err(des.error_here(ErrorKind::ExpectedEqual));
105*bb4ee6a4SAndroid Build Coastguard Worker                     }
106*bb4ee6a4SAndroid Build Coastguard Worker                     let concurrency = des.parse_number()?;
107*bb4ee6a4SAndroid Build Coastguard Worker                     ExecutorKindSys::Overlapped {
108*bb4ee6a4SAndroid Build Coastguard Worker                         concurrency: Some(concurrency),
109*bb4ee6a4SAndroid Build Coastguard Worker                     }
110*bb4ee6a4SAndroid Build Coastguard Worker                     .into()
111*bb4ee6a4SAndroid Build Coastguard Worker                 }
112*bb4ee6a4SAndroid Build Coastguard Worker                 #[cfg(feature = "tokio")]
113*bb4ee6a4SAndroid Build Coastguard Worker                 ("tokio", None) => ExecutorKind::Tokio,
114*bb4ee6a4SAndroid Build Coastguard Worker                 (_identifier, _next) => {
115*bb4ee6a4SAndroid Build Coastguard Worker                     let kind = ErrorKind::SerdeError("unexpected kind".to_string());
116*bb4ee6a4SAndroid Build Coastguard Worker                     return Err(des.error_here(kind));
117*bb4ee6a4SAndroid Build Coastguard Worker                 }
118*bb4ee6a4SAndroid Build Coastguard Worker             };
119*bb4ee6a4SAndroid Build Coastguard Worker             des.finish()?;
120*bb4ee6a4SAndroid Build Coastguard Worker             Ok(kind)
121*bb4ee6a4SAndroid Build Coastguard Worker         };
122*bb4ee6a4SAndroid Build Coastguard Worker 
123*bb4ee6a4SAndroid Build Coastguard Worker         builder().map_err(|e| e.to_string())
124*bb4ee6a4SAndroid Build Coastguard Worker     }
125*bb4ee6a4SAndroid Build Coastguard Worker }
126*bb4ee6a4SAndroid Build Coastguard Worker 
127*bb4ee6a4SAndroid Build Coastguard Worker impl serde::Serialize for ExecutorKind {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer,128*bb4ee6a4SAndroid Build Coastguard Worker     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129*bb4ee6a4SAndroid Build Coastguard Worker     where
130*bb4ee6a4SAndroid Build Coastguard Worker         S: serde::Serializer,
131*bb4ee6a4SAndroid Build Coastguard Worker     {
132*bb4ee6a4SAndroid Build Coastguard Worker         match self {
133*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::SysVariants(sv) => sv.serialize(serializer),
134*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
135*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::Tokio => "tokio".serialize(serializer),
136*bb4ee6a4SAndroid Build Coastguard Worker         }
137*bb4ee6a4SAndroid Build Coastguard Worker     }
138*bb4ee6a4SAndroid Build Coastguard Worker }
139*bb4ee6a4SAndroid Build Coastguard Worker 
140*bb4ee6a4SAndroid Build Coastguard Worker impl<'de> Deserialize<'de> for ExecutorKind {
deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> where D: serde::Deserializer<'de>,141*bb4ee6a4SAndroid Build Coastguard Worker     fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error>
142*bb4ee6a4SAndroid Build Coastguard Worker     where
143*bb4ee6a4SAndroid Build Coastguard Worker         D: serde::Deserializer<'de>,
144*bb4ee6a4SAndroid Build Coastguard Worker     {
145*bb4ee6a4SAndroid Build Coastguard Worker         let string = String::deserialize(deserializer)?;
146*bb4ee6a4SAndroid Build Coastguard Worker         ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom)
147*bb4ee6a4SAndroid Build Coastguard Worker     }
148*bb4ee6a4SAndroid Build Coastguard Worker }
149*bb4ee6a4SAndroid Build Coastguard Worker 
150*bb4ee6a4SAndroid Build Coastguard Worker /// Reference to a task managed by the executor.
151*bb4ee6a4SAndroid Build Coastguard Worker ///
152*bb4ee6a4SAndroid Build Coastguard Worker /// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to
153*bb4ee6a4SAndroid Build Coastguard Worker /// continue running the background.
154*bb4ee6a4SAndroid Build Coastguard Worker ///
155*bb4ee6a4SAndroid Build Coastguard Worker /// `await`ing the `TaskHandle` waits for the task to finish and yields its result.
156*bb4ee6a4SAndroid Build Coastguard Worker pub enum TaskHandle<R> {
157*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(any(target_os = "android", target_os = "linux"))]
158*bb4ee6a4SAndroid Build Coastguard Worker     Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>),
159*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(any(target_os = "android", target_os = "linux"))]
160*bb4ee6a4SAndroid Build Coastguard Worker     Uring(common_executor::RawTaskHandle<linux::UringReactor, R>),
161*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(windows)]
162*bb4ee6a4SAndroid Build Coastguard Worker     Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>),
163*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(feature = "tokio")]
164*bb4ee6a4SAndroid Build Coastguard Worker     Tokio(TokioTaskHandle<R>),
165*bb4ee6a4SAndroid Build Coastguard Worker }
166*bb4ee6a4SAndroid Build Coastguard Worker 
167*bb4ee6a4SAndroid Build Coastguard Worker impl<R: Send + 'static> TaskHandle<R> {
detach(self)168*bb4ee6a4SAndroid Build Coastguard Worker     pub fn detach(self) {
169*bb4ee6a4SAndroid Build Coastguard Worker         match self {
170*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
171*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Fd(f) => f.detach(),
172*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
173*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Uring(u) => u.detach(),
174*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
175*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Handle(h) => h.detach(),
176*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
177*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Tokio(t) => t.detach(),
178*bb4ee6a4SAndroid Build Coastguard Worker         }
179*bb4ee6a4SAndroid Build Coastguard Worker     }
180*bb4ee6a4SAndroid Build Coastguard Worker 
181*bb4ee6a4SAndroid Build Coastguard Worker     // Cancel the task and wait for it to stop. Returns the result of the task if it was already
182*bb4ee6a4SAndroid Build Coastguard Worker     // finished.
cancel(self) -> Option<R>183*bb4ee6a4SAndroid Build Coastguard Worker     pub async fn cancel(self) -> Option<R> {
184*bb4ee6a4SAndroid Build Coastguard Worker         match self {
185*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
186*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Fd(f) => f.cancel().await,
187*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
188*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Uring(u) => u.cancel().await,
189*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
190*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Handle(h) => h.cancel().await,
191*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
192*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Tokio(t) => t.cancel().await,
193*bb4ee6a4SAndroid Build Coastguard Worker         }
194*bb4ee6a4SAndroid Build Coastguard Worker     }
195*bb4ee6a4SAndroid Build Coastguard Worker }
196*bb4ee6a4SAndroid Build Coastguard Worker 
197*bb4ee6a4SAndroid Build Coastguard Worker impl<R: 'static> Future for TaskHandle<R> {
198*bb4ee6a4SAndroid Build Coastguard Worker     type Output = R;
199*bb4ee6a4SAndroid Build Coastguard Worker 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>200*bb4ee6a4SAndroid Build Coastguard Worker     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
201*bb4ee6a4SAndroid Build Coastguard Worker         match self.get_mut() {
202*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
203*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Fd(f) => Pin::new(f).poll(cx),
204*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
205*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Uring(u) => Pin::new(u).poll(cx),
206*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
207*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Handle(h) => Pin::new(h).poll(cx),
208*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
209*bb4ee6a4SAndroid Build Coastguard Worker             TaskHandle::Tokio(t) => Pin::new(t).poll(cx),
210*bb4ee6a4SAndroid Build Coastguard Worker         }
211*bb4ee6a4SAndroid Build Coastguard Worker     }
212*bb4ee6a4SAndroid Build Coastguard Worker }
213*bb4ee6a4SAndroid Build Coastguard Worker 
214*bb4ee6a4SAndroid Build Coastguard Worker pub(crate) trait ExecutorTrait {
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>215*bb4ee6a4SAndroid Build Coastguard Worker     fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;
216*bb4ee6a4SAndroid Build Coastguard Worker 
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static217*bb4ee6a4SAndroid Build Coastguard Worker     fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
218*bb4ee6a4SAndroid Build Coastguard Worker     where
219*bb4ee6a4SAndroid Build Coastguard Worker         F: Future + Send + 'static,
220*bb4ee6a4SAndroid Build Coastguard Worker         F::Output: Send + 'static;
221*bb4ee6a4SAndroid Build Coastguard Worker 
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static222*bb4ee6a4SAndroid Build Coastguard Worker     fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
223*bb4ee6a4SAndroid Build Coastguard Worker     where
224*bb4ee6a4SAndroid Build Coastguard Worker         F: FnOnce() -> R + Send + 'static,
225*bb4ee6a4SAndroid Build Coastguard Worker         R: Send + 'static;
226*bb4ee6a4SAndroid Build Coastguard Worker 
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static227*bb4ee6a4SAndroid Build Coastguard Worker     fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
228*bb4ee6a4SAndroid Build Coastguard Worker     where
229*bb4ee6a4SAndroid Build Coastguard Worker         F: Future + 'static,
230*bb4ee6a4SAndroid Build Coastguard Worker         F::Output: 'static;
231*bb4ee6a4SAndroid Build Coastguard Worker 
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>232*bb4ee6a4SAndroid Build Coastguard Worker     fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
233*bb4ee6a4SAndroid Build Coastguard Worker }
234*bb4ee6a4SAndroid Build Coastguard Worker 
235*bb4ee6a4SAndroid Build Coastguard Worker /// An executor for scheduling tasks that poll futures to completion.
236*bb4ee6a4SAndroid Build Coastguard Worker ///
237*bb4ee6a4SAndroid Build Coastguard Worker /// All asynchronous operations must run within an executor, which is capable of spawning futures as
238*bb4ee6a4SAndroid Build Coastguard Worker /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
239*bb4ee6a4SAndroid Build Coastguard Worker ///
240*bb4ee6a4SAndroid Build Coastguard Worker /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
241*bb4ee6a4SAndroid Build Coastguard Worker /// create a new reference, not a new executor.
242*bb4ee6a4SAndroid Build Coastguard Worker ///
243*bb4ee6a4SAndroid Build Coastguard Worker /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
244*bb4ee6a4SAndroid Build Coastguard Worker /// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
245*bb4ee6a4SAndroid Build Coastguard Worker /// interface duplication, but as far as we understand that is unavoidable.
246*bb4ee6a4SAndroid Build Coastguard Worker ///
247*bb4ee6a4SAndroid Build Coastguard Worker /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
248*bb4ee6a4SAndroid Build Coastguard Worker /// for further details.
249*bb4ee6a4SAndroid Build Coastguard Worker ///
250*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples
251*bb4ee6a4SAndroid Build Coastguard Worker ///
252*bb4ee6a4SAndroid Build Coastguard Worker /// Concurrently wait for multiple files to become readable/writable and then read/write the data.
253*bb4ee6a4SAndroid Build Coastguard Worker ///
254*bb4ee6a4SAndroid Build Coastguard Worker /// ```
255*bb4ee6a4SAndroid Build Coastguard Worker /// use std::cmp::min;
256*bb4ee6a4SAndroid Build Coastguard Worker /// use std::error::Error;
257*bb4ee6a4SAndroid Build Coastguard Worker /// use std::fs::{File, OpenOptions};
258*bb4ee6a4SAndroid Build Coastguard Worker ///
259*bb4ee6a4SAndroid Build Coastguard Worker /// use cros_async::{AsyncResult, Executor, IoSource, complete3};
260*bb4ee6a4SAndroid Build Coastguard Worker /// const CHUNK_SIZE: usize = 32;
261*bb4ee6a4SAndroid Build Coastguard Worker ///
262*bb4ee6a4SAndroid Build Coastguard Worker /// // Write all bytes from `data` to `f`.
263*bb4ee6a4SAndroid Build Coastguard Worker /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
264*bb4ee6a4SAndroid Build Coastguard Worker ///     while data.len() > 0 {
265*bb4ee6a4SAndroid Build Coastguard Worker ///         let (count, mut buf) = f.write_from_vec(None, data).await?;
266*bb4ee6a4SAndroid Build Coastguard Worker ///
267*bb4ee6a4SAndroid Build Coastguard Worker ///         data = buf.split_off(count);
268*bb4ee6a4SAndroid Build Coastguard Worker ///     }
269*bb4ee6a4SAndroid Build Coastguard Worker ///
270*bb4ee6a4SAndroid Build Coastguard Worker ///     Ok(())
271*bb4ee6a4SAndroid Build Coastguard Worker /// }
272*bb4ee6a4SAndroid Build Coastguard Worker ///
273*bb4ee6a4SAndroid Build Coastguard Worker /// // Transfer `len` bytes of data from `from` to `to`.
274*bb4ee6a4SAndroid Build Coastguard Worker /// async fn transfer_data(
275*bb4ee6a4SAndroid Build Coastguard Worker ///     from: IoSource<File>,
276*bb4ee6a4SAndroid Build Coastguard Worker ///     to: IoSource<File>,
277*bb4ee6a4SAndroid Build Coastguard Worker ///     len: usize,
278*bb4ee6a4SAndroid Build Coastguard Worker /// ) -> AsyncResult<usize> {
279*bb4ee6a4SAndroid Build Coastguard Worker ///     let mut rem = len;
280*bb4ee6a4SAndroid Build Coastguard Worker ///
281*bb4ee6a4SAndroid Build Coastguard Worker ///     while rem > 0 {
282*bb4ee6a4SAndroid Build Coastguard Worker ///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
283*bb4ee6a4SAndroid Build Coastguard Worker ///         let (count, mut data) = from.read_to_vec(None, buf).await?;
284*bb4ee6a4SAndroid Build Coastguard Worker ///
285*bb4ee6a4SAndroid Build Coastguard Worker ///         if count == 0 {
286*bb4ee6a4SAndroid Build Coastguard Worker ///             // End of file. Return the number of bytes transferred.
287*bb4ee6a4SAndroid Build Coastguard Worker ///             return Ok(len - rem);
288*bb4ee6a4SAndroid Build Coastguard Worker ///         }
289*bb4ee6a4SAndroid Build Coastguard Worker ///
290*bb4ee6a4SAndroid Build Coastguard Worker ///         data.truncate(count);
291*bb4ee6a4SAndroid Build Coastguard Worker ///         write_file(&to, data).await?;
292*bb4ee6a4SAndroid Build Coastguard Worker ///
293*bb4ee6a4SAndroid Build Coastguard Worker ///         rem = rem.saturating_sub(count);
294*bb4ee6a4SAndroid Build Coastguard Worker ///     }
295*bb4ee6a4SAndroid Build Coastguard Worker ///
296*bb4ee6a4SAndroid Build Coastguard Worker ///     Ok(len)
297*bb4ee6a4SAndroid Build Coastguard Worker /// }
298*bb4ee6a4SAndroid Build Coastguard Worker ///
299*bb4ee6a4SAndroid Build Coastguard Worker /// #[cfg(any(target_os = "android", target_os = "linux"))]
300*bb4ee6a4SAndroid Build Coastguard Worker /// # fn do_it() -> Result<(), Box<dyn Error>> {
301*bb4ee6a4SAndroid Build Coastguard Worker ///     let ex = Executor::new()?;
302*bb4ee6a4SAndroid Build Coastguard Worker ///
303*bb4ee6a4SAndroid Build Coastguard Worker ///     let (rx, tx) = base::linux::pipe()?;
304*bb4ee6a4SAndroid Build Coastguard Worker ///     let zero = File::open("/dev/zero")?;
305*bb4ee6a4SAndroid Build Coastguard Worker ///     let zero_bytes = CHUNK_SIZE * 7;
306*bb4ee6a4SAndroid Build Coastguard Worker ///     let zero_to_pipe = transfer_data(
307*bb4ee6a4SAndroid Build Coastguard Worker ///         ex.async_from(zero)?,
308*bb4ee6a4SAndroid Build Coastguard Worker ///         ex.async_from(tx.try_clone()?)?,
309*bb4ee6a4SAndroid Build Coastguard Worker ///         zero_bytes,
310*bb4ee6a4SAndroid Build Coastguard Worker ///     );
311*bb4ee6a4SAndroid Build Coastguard Worker ///
312*bb4ee6a4SAndroid Build Coastguard Worker ///     let rand = File::open("/dev/urandom")?;
313*bb4ee6a4SAndroid Build Coastguard Worker ///     let rand_bytes = CHUNK_SIZE * 19;
314*bb4ee6a4SAndroid Build Coastguard Worker ///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
315*bb4ee6a4SAndroid Build Coastguard Worker ///
316*bb4ee6a4SAndroid Build Coastguard Worker ///     let null = OpenOptions::new().write(true).open("/dev/null")?;
317*bb4ee6a4SAndroid Build Coastguard Worker ///     let null_bytes = zero_bytes + rand_bytes;
318*bb4ee6a4SAndroid Build Coastguard Worker ///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
319*bb4ee6a4SAndroid Build Coastguard Worker ///
320*bb4ee6a4SAndroid Build Coastguard Worker ///     ex.run_until(complete3(
321*bb4ee6a4SAndroid Build Coastguard Worker ///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
322*bb4ee6a4SAndroid Build Coastguard Worker ///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
323*bb4ee6a4SAndroid Build Coastguard Worker ///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
324*bb4ee6a4SAndroid Build Coastguard Worker ///     ))?;
325*bb4ee6a4SAndroid Build Coastguard Worker ///
326*bb4ee6a4SAndroid Build Coastguard Worker /// #     Ok(())
327*bb4ee6a4SAndroid Build Coastguard Worker /// # }
328*bb4ee6a4SAndroid Build Coastguard Worker /// #[cfg(any(target_os = "android", target_os = "linux"))]
329*bb4ee6a4SAndroid Build Coastguard Worker /// # do_it().unwrap();
330*bb4ee6a4SAndroid Build Coastguard Worker /// ```
331*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Clone)]
332*bb4ee6a4SAndroid Build Coastguard Worker pub enum Executor {
333*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(any(target_os = "android", target_os = "linux"))]
334*bb4ee6a4SAndroid Build Coastguard Worker     Fd(Arc<RawExecutor<linux::EpollReactor>>),
335*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(any(target_os = "android", target_os = "linux"))]
336*bb4ee6a4SAndroid Build Coastguard Worker     Uring(Arc<RawExecutor<linux::UringReactor>>),
337*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(windows)]
338*bb4ee6a4SAndroid Build Coastguard Worker     Handle(Arc<RawExecutor<windows::HandleReactor>>),
339*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(windows)]
340*bb4ee6a4SAndroid Build Coastguard Worker     Overlapped(Arc<RawExecutor<windows::HandleReactor>>),
341*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(feature = "tokio")]
342*bb4ee6a4SAndroid Build Coastguard Worker     Tokio(TokioExecutor),
343*bb4ee6a4SAndroid Build Coastguard Worker }
344*bb4ee6a4SAndroid Build Coastguard Worker 
345*bb4ee6a4SAndroid Build Coastguard Worker impl Executor {
346*bb4ee6a4SAndroid Build Coastguard Worker     /// Create a new `Executor`.
new() -> AsyncResult<Self>347*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new() -> AsyncResult<Self> {
348*bb4ee6a4SAndroid Build Coastguard Worker         Executor::with_executor_kind(ExecutorKind::default())
349*bb4ee6a4SAndroid Build Coastguard Worker     }
350*bb4ee6a4SAndroid Build Coastguard Worker 
351*bb4ee6a4SAndroid Build Coastguard Worker     /// Create a new `Executor` of the given `ExecutorKind`.
with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>352*bb4ee6a4SAndroid Build Coastguard Worker     pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
353*bb4ee6a4SAndroid Build Coastguard Worker         Ok(match kind {
354*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
355*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),
356*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
357*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {
358*bb4ee6a4SAndroid Build Coastguard Worker                 Executor::Uring(RawExecutor::new()?)
359*bb4ee6a4SAndroid Build Coastguard Worker             }
360*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
361*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {
362*bb4ee6a4SAndroid Build Coastguard Worker                 Executor::Handle(RawExecutor::new()?)
363*bb4ee6a4SAndroid Build Coastguard Worker             }
364*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
365*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => {
366*bb4ee6a4SAndroid Build Coastguard Worker                 let reactor = match concurrency {
367*bb4ee6a4SAndroid Build Coastguard Worker                     Some(concurrency) => windows::HandleReactor::new_with(concurrency)?,
368*bb4ee6a4SAndroid Build Coastguard Worker                     None => windows::HandleReactor::new()?,
369*bb4ee6a4SAndroid Build Coastguard Worker                 };
370*bb4ee6a4SAndroid Build Coastguard Worker                 Executor::Overlapped(RawExecutor::new_with(reactor)?)
371*bb4ee6a4SAndroid Build Coastguard Worker             }
372*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
373*bb4ee6a4SAndroid Build Coastguard Worker             ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?),
374*bb4ee6a4SAndroid Build Coastguard Worker         })
375*bb4ee6a4SAndroid Build Coastguard Worker     }
376*bb4ee6a4SAndroid Build Coastguard Worker 
377*bb4ee6a4SAndroid Build Coastguard Worker     /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>378*bb4ee6a4SAndroid Build Coastguard Worker     pub fn set_default_executor_kind(
379*bb4ee6a4SAndroid Build Coastguard Worker         executor_kind: ExecutorKind,
380*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<(), SetDefaultExecutorKindError> {
381*bb4ee6a4SAndroid Build Coastguard Worker         #[cfg(any(target_os = "android", target_os = "linux"))]
382*bb4ee6a4SAndroid Build Coastguard Worker         if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {
383*bb4ee6a4SAndroid Build Coastguard Worker             linux::uring_executor::check_uring_availability()
384*bb4ee6a4SAndroid Build Coastguard Worker                 .map_err(SetDefaultExecutorKindError::UringUnavailable)?;
385*bb4ee6a4SAndroid Build Coastguard Worker             if !crate::is_uring_stable() {
386*bb4ee6a4SAndroid Build Coastguard Worker                 warn!(
387*bb4ee6a4SAndroid Build Coastguard Worker                     "Enabling io_uring executor on the kernel version where io_uring is unstable"
388*bb4ee6a4SAndroid Build Coastguard Worker                 );
389*bb4ee6a4SAndroid Build Coastguard Worker             }
390*bb4ee6a4SAndroid Build Coastguard Worker         }
391*bb4ee6a4SAndroid Build Coastguard Worker         DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
392*bb4ee6a4SAndroid Build Coastguard Worker             // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
393*bb4ee6a4SAndroid Build Coastguard Worker             SetDefaultExecutorKindError::SetMoreThanOnce(
394*bb4ee6a4SAndroid Build Coastguard Worker                 *DEFAULT_EXECUTOR_KIND
395*bb4ee6a4SAndroid Build Coastguard Worker                     .get()
396*bb4ee6a4SAndroid Build Coastguard Worker                     .expect("Failed to get DEFAULT_EXECUTOR_KIND"),
397*bb4ee6a4SAndroid Build Coastguard Worker             ))
398*bb4ee6a4SAndroid Build Coastguard Worker     }
399*bb4ee6a4SAndroid Build Coastguard Worker 
400*bb4ee6a4SAndroid Build Coastguard Worker     /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
401*bb4ee6a4SAndroid Build Coastguard Worker     /// `IoSource` to directly start async operations without needing a separate reference to the
402*bb4ee6a4SAndroid Build Coastguard Worker     /// executor.
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>403*bb4ee6a4SAndroid Build Coastguard Worker     pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
404*bb4ee6a4SAndroid Build Coastguard Worker         match self {
405*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
406*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Fd(ex) => ex.async_from(f),
407*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
408*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Uring(ex) => ex.async_from(f),
409*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
410*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Handle(ex) => ex.async_from(f),
411*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
412*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Overlapped(ex) => ex.async_from(f),
413*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
414*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Tokio(ex) => ex.async_from(f),
415*bb4ee6a4SAndroid Build Coastguard Worker         }
416*bb4ee6a4SAndroid Build Coastguard Worker     }
417*bb4ee6a4SAndroid Build Coastguard Worker 
418*bb4ee6a4SAndroid Build Coastguard Worker     /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
419*bb4ee6a4SAndroid Build Coastguard Worker     /// If the executor is not overlapped, then Handle source is returned.
420*bb4ee6a4SAndroid Build Coastguard Worker     /// returned `IoSource` to directly start async operations without needing a separate reference
421*bb4ee6a4SAndroid Build Coastguard Worker     /// to the executor.
422*bb4ee6a4SAndroid Build Coastguard Worker     #[cfg(windows)]
async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>423*bb4ee6a4SAndroid Build Coastguard Worker     pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
424*bb4ee6a4SAndroid Build Coastguard Worker         match self {
425*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(
426*bb4ee6a4SAndroid Build Coastguard Worker                 f, ex, false,
427*bb4ee6a4SAndroid Build Coastguard Worker             )?)),
428*bb4ee6a4SAndroid Build Coastguard Worker             _ => self.async_from(f),
429*bb4ee6a4SAndroid Build Coastguard Worker         }
430*bb4ee6a4SAndroid Build Coastguard Worker     }
431*bb4ee6a4SAndroid Build Coastguard Worker 
432*bb4ee6a4SAndroid Build Coastguard Worker     /// Spawn a new future for this executor to run to completion. Callers may use the returned
433*bb4ee6a4SAndroid Build Coastguard Worker     /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
434*bb4ee6a4SAndroid Build Coastguard Worker     /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
435*bb4ee6a4SAndroid Build Coastguard Worker     /// future associated with it use `TaskHandle::detach`.
436*bb4ee6a4SAndroid Build Coastguard Worker     ///
437*bb4ee6a4SAndroid Build Coastguard Worker     /// # Examples
438*bb4ee6a4SAndroid Build Coastguard Worker     ///
439*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
440*bb4ee6a4SAndroid Build Coastguard Worker     /// # use cros_async::AsyncResult;
441*bb4ee6a4SAndroid Build Coastguard Worker     /// # fn example_spawn() -> AsyncResult<()> {
442*bb4ee6a4SAndroid Build Coastguard Worker     /// #      use std::thread;
443*bb4ee6a4SAndroid Build Coastguard Worker     ///
444*bb4ee6a4SAndroid Build Coastguard Worker     /// #      use cros_async::Executor;
445*bb4ee6a4SAndroid Build Coastguard Worker     ///        use futures::executor::block_on;
446*bb4ee6a4SAndroid Build Coastguard Worker     ///
447*bb4ee6a4SAndroid Build Coastguard Worker     /// #      let ex = Executor::new()?;
448*bb4ee6a4SAndroid Build Coastguard Worker     ///
449*bb4ee6a4SAndroid Build Coastguard Worker     /// #      // Spawn a thread that runs the executor.
450*bb4ee6a4SAndroid Build Coastguard Worker     /// #      let ex2 = ex.clone();
451*bb4ee6a4SAndroid Build Coastguard Worker     /// #      thread::spawn(move || ex2.run());
452*bb4ee6a4SAndroid Build Coastguard Worker     ///
453*bb4ee6a4SAndroid Build Coastguard Worker     ///       let task = ex.spawn(async { 7 + 13 });
454*bb4ee6a4SAndroid Build Coastguard Worker     ///
455*bb4ee6a4SAndroid Build Coastguard Worker     ///       let result = block_on(task);
456*bb4ee6a4SAndroid Build Coastguard Worker     ///       assert_eq!(result, 20);
457*bb4ee6a4SAndroid Build Coastguard Worker     /// #     Ok(())
458*bb4ee6a4SAndroid Build Coastguard Worker     /// # }
459*bb4ee6a4SAndroid Build Coastguard Worker     ///
460*bb4ee6a4SAndroid Build Coastguard Worker     /// # example_spawn().unwrap();
461*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,462*bb4ee6a4SAndroid Build Coastguard Worker     pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
463*bb4ee6a4SAndroid Build Coastguard Worker     where
464*bb4ee6a4SAndroid Build Coastguard Worker         F: Future + Send + 'static,
465*bb4ee6a4SAndroid Build Coastguard Worker         F::Output: Send + 'static,
466*bb4ee6a4SAndroid Build Coastguard Worker     {
467*bb4ee6a4SAndroid Build Coastguard Worker         match self {
468*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
469*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Fd(ex) => ex.spawn(f),
470*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
471*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Uring(ex) => ex.spawn(f),
472*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
473*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Handle(ex) => ex.spawn(f),
474*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
475*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Overlapped(ex) => ex.spawn(f),
476*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
477*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Tokio(ex) => ex.spawn(f),
478*bb4ee6a4SAndroid Build Coastguard Worker         }
479*bb4ee6a4SAndroid Build Coastguard Worker     }
480*bb4ee6a4SAndroid Build Coastguard Worker 
481*bb4ee6a4SAndroid Build Coastguard Worker     /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
482*bb4ee6a4SAndroid Build Coastguard Worker     /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
483*bb4ee6a4SAndroid Build Coastguard Worker     /// thread where `run()` or `run_until()` is called.
484*bb4ee6a4SAndroid Build Coastguard Worker     ///
485*bb4ee6a4SAndroid Build Coastguard Worker     /// # Panics
486*bb4ee6a4SAndroid Build Coastguard Worker     ///
487*bb4ee6a4SAndroid Build Coastguard Worker     /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
488*bb4ee6a4SAndroid Build Coastguard Worker     /// added by calling `spawn_local` from a different thread.
489*bb4ee6a4SAndroid Build Coastguard Worker     ///
490*bb4ee6a4SAndroid Build Coastguard Worker     /// # Examples
491*bb4ee6a4SAndroid Build Coastguard Worker     ///
492*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
493*bb4ee6a4SAndroid Build Coastguard Worker     /// # use cros_async::AsyncResult;
494*bb4ee6a4SAndroid Build Coastguard Worker     /// # fn example_spawn_local() -> AsyncResult<()> {
495*bb4ee6a4SAndroid Build Coastguard Worker     /// #      use cros_async::Executor;
496*bb4ee6a4SAndroid Build Coastguard Worker     ///
497*bb4ee6a4SAndroid Build Coastguard Worker     /// #      let ex = Executor::new()?;
498*bb4ee6a4SAndroid Build Coastguard Worker     ///
499*bb4ee6a4SAndroid Build Coastguard Worker     ///        let task = ex.spawn_local(async { 7 + 13 });
500*bb4ee6a4SAndroid Build Coastguard Worker     ///
501*bb4ee6a4SAndroid Build Coastguard Worker     ///        let result = ex.run_until(task)?;
502*bb4ee6a4SAndroid Build Coastguard Worker     ///        assert_eq!(result, 20);
503*bb4ee6a4SAndroid Build Coastguard Worker     ///        Ok(())
504*bb4ee6a4SAndroid Build Coastguard Worker     /// # }
505*bb4ee6a4SAndroid Build Coastguard Worker     ///
506*bb4ee6a4SAndroid Build Coastguard Worker     /// # example_spawn_local().unwrap();
507*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,508*bb4ee6a4SAndroid Build Coastguard Worker     pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
509*bb4ee6a4SAndroid Build Coastguard Worker     where
510*bb4ee6a4SAndroid Build Coastguard Worker         F: Future + 'static,
511*bb4ee6a4SAndroid Build Coastguard Worker         F::Output: 'static,
512*bb4ee6a4SAndroid Build Coastguard Worker     {
513*bb4ee6a4SAndroid Build Coastguard Worker         match self {
514*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
515*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Fd(ex) => ex.spawn_local(f),
516*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
517*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Uring(ex) => ex.spawn_local(f),
518*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
519*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Handle(ex) => ex.spawn_local(f),
520*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
521*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Overlapped(ex) => ex.spawn_local(f),
522*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
523*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Tokio(ex) => ex.spawn_local(f),
524*bb4ee6a4SAndroid Build Coastguard Worker         }
525*bb4ee6a4SAndroid Build Coastguard Worker     }
526*bb4ee6a4SAndroid Build Coastguard Worker 
527*bb4ee6a4SAndroid Build Coastguard Worker     /// Run the provided closure on a dedicated thread where blocking is allowed.
528*bb4ee6a4SAndroid Build Coastguard Worker     ///
529*bb4ee6a4SAndroid Build Coastguard Worker     /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
530*bb4ee6a4SAndroid Build Coastguard Worker     /// the returned `TaskHandle` may not cancel the operation if it was already started on a
531*bb4ee6a4SAndroid Build Coastguard Worker     /// worker thread.
532*bb4ee6a4SAndroid Build Coastguard Worker     ///
533*bb4ee6a4SAndroid Build Coastguard Worker     /// # Panics
534*bb4ee6a4SAndroid Build Coastguard Worker     ///
535*bb4ee6a4SAndroid Build Coastguard Worker     /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
536*bb4ee6a4SAndroid Build Coastguard Worker     /// already completed.
537*bb4ee6a4SAndroid Build Coastguard Worker     ///
538*bb4ee6a4SAndroid Build Coastguard Worker     /// # Examples
539*bb4ee6a4SAndroid Build Coastguard Worker     ///
540*bb4ee6a4SAndroid Build Coastguard Worker     /// ```edition2018
541*bb4ee6a4SAndroid Build Coastguard Worker     /// # use cros_async::Executor;
542*bb4ee6a4SAndroid Build Coastguard Worker     ///
543*bb4ee6a4SAndroid Build Coastguard Worker     /// # async fn do_it(ex: &Executor) {
544*bb4ee6a4SAndroid Build Coastguard Worker     ///     let res = ex.spawn_blocking(move || {
545*bb4ee6a4SAndroid Build Coastguard Worker     ///         // Do some CPU-intensive or blocking work here.
546*bb4ee6a4SAndroid Build Coastguard Worker     ///
547*bb4ee6a4SAndroid Build Coastguard Worker     ///         42
548*bb4ee6a4SAndroid Build Coastguard Worker     ///     }).await;
549*bb4ee6a4SAndroid Build Coastguard Worker     ///
550*bb4ee6a4SAndroid Build Coastguard Worker     ///     assert_eq!(res, 42);
551*bb4ee6a4SAndroid Build Coastguard Worker     /// # }
552*bb4ee6a4SAndroid Build Coastguard Worker     ///
553*bb4ee6a4SAndroid Build Coastguard Worker     /// # let ex = Executor::new().unwrap();
554*bb4ee6a4SAndroid Build Coastguard Worker     /// # ex.run_until(do_it(&ex)).unwrap();
555*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,556*bb4ee6a4SAndroid Build Coastguard Worker     pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
557*bb4ee6a4SAndroid Build Coastguard Worker     where
558*bb4ee6a4SAndroid Build Coastguard Worker         F: FnOnce() -> R + Send + 'static,
559*bb4ee6a4SAndroid Build Coastguard Worker         R: Send + 'static,
560*bb4ee6a4SAndroid Build Coastguard Worker     {
561*bb4ee6a4SAndroid Build Coastguard Worker         match self {
562*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
563*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Fd(ex) => ex.spawn_blocking(f),
564*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
565*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Uring(ex) => ex.spawn_blocking(f),
566*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
567*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Handle(ex) => ex.spawn_blocking(f),
568*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
569*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Overlapped(ex) => ex.spawn_blocking(f),
570*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
571*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Tokio(ex) => ex.spawn_blocking(f),
572*bb4ee6a4SAndroid Build Coastguard Worker         }
573*bb4ee6a4SAndroid Build Coastguard Worker     }
574*bb4ee6a4SAndroid Build Coastguard Worker 
575*bb4ee6a4SAndroid Build Coastguard Worker     /// Run the executor indefinitely, driving all spawned futures to completion. This method will
576*bb4ee6a4SAndroid Build Coastguard Worker     /// block the current thread and only return in the case of an error.
577*bb4ee6a4SAndroid Build Coastguard Worker     ///
578*bb4ee6a4SAndroid Build Coastguard Worker     /// # Panics
579*bb4ee6a4SAndroid Build Coastguard Worker     ///
580*bb4ee6a4SAndroid Build Coastguard Worker     /// Once this method has been called on a thread, it may only be called on that thread from that
581*bb4ee6a4SAndroid Build Coastguard Worker     /// point on. Attempting to call it from another thread will panic.
582*bb4ee6a4SAndroid Build Coastguard Worker     ///
583*bb4ee6a4SAndroid Build Coastguard Worker     /// # Examples
584*bb4ee6a4SAndroid Build Coastguard Worker     ///
585*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
586*bb4ee6a4SAndroid Build Coastguard Worker     /// # use cros_async::AsyncResult;
587*bb4ee6a4SAndroid Build Coastguard Worker     /// # fn example_run() -> AsyncResult<()> {
588*bb4ee6a4SAndroid Build Coastguard Worker     ///       use std::thread;
589*bb4ee6a4SAndroid Build Coastguard Worker     ///
590*bb4ee6a4SAndroid Build Coastguard Worker     ///       use cros_async::Executor;
591*bb4ee6a4SAndroid Build Coastguard Worker     ///       use futures::executor::block_on;
592*bb4ee6a4SAndroid Build Coastguard Worker     ///
593*bb4ee6a4SAndroid Build Coastguard Worker     ///       let ex = Executor::new()?;
594*bb4ee6a4SAndroid Build Coastguard Worker     ///
595*bb4ee6a4SAndroid Build Coastguard Worker     ///       // Spawn a thread that runs the executor.
596*bb4ee6a4SAndroid Build Coastguard Worker     ///       let ex2 = ex.clone();
597*bb4ee6a4SAndroid Build Coastguard Worker     ///       thread::spawn(move || ex2.run());
598*bb4ee6a4SAndroid Build Coastguard Worker     ///
599*bb4ee6a4SAndroid Build Coastguard Worker     ///       let task = ex.spawn(async { 7 + 13 });
600*bb4ee6a4SAndroid Build Coastguard Worker     ///
601*bb4ee6a4SAndroid Build Coastguard Worker     ///       let result = block_on(task);
602*bb4ee6a4SAndroid Build Coastguard Worker     ///       assert_eq!(result, 20);
603*bb4ee6a4SAndroid Build Coastguard Worker     /// #     Ok(())
604*bb4ee6a4SAndroid Build Coastguard Worker     /// # }
605*bb4ee6a4SAndroid Build Coastguard Worker     ///
606*bb4ee6a4SAndroid Build Coastguard Worker     /// # example_run().unwrap();
607*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
run(&self) -> AsyncResult<()>608*bb4ee6a4SAndroid Build Coastguard Worker     pub fn run(&self) -> AsyncResult<()> {
609*bb4ee6a4SAndroid Build Coastguard Worker         self.run_until(std::future::pending())
610*bb4ee6a4SAndroid Build Coastguard Worker     }
611*bb4ee6a4SAndroid Build Coastguard Worker 
612*bb4ee6a4SAndroid Build Coastguard Worker     /// Drive all futures spawned in this executor until `f` completes. This method will block the
613*bb4ee6a4SAndroid Build Coastguard Worker     /// current thread only until `f` is complete and there may still be unfinished futures in the
614*bb4ee6a4SAndroid Build Coastguard Worker     /// executor.
615*bb4ee6a4SAndroid Build Coastguard Worker     ///
616*bb4ee6a4SAndroid Build Coastguard Worker     /// # Panics
617*bb4ee6a4SAndroid Build Coastguard Worker     ///
618*bb4ee6a4SAndroid Build Coastguard Worker     /// Once this method has been called on a thread, from then onwards it may only be called on
619*bb4ee6a4SAndroid Build Coastguard Worker     /// that thread. Attempting to call it from another thread will panic.
620*bb4ee6a4SAndroid Build Coastguard Worker     ///
621*bb4ee6a4SAndroid Build Coastguard Worker     /// # Examples
622*bb4ee6a4SAndroid Build Coastguard Worker     ///
623*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
624*bb4ee6a4SAndroid Build Coastguard Worker     /// # use cros_async::AsyncResult;
625*bb4ee6a4SAndroid Build Coastguard Worker     /// # fn example_run_until() -> AsyncResult<()> {
626*bb4ee6a4SAndroid Build Coastguard Worker     ///       use cros_async::Executor;
627*bb4ee6a4SAndroid Build Coastguard Worker     ///
628*bb4ee6a4SAndroid Build Coastguard Worker     ///       let ex = Executor::new()?;
629*bb4ee6a4SAndroid Build Coastguard Worker     ///
630*bb4ee6a4SAndroid Build Coastguard Worker     ///       let task = ex.spawn_local(async { 7 + 13 });
631*bb4ee6a4SAndroid Build Coastguard Worker     ///
632*bb4ee6a4SAndroid Build Coastguard Worker     ///       let result = ex.run_until(task)?;
633*bb4ee6a4SAndroid Build Coastguard Worker     ///       assert_eq!(result, 20);
634*bb4ee6a4SAndroid Build Coastguard Worker     /// #     Ok(())
635*bb4ee6a4SAndroid Build Coastguard Worker     /// # }
636*bb4ee6a4SAndroid Build Coastguard Worker     ///
637*bb4ee6a4SAndroid Build Coastguard Worker     /// # example_run_until().unwrap();
638*bb4ee6a4SAndroid Build Coastguard Worker     /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>639*bb4ee6a4SAndroid Build Coastguard Worker     pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
640*bb4ee6a4SAndroid Build Coastguard Worker         match self {
641*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
642*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Fd(ex) => ex.run_until(f),
643*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(any(target_os = "android", target_os = "linux"))]
644*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Uring(ex) => ex.run_until(f),
645*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
646*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Handle(ex) => ex.run_until(f),
647*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(windows)]
648*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Overlapped(ex) => ex.run_until(f),
649*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
650*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Tokio(ex) => ex.run_until(f),
651*bb4ee6a4SAndroid Build Coastguard Worker         }
652*bb4ee6a4SAndroid Build Coastguard Worker     }
653*bb4ee6a4SAndroid Build Coastguard Worker }
654*bb4ee6a4SAndroid Build Coastguard Worker 
655*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))]
656*bb4ee6a4SAndroid Build Coastguard Worker impl AsRawDescriptors for Executor {
as_raw_descriptors(&self) -> Vec<RawDescriptor>657*bb4ee6a4SAndroid Build Coastguard Worker     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
658*bb4ee6a4SAndroid Build Coastguard Worker         match self {
659*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Fd(ex) => ex.as_raw_descriptors(),
660*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Uring(ex) => ex.as_raw_descriptors(),
661*bb4ee6a4SAndroid Build Coastguard Worker             #[cfg(feature = "tokio")]
662*bb4ee6a4SAndroid Build Coastguard Worker             Executor::Tokio(ex) => ex.as_raw_descriptors(),
663*bb4ee6a4SAndroid Build Coastguard Worker         }
664*bb4ee6a4SAndroid Build Coastguard Worker     }
665*bb4ee6a4SAndroid Build Coastguard Worker }
666