1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2024 The ChromiumOS Authors 2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be 3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file. 4*bb4ee6a4SAndroid Build Coastguard Worker 5*bb4ee6a4SAndroid Build Coastguard Worker use std::future::Future; 6*bb4ee6a4SAndroid Build Coastguard Worker use std::pin::Pin; 7*bb4ee6a4SAndroid Build Coastguard Worker use std::sync::Arc; 8*bb4ee6a4SAndroid Build Coastguard Worker 9*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 10*bb4ee6a4SAndroid Build Coastguard Worker use base::warn; 11*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 12*bb4ee6a4SAndroid Build Coastguard Worker use base::AsRawDescriptors; 13*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 14*bb4ee6a4SAndroid Build Coastguard Worker use base::RawDescriptor; 15*bb4ee6a4SAndroid Build Coastguard Worker use once_cell::sync::OnceCell; 16*bb4ee6a4SAndroid Build Coastguard Worker use serde::Deserialize; 17*bb4ee6a4SAndroid Build Coastguard Worker use serde_keyvalue::argh::FromArgValue; 18*bb4ee6a4SAndroid Build Coastguard Worker use serde_keyvalue::ErrorKind; 19*bb4ee6a4SAndroid Build Coastguard Worker use serde_keyvalue::KeyValueDeserializer; 20*bb4ee6a4SAndroid Build Coastguard Worker 21*bb4ee6a4SAndroid Build Coastguard Worker use crate::common_executor; 22*bb4ee6a4SAndroid Build Coastguard Worker use crate::common_executor::RawExecutor; 23*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 24*bb4ee6a4SAndroid Build Coastguard Worker use crate::sys::linux; 25*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 26*bb4ee6a4SAndroid Build Coastguard Worker use crate::sys::windows; 27*bb4ee6a4SAndroid Build Coastguard Worker use crate::sys::ExecutorKindSys; 28*bb4ee6a4SAndroid Build Coastguard Worker use crate::AsyncResult; 29*bb4ee6a4SAndroid Build Coastguard Worker use crate::IntoAsync; 30*bb4ee6a4SAndroid Build Coastguard Worker use crate::IoSource; 31*bb4ee6a4SAndroid Build Coastguard Worker 32*bb4ee6a4SAndroid Build Coastguard Worker cfg_if::cfg_if! { 33*bb4ee6a4SAndroid Build Coastguard Worker if #[cfg(feature = "tokio")] { 34*bb4ee6a4SAndroid Build Coastguard Worker use crate::tokio_executor::TokioExecutor; 35*bb4ee6a4SAndroid Build Coastguard Worker use crate::tokio_executor::TokioTaskHandle; 36*bb4ee6a4SAndroid Build Coastguard Worker } 37*bb4ee6a4SAndroid Build Coastguard Worker } 38*bb4ee6a4SAndroid Build Coastguard Worker 39*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Clone, Copy, Debug, PartialEq, Eq)] 40*bb4ee6a4SAndroid Build Coastguard Worker pub enum ExecutorKind { 41*bb4ee6a4SAndroid Build Coastguard Worker SysVariants(ExecutorKindSys), 42*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 43*bb4ee6a4SAndroid Build Coastguard Worker Tokio, 44*bb4ee6a4SAndroid Build Coastguard Worker } 45*bb4ee6a4SAndroid Build Coastguard Worker 46*bb4ee6a4SAndroid Build Coastguard Worker impl From<ExecutorKindSys> for ExecutorKind { from(e: ExecutorKindSys) -> ExecutorKind47*bb4ee6a4SAndroid Build Coastguard Worker fn from(e: ExecutorKindSys) -> ExecutorKind { 48*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::SysVariants(e) 49*bb4ee6a4SAndroid Build Coastguard Worker } 50*bb4ee6a4SAndroid Build Coastguard Worker } 51*bb4ee6a4SAndroid Build Coastguard Worker 52*bb4ee6a4SAndroid Build Coastguard Worker /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`. 53*bb4ee6a4SAndroid Build Coastguard Worker /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and 54*bb4ee6a4SAndroid Build Coastguard Worker /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value. 55*bb4ee6a4SAndroid Build Coastguard Worker static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new(); 56*bb4ee6a4SAndroid Build Coastguard Worker 57*bb4ee6a4SAndroid Build Coastguard Worker impl Default for ExecutorKind { default() -> Self58*bb4ee6a4SAndroid Build Coastguard Worker fn default() -> Self { 59*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 60*bb4ee6a4SAndroid Build Coastguard Worker let default_fn = || ExecutorKindSys::Fd.into(); 61*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 62*bb4ee6a4SAndroid Build Coastguard Worker let default_fn = || ExecutorKindSys::Handle.into(); 63*bb4ee6a4SAndroid Build Coastguard Worker *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn) 64*bb4ee6a4SAndroid Build Coastguard Worker } 65*bb4ee6a4SAndroid Build Coastguard Worker } 66*bb4ee6a4SAndroid Build Coastguard Worker 67*bb4ee6a4SAndroid Build Coastguard Worker /// The error type for [`Executor::set_default_executor_kind()`]. 68*bb4ee6a4SAndroid Build Coastguard Worker #[derive(thiserror::Error, Debug)] 69*bb4ee6a4SAndroid Build Coastguard Worker pub enum SetDefaultExecutorKindError { 70*bb4ee6a4SAndroid Build Coastguard Worker /// The default executor kind is set more than once. 71*bb4ee6a4SAndroid Build Coastguard Worker #[error("The default executor kind is already set to {0:?}")] 72*bb4ee6a4SAndroid Build Coastguard Worker SetMoreThanOnce(ExecutorKind), 73*bb4ee6a4SAndroid Build Coastguard Worker 74*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 75*bb4ee6a4SAndroid Build Coastguard Worker /// io_uring is unavailable. The reason might be the lack of the kernel support, 76*bb4ee6a4SAndroid Build Coastguard Worker /// but is not limited to that. 77*bb4ee6a4SAndroid Build Coastguard Worker #[error("io_uring is unavailable: {0}")] 78*bb4ee6a4SAndroid Build Coastguard Worker UringUnavailable(linux::uring_executor::Error), 79*bb4ee6a4SAndroid Build Coastguard Worker } 80*bb4ee6a4SAndroid Build Coastguard Worker 81*bb4ee6a4SAndroid Build Coastguard Worker impl FromArgValue for ExecutorKind { from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String>82*bb4ee6a4SAndroid Build Coastguard Worker fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> { 83*bb4ee6a4SAndroid Build Coastguard Worker // `from_arg_value` returns a `String` as error, but our deserializer API defines its own 84*bb4ee6a4SAndroid Build Coastguard Worker // error type. Perform parsing from a closure so we can easily map returned errors. 85*bb4ee6a4SAndroid Build Coastguard Worker let builder = move || { 86*bb4ee6a4SAndroid Build Coastguard Worker let mut des = KeyValueDeserializer::from(value); 87*bb4ee6a4SAndroid Build Coastguard Worker 88*bb4ee6a4SAndroid Build Coastguard Worker let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) { 89*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 90*bb4ee6a4SAndroid Build Coastguard Worker ("epoll", None) => ExecutorKindSys::Fd.into(), 91*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 92*bb4ee6a4SAndroid Build Coastguard Worker ("uring", None) => ExecutorKindSys::Uring.into(), 93*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 94*bb4ee6a4SAndroid Build Coastguard Worker ("handle", None) => ExecutorKindSys::Handle.into(), 95*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 96*bb4ee6a4SAndroid Build Coastguard Worker ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(), 97*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 98*bb4ee6a4SAndroid Build Coastguard Worker ("overlapped", Some(',')) => { 99*bb4ee6a4SAndroid Build Coastguard Worker if des.parse_identifier()? != "concurrency" { 100*bb4ee6a4SAndroid Build Coastguard Worker let kind = ErrorKind::SerdeError("expected `concurrency`".to_string()); 101*bb4ee6a4SAndroid Build Coastguard Worker return Err(des.error_here(kind)); 102*bb4ee6a4SAndroid Build Coastguard Worker } 103*bb4ee6a4SAndroid Build Coastguard Worker if des.next_char() != Some('=') { 104*bb4ee6a4SAndroid Build Coastguard Worker return Err(des.error_here(ErrorKind::ExpectedEqual)); 105*bb4ee6a4SAndroid Build Coastguard Worker } 106*bb4ee6a4SAndroid Build Coastguard Worker let concurrency = des.parse_number()?; 107*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKindSys::Overlapped { 108*bb4ee6a4SAndroid Build Coastguard Worker concurrency: Some(concurrency), 109*bb4ee6a4SAndroid Build Coastguard Worker } 110*bb4ee6a4SAndroid Build Coastguard Worker .into() 111*bb4ee6a4SAndroid Build Coastguard Worker } 112*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 113*bb4ee6a4SAndroid Build Coastguard Worker ("tokio", None) => ExecutorKind::Tokio, 114*bb4ee6a4SAndroid Build Coastguard Worker (_identifier, _next) => { 115*bb4ee6a4SAndroid Build Coastguard Worker let kind = ErrorKind::SerdeError("unexpected kind".to_string()); 116*bb4ee6a4SAndroid Build Coastguard Worker return Err(des.error_here(kind)); 117*bb4ee6a4SAndroid Build Coastguard Worker } 118*bb4ee6a4SAndroid Build Coastguard Worker }; 119*bb4ee6a4SAndroid Build Coastguard Worker des.finish()?; 120*bb4ee6a4SAndroid Build Coastguard Worker Ok(kind) 121*bb4ee6a4SAndroid Build Coastguard Worker }; 122*bb4ee6a4SAndroid Build Coastguard Worker 123*bb4ee6a4SAndroid Build Coastguard Worker builder().map_err(|e| e.to_string()) 124*bb4ee6a4SAndroid Build Coastguard Worker } 125*bb4ee6a4SAndroid Build Coastguard Worker } 126*bb4ee6a4SAndroid Build Coastguard Worker 127*bb4ee6a4SAndroid Build Coastguard Worker impl serde::Serialize for ExecutorKind { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer,128*bb4ee6a4SAndroid Build Coastguard Worker fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 129*bb4ee6a4SAndroid Build Coastguard Worker where 130*bb4ee6a4SAndroid Build Coastguard Worker S: serde::Serializer, 131*bb4ee6a4SAndroid Build Coastguard Worker { 132*bb4ee6a4SAndroid Build Coastguard Worker match self { 133*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::SysVariants(sv) => sv.serialize(serializer), 134*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 135*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::Tokio => "tokio".serialize(serializer), 136*bb4ee6a4SAndroid Build Coastguard Worker } 137*bb4ee6a4SAndroid Build Coastguard Worker } 138*bb4ee6a4SAndroid Build Coastguard Worker } 139*bb4ee6a4SAndroid Build Coastguard Worker 140*bb4ee6a4SAndroid Build Coastguard Worker impl<'de> Deserialize<'de> for ExecutorKind { deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> where D: serde::Deserializer<'de>,141*bb4ee6a4SAndroid Build Coastguard Worker fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> 142*bb4ee6a4SAndroid Build Coastguard Worker where 143*bb4ee6a4SAndroid Build Coastguard Worker D: serde::Deserializer<'de>, 144*bb4ee6a4SAndroid Build Coastguard Worker { 145*bb4ee6a4SAndroid Build Coastguard Worker let string = String::deserialize(deserializer)?; 146*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom) 147*bb4ee6a4SAndroid Build Coastguard Worker } 148*bb4ee6a4SAndroid Build Coastguard Worker } 149*bb4ee6a4SAndroid Build Coastguard Worker 150*bb4ee6a4SAndroid Build Coastguard Worker /// Reference to a task managed by the executor. 151*bb4ee6a4SAndroid Build Coastguard Worker /// 152*bb4ee6a4SAndroid Build Coastguard Worker /// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to 153*bb4ee6a4SAndroid Build Coastguard Worker /// continue running the background. 154*bb4ee6a4SAndroid Build Coastguard Worker /// 155*bb4ee6a4SAndroid Build Coastguard Worker /// `await`ing the `TaskHandle` waits for the task to finish and yields its result. 156*bb4ee6a4SAndroid Build Coastguard Worker pub enum TaskHandle<R> { 157*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 158*bb4ee6a4SAndroid Build Coastguard Worker Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>), 159*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 160*bb4ee6a4SAndroid Build Coastguard Worker Uring(common_executor::RawTaskHandle<linux::UringReactor, R>), 161*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 162*bb4ee6a4SAndroid Build Coastguard Worker Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>), 163*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 164*bb4ee6a4SAndroid Build Coastguard Worker Tokio(TokioTaskHandle<R>), 165*bb4ee6a4SAndroid Build Coastguard Worker } 166*bb4ee6a4SAndroid Build Coastguard Worker 167*bb4ee6a4SAndroid Build Coastguard Worker impl<R: Send + 'static> TaskHandle<R> { detach(self)168*bb4ee6a4SAndroid Build Coastguard Worker pub fn detach(self) { 169*bb4ee6a4SAndroid Build Coastguard Worker match self { 170*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 171*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Fd(f) => f.detach(), 172*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 173*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Uring(u) => u.detach(), 174*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 175*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Handle(h) => h.detach(), 176*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 177*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Tokio(t) => t.detach(), 178*bb4ee6a4SAndroid Build Coastguard Worker } 179*bb4ee6a4SAndroid Build Coastguard Worker } 180*bb4ee6a4SAndroid Build Coastguard Worker 181*bb4ee6a4SAndroid Build Coastguard Worker // Cancel the task and wait for it to stop. Returns the result of the task if it was already 182*bb4ee6a4SAndroid Build Coastguard Worker // finished. cancel(self) -> Option<R>183*bb4ee6a4SAndroid Build Coastguard Worker pub async fn cancel(self) -> Option<R> { 184*bb4ee6a4SAndroid Build Coastguard Worker match self { 185*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 186*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Fd(f) => f.cancel().await, 187*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 188*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Uring(u) => u.cancel().await, 189*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 190*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Handle(h) => h.cancel().await, 191*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 192*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Tokio(t) => t.cancel().await, 193*bb4ee6a4SAndroid Build Coastguard Worker } 194*bb4ee6a4SAndroid Build Coastguard Worker } 195*bb4ee6a4SAndroid Build Coastguard Worker } 196*bb4ee6a4SAndroid Build Coastguard Worker 197*bb4ee6a4SAndroid Build Coastguard Worker impl<R: 'static> Future for TaskHandle<R> { 198*bb4ee6a4SAndroid Build Coastguard Worker type Output = R; 199*bb4ee6a4SAndroid Build Coastguard Worker poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>200*bb4ee6a4SAndroid Build Coastguard Worker fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> { 201*bb4ee6a4SAndroid Build Coastguard Worker match self.get_mut() { 202*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 203*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Fd(f) => Pin::new(f).poll(cx), 204*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 205*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Uring(u) => Pin::new(u).poll(cx), 206*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 207*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Handle(h) => Pin::new(h).poll(cx), 208*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 209*bb4ee6a4SAndroid Build Coastguard Worker TaskHandle::Tokio(t) => Pin::new(t).poll(cx), 210*bb4ee6a4SAndroid Build Coastguard Worker } 211*bb4ee6a4SAndroid Build Coastguard Worker } 212*bb4ee6a4SAndroid Build Coastguard Worker } 213*bb4ee6a4SAndroid Build Coastguard Worker 214*bb4ee6a4SAndroid Build Coastguard Worker pub(crate) trait ExecutorTrait { async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>215*bb4ee6a4SAndroid Build Coastguard Worker fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>; 216*bb4ee6a4SAndroid Build Coastguard Worker spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static217*bb4ee6a4SAndroid Build Coastguard Worker fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 218*bb4ee6a4SAndroid Build Coastguard Worker where 219*bb4ee6a4SAndroid Build Coastguard Worker F: Future + Send + 'static, 220*bb4ee6a4SAndroid Build Coastguard Worker F::Output: Send + 'static; 221*bb4ee6a4SAndroid Build Coastguard Worker spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static222*bb4ee6a4SAndroid Build Coastguard Worker fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 223*bb4ee6a4SAndroid Build Coastguard Worker where 224*bb4ee6a4SAndroid Build Coastguard Worker F: FnOnce() -> R + Send + 'static, 225*bb4ee6a4SAndroid Build Coastguard Worker R: Send + 'static; 226*bb4ee6a4SAndroid Build Coastguard Worker spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static227*bb4ee6a4SAndroid Build Coastguard Worker fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 228*bb4ee6a4SAndroid Build Coastguard Worker where 229*bb4ee6a4SAndroid Build Coastguard Worker F: Future + 'static, 230*bb4ee6a4SAndroid Build Coastguard Worker F::Output: 'static; 231*bb4ee6a4SAndroid Build Coastguard Worker run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>232*bb4ee6a4SAndroid Build Coastguard Worker fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>; 233*bb4ee6a4SAndroid Build Coastguard Worker } 234*bb4ee6a4SAndroid Build Coastguard Worker 235*bb4ee6a4SAndroid Build Coastguard Worker /// An executor for scheduling tasks that poll futures to completion. 236*bb4ee6a4SAndroid Build Coastguard Worker /// 237*bb4ee6a4SAndroid Build Coastguard Worker /// All asynchronous operations must run within an executor, which is capable of spawning futures as 238*bb4ee6a4SAndroid Build Coastguard Worker /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations. 239*bb4ee6a4SAndroid Build Coastguard Worker /// 240*bb4ee6a4SAndroid Build Coastguard Worker /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only 241*bb4ee6a4SAndroid Build Coastguard Worker /// create a new reference, not a new executor. 242*bb4ee6a4SAndroid Build Coastguard Worker /// 243*bb4ee6a4SAndroid Build Coastguard Worker /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be 244*bb4ee6a4SAndroid Build Coastguard Worker /// represented on the POSIX side as an enum, rather than a trait. This leads to some code & 245*bb4ee6a4SAndroid Build Coastguard Worker /// interface duplication, but as far as we understand that is unavoidable. 246*bb4ee6a4SAndroid Build Coastguard Worker /// 247*bb4ee6a4SAndroid Build Coastguard Worker /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75> 248*bb4ee6a4SAndroid Build Coastguard Worker /// for further details. 249*bb4ee6a4SAndroid Build Coastguard Worker /// 250*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples 251*bb4ee6a4SAndroid Build Coastguard Worker /// 252*bb4ee6a4SAndroid Build Coastguard Worker /// Concurrently wait for multiple files to become readable/writable and then read/write the data. 253*bb4ee6a4SAndroid Build Coastguard Worker /// 254*bb4ee6a4SAndroid Build Coastguard Worker /// ``` 255*bb4ee6a4SAndroid Build Coastguard Worker /// use std::cmp::min; 256*bb4ee6a4SAndroid Build Coastguard Worker /// use std::error::Error; 257*bb4ee6a4SAndroid Build Coastguard Worker /// use std::fs::{File, OpenOptions}; 258*bb4ee6a4SAndroid Build Coastguard Worker /// 259*bb4ee6a4SAndroid Build Coastguard Worker /// use cros_async::{AsyncResult, Executor, IoSource, complete3}; 260*bb4ee6a4SAndroid Build Coastguard Worker /// const CHUNK_SIZE: usize = 32; 261*bb4ee6a4SAndroid Build Coastguard Worker /// 262*bb4ee6a4SAndroid Build Coastguard Worker /// // Write all bytes from `data` to `f`. 263*bb4ee6a4SAndroid Build Coastguard Worker /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> { 264*bb4ee6a4SAndroid Build Coastguard Worker /// while data.len() > 0 { 265*bb4ee6a4SAndroid Build Coastguard Worker /// let (count, mut buf) = f.write_from_vec(None, data).await?; 266*bb4ee6a4SAndroid Build Coastguard Worker /// 267*bb4ee6a4SAndroid Build Coastguard Worker /// data = buf.split_off(count); 268*bb4ee6a4SAndroid Build Coastguard Worker /// } 269*bb4ee6a4SAndroid Build Coastguard Worker /// 270*bb4ee6a4SAndroid Build Coastguard Worker /// Ok(()) 271*bb4ee6a4SAndroid Build Coastguard Worker /// } 272*bb4ee6a4SAndroid Build Coastguard Worker /// 273*bb4ee6a4SAndroid Build Coastguard Worker /// // Transfer `len` bytes of data from `from` to `to`. 274*bb4ee6a4SAndroid Build Coastguard Worker /// async fn transfer_data( 275*bb4ee6a4SAndroid Build Coastguard Worker /// from: IoSource<File>, 276*bb4ee6a4SAndroid Build Coastguard Worker /// to: IoSource<File>, 277*bb4ee6a4SAndroid Build Coastguard Worker /// len: usize, 278*bb4ee6a4SAndroid Build Coastguard Worker /// ) -> AsyncResult<usize> { 279*bb4ee6a4SAndroid Build Coastguard Worker /// let mut rem = len; 280*bb4ee6a4SAndroid Build Coastguard Worker /// 281*bb4ee6a4SAndroid Build Coastguard Worker /// while rem > 0 { 282*bb4ee6a4SAndroid Build Coastguard Worker /// let buf = vec![0u8; min(rem, CHUNK_SIZE)]; 283*bb4ee6a4SAndroid Build Coastguard Worker /// let (count, mut data) = from.read_to_vec(None, buf).await?; 284*bb4ee6a4SAndroid Build Coastguard Worker /// 285*bb4ee6a4SAndroid Build Coastguard Worker /// if count == 0 { 286*bb4ee6a4SAndroid Build Coastguard Worker /// // End of file. Return the number of bytes transferred. 287*bb4ee6a4SAndroid Build Coastguard Worker /// return Ok(len - rem); 288*bb4ee6a4SAndroid Build Coastguard Worker /// } 289*bb4ee6a4SAndroid Build Coastguard Worker /// 290*bb4ee6a4SAndroid Build Coastguard Worker /// data.truncate(count); 291*bb4ee6a4SAndroid Build Coastguard Worker /// write_file(&to, data).await?; 292*bb4ee6a4SAndroid Build Coastguard Worker /// 293*bb4ee6a4SAndroid Build Coastguard Worker /// rem = rem.saturating_sub(count); 294*bb4ee6a4SAndroid Build Coastguard Worker /// } 295*bb4ee6a4SAndroid Build Coastguard Worker /// 296*bb4ee6a4SAndroid Build Coastguard Worker /// Ok(len) 297*bb4ee6a4SAndroid Build Coastguard Worker /// } 298*bb4ee6a4SAndroid Build Coastguard Worker /// 299*bb4ee6a4SAndroid Build Coastguard Worker /// #[cfg(any(target_os = "android", target_os = "linux"))] 300*bb4ee6a4SAndroid Build Coastguard Worker /// # fn do_it() -> Result<(), Box<dyn Error>> { 301*bb4ee6a4SAndroid Build Coastguard Worker /// let ex = Executor::new()?; 302*bb4ee6a4SAndroid Build Coastguard Worker /// 303*bb4ee6a4SAndroid Build Coastguard Worker /// let (rx, tx) = base::linux::pipe()?; 304*bb4ee6a4SAndroid Build Coastguard Worker /// let zero = File::open("/dev/zero")?; 305*bb4ee6a4SAndroid Build Coastguard Worker /// let zero_bytes = CHUNK_SIZE * 7; 306*bb4ee6a4SAndroid Build Coastguard Worker /// let zero_to_pipe = transfer_data( 307*bb4ee6a4SAndroid Build Coastguard Worker /// ex.async_from(zero)?, 308*bb4ee6a4SAndroid Build Coastguard Worker /// ex.async_from(tx.try_clone()?)?, 309*bb4ee6a4SAndroid Build Coastguard Worker /// zero_bytes, 310*bb4ee6a4SAndroid Build Coastguard Worker /// ); 311*bb4ee6a4SAndroid Build Coastguard Worker /// 312*bb4ee6a4SAndroid Build Coastguard Worker /// let rand = File::open("/dev/urandom")?; 313*bb4ee6a4SAndroid Build Coastguard Worker /// let rand_bytes = CHUNK_SIZE * 19; 314*bb4ee6a4SAndroid Build Coastguard Worker /// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes); 315*bb4ee6a4SAndroid Build Coastguard Worker /// 316*bb4ee6a4SAndroid Build Coastguard Worker /// let null = OpenOptions::new().write(true).open("/dev/null")?; 317*bb4ee6a4SAndroid Build Coastguard Worker /// let null_bytes = zero_bytes + rand_bytes; 318*bb4ee6a4SAndroid Build Coastguard Worker /// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes); 319*bb4ee6a4SAndroid Build Coastguard Worker /// 320*bb4ee6a4SAndroid Build Coastguard Worker /// ex.run_until(complete3( 321*bb4ee6a4SAndroid Build Coastguard Worker /// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) }, 322*bb4ee6a4SAndroid Build Coastguard Worker /// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) }, 323*bb4ee6a4SAndroid Build Coastguard Worker /// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) }, 324*bb4ee6a4SAndroid Build Coastguard Worker /// ))?; 325*bb4ee6a4SAndroid Build Coastguard Worker /// 326*bb4ee6a4SAndroid Build Coastguard Worker /// # Ok(()) 327*bb4ee6a4SAndroid Build Coastguard Worker /// # } 328*bb4ee6a4SAndroid Build Coastguard Worker /// #[cfg(any(target_os = "android", target_os = "linux"))] 329*bb4ee6a4SAndroid Build Coastguard Worker /// # do_it().unwrap(); 330*bb4ee6a4SAndroid Build Coastguard Worker /// ``` 331*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Clone)] 332*bb4ee6a4SAndroid Build Coastguard Worker pub enum Executor { 333*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 334*bb4ee6a4SAndroid Build Coastguard Worker Fd(Arc<RawExecutor<linux::EpollReactor>>), 335*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 336*bb4ee6a4SAndroid Build Coastguard Worker Uring(Arc<RawExecutor<linux::UringReactor>>), 337*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 338*bb4ee6a4SAndroid Build Coastguard Worker Handle(Arc<RawExecutor<windows::HandleReactor>>), 339*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 340*bb4ee6a4SAndroid Build Coastguard Worker Overlapped(Arc<RawExecutor<windows::HandleReactor>>), 341*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 342*bb4ee6a4SAndroid Build Coastguard Worker Tokio(TokioExecutor), 343*bb4ee6a4SAndroid Build Coastguard Worker } 344*bb4ee6a4SAndroid Build Coastguard Worker 345*bb4ee6a4SAndroid Build Coastguard Worker impl Executor { 346*bb4ee6a4SAndroid Build Coastguard Worker /// Create a new `Executor`. new() -> AsyncResult<Self>347*bb4ee6a4SAndroid Build Coastguard Worker pub fn new() -> AsyncResult<Self> { 348*bb4ee6a4SAndroid Build Coastguard Worker Executor::with_executor_kind(ExecutorKind::default()) 349*bb4ee6a4SAndroid Build Coastguard Worker } 350*bb4ee6a4SAndroid Build Coastguard Worker 351*bb4ee6a4SAndroid Build Coastguard Worker /// Create a new `Executor` of the given `ExecutorKind`. with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>352*bb4ee6a4SAndroid Build Coastguard Worker pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> { 353*bb4ee6a4SAndroid Build Coastguard Worker Ok(match kind { 354*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 355*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?), 356*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 357*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::SysVariants(ExecutorKindSys::Uring) => { 358*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(RawExecutor::new()?) 359*bb4ee6a4SAndroid Build Coastguard Worker } 360*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 361*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::SysVariants(ExecutorKindSys::Handle) => { 362*bb4ee6a4SAndroid Build Coastguard Worker Executor::Handle(RawExecutor::new()?) 363*bb4ee6a4SAndroid Build Coastguard Worker } 364*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 365*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => { 366*bb4ee6a4SAndroid Build Coastguard Worker let reactor = match concurrency { 367*bb4ee6a4SAndroid Build Coastguard Worker Some(concurrency) => windows::HandleReactor::new_with(concurrency)?, 368*bb4ee6a4SAndroid Build Coastguard Worker None => windows::HandleReactor::new()?, 369*bb4ee6a4SAndroid Build Coastguard Worker }; 370*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(RawExecutor::new_with(reactor)?) 371*bb4ee6a4SAndroid Build Coastguard Worker } 372*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 373*bb4ee6a4SAndroid Build Coastguard Worker ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?), 374*bb4ee6a4SAndroid Build Coastguard Worker }) 375*bb4ee6a4SAndroid Build Coastguard Worker } 376*bb4ee6a4SAndroid Build Coastguard Worker 377*bb4ee6a4SAndroid Build Coastguard Worker /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once. set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>378*bb4ee6a4SAndroid Build Coastguard Worker pub fn set_default_executor_kind( 379*bb4ee6a4SAndroid Build Coastguard Worker executor_kind: ExecutorKind, 380*bb4ee6a4SAndroid Build Coastguard Worker ) -> Result<(), SetDefaultExecutorKindError> { 381*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 382*bb4ee6a4SAndroid Build Coastguard Worker if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) { 383*bb4ee6a4SAndroid Build Coastguard Worker linux::uring_executor::check_uring_availability() 384*bb4ee6a4SAndroid Build Coastguard Worker .map_err(SetDefaultExecutorKindError::UringUnavailable)?; 385*bb4ee6a4SAndroid Build Coastguard Worker if !crate::is_uring_stable() { 386*bb4ee6a4SAndroid Build Coastguard Worker warn!( 387*bb4ee6a4SAndroid Build Coastguard Worker "Enabling io_uring executor on the kernel version where io_uring is unstable" 388*bb4ee6a4SAndroid Build Coastguard Worker ); 389*bb4ee6a4SAndroid Build Coastguard Worker } 390*bb4ee6a4SAndroid Build Coastguard Worker } 391*bb4ee6a4SAndroid Build Coastguard Worker DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_| 392*bb4ee6a4SAndroid Build Coastguard Worker // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set. 393*bb4ee6a4SAndroid Build Coastguard Worker SetDefaultExecutorKindError::SetMoreThanOnce( 394*bb4ee6a4SAndroid Build Coastguard Worker *DEFAULT_EXECUTOR_KIND 395*bb4ee6a4SAndroid Build Coastguard Worker .get() 396*bb4ee6a4SAndroid Build Coastguard Worker .expect("Failed to get DEFAULT_EXECUTOR_KIND"), 397*bb4ee6a4SAndroid Build Coastguard Worker )) 398*bb4ee6a4SAndroid Build Coastguard Worker } 399*bb4ee6a4SAndroid Build Coastguard Worker 400*bb4ee6a4SAndroid Build Coastguard Worker /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned 401*bb4ee6a4SAndroid Build Coastguard Worker /// `IoSource` to directly start async operations without needing a separate reference to the 402*bb4ee6a4SAndroid Build Coastguard Worker /// executor. async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>403*bb4ee6a4SAndroid Build Coastguard Worker pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 404*bb4ee6a4SAndroid Build Coastguard Worker match self { 405*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 406*bb4ee6a4SAndroid Build Coastguard Worker Executor::Fd(ex) => ex.async_from(f), 407*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 408*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(ex) => ex.async_from(f), 409*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 410*bb4ee6a4SAndroid Build Coastguard Worker Executor::Handle(ex) => ex.async_from(f), 411*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 412*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(ex) => ex.async_from(f), 413*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 414*bb4ee6a4SAndroid Build Coastguard Worker Executor::Tokio(ex) => ex.async_from(f), 415*bb4ee6a4SAndroid Build Coastguard Worker } 416*bb4ee6a4SAndroid Build Coastguard Worker } 417*bb4ee6a4SAndroid Build Coastguard Worker 418*bb4ee6a4SAndroid Build Coastguard Worker /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the 419*bb4ee6a4SAndroid Build Coastguard Worker /// If the executor is not overlapped, then Handle source is returned. 420*bb4ee6a4SAndroid Build Coastguard Worker /// returned `IoSource` to directly start async operations without needing a separate reference 421*bb4ee6a4SAndroid Build Coastguard Worker /// to the executor. 422*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>423*bb4ee6a4SAndroid Build Coastguard Worker pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 424*bb4ee6a4SAndroid Build Coastguard Worker match self { 425*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new( 426*bb4ee6a4SAndroid Build Coastguard Worker f, ex, false, 427*bb4ee6a4SAndroid Build Coastguard Worker )?)), 428*bb4ee6a4SAndroid Build Coastguard Worker _ => self.async_from(f), 429*bb4ee6a4SAndroid Build Coastguard Worker } 430*bb4ee6a4SAndroid Build Coastguard Worker } 431*bb4ee6a4SAndroid Build Coastguard Worker 432*bb4ee6a4SAndroid Build Coastguard Worker /// Spawn a new future for this executor to run to completion. Callers may use the returned 433*bb4ee6a4SAndroid Build Coastguard Worker /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel 434*bb4ee6a4SAndroid Build Coastguard Worker /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the 435*bb4ee6a4SAndroid Build Coastguard Worker /// future associated with it use `TaskHandle::detach`. 436*bb4ee6a4SAndroid Build Coastguard Worker /// 437*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples 438*bb4ee6a4SAndroid Build Coastguard Worker /// 439*bb4ee6a4SAndroid Build Coastguard Worker /// ``` 440*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::AsyncResult; 441*bb4ee6a4SAndroid Build Coastguard Worker /// # fn example_spawn() -> AsyncResult<()> { 442*bb4ee6a4SAndroid Build Coastguard Worker /// # use std::thread; 443*bb4ee6a4SAndroid Build Coastguard Worker /// 444*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::Executor; 445*bb4ee6a4SAndroid Build Coastguard Worker /// use futures::executor::block_on; 446*bb4ee6a4SAndroid Build Coastguard Worker /// 447*bb4ee6a4SAndroid Build Coastguard Worker /// # let ex = Executor::new()?; 448*bb4ee6a4SAndroid Build Coastguard Worker /// 449*bb4ee6a4SAndroid Build Coastguard Worker /// # // Spawn a thread that runs the executor. 450*bb4ee6a4SAndroid Build Coastguard Worker /// # let ex2 = ex.clone(); 451*bb4ee6a4SAndroid Build Coastguard Worker /// # thread::spawn(move || ex2.run()); 452*bb4ee6a4SAndroid Build Coastguard Worker /// 453*bb4ee6a4SAndroid Build Coastguard Worker /// let task = ex.spawn(async { 7 + 13 }); 454*bb4ee6a4SAndroid Build Coastguard Worker /// 455*bb4ee6a4SAndroid Build Coastguard Worker /// let result = block_on(task); 456*bb4ee6a4SAndroid Build Coastguard Worker /// assert_eq!(result, 20); 457*bb4ee6a4SAndroid Build Coastguard Worker /// # Ok(()) 458*bb4ee6a4SAndroid Build Coastguard Worker /// # } 459*bb4ee6a4SAndroid Build Coastguard Worker /// 460*bb4ee6a4SAndroid Build Coastguard Worker /// # example_spawn().unwrap(); 461*bb4ee6a4SAndroid Build Coastguard Worker /// ``` spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,462*bb4ee6a4SAndroid Build Coastguard Worker pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 463*bb4ee6a4SAndroid Build Coastguard Worker where 464*bb4ee6a4SAndroid Build Coastguard Worker F: Future + Send + 'static, 465*bb4ee6a4SAndroid Build Coastguard Worker F::Output: Send + 'static, 466*bb4ee6a4SAndroid Build Coastguard Worker { 467*bb4ee6a4SAndroid Build Coastguard Worker match self { 468*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 469*bb4ee6a4SAndroid Build Coastguard Worker Executor::Fd(ex) => ex.spawn(f), 470*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 471*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(ex) => ex.spawn(f), 472*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 473*bb4ee6a4SAndroid Build Coastguard Worker Executor::Handle(ex) => ex.spawn(f), 474*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 475*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(ex) => ex.spawn(f), 476*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 477*bb4ee6a4SAndroid Build Coastguard Worker Executor::Tokio(ex) => ex.spawn(f), 478*bb4ee6a4SAndroid Build Coastguard Worker } 479*bb4ee6a4SAndroid Build Coastguard Worker } 480*bb4ee6a4SAndroid Build Coastguard Worker 481*bb4ee6a4SAndroid Build Coastguard Worker /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without 482*bb4ee6a4SAndroid Build Coastguard Worker /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same 483*bb4ee6a4SAndroid Build Coastguard Worker /// thread where `run()` or `run_until()` is called. 484*bb4ee6a4SAndroid Build Coastguard Worker /// 485*bb4ee6a4SAndroid Build Coastguard Worker /// # Panics 486*bb4ee6a4SAndroid Build Coastguard Worker /// 487*bb4ee6a4SAndroid Build Coastguard Worker /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was 488*bb4ee6a4SAndroid Build Coastguard Worker /// added by calling `spawn_local` from a different thread. 489*bb4ee6a4SAndroid Build Coastguard Worker /// 490*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples 491*bb4ee6a4SAndroid Build Coastguard Worker /// 492*bb4ee6a4SAndroid Build Coastguard Worker /// ``` 493*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::AsyncResult; 494*bb4ee6a4SAndroid Build Coastguard Worker /// # fn example_spawn_local() -> AsyncResult<()> { 495*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::Executor; 496*bb4ee6a4SAndroid Build Coastguard Worker /// 497*bb4ee6a4SAndroid Build Coastguard Worker /// # let ex = Executor::new()?; 498*bb4ee6a4SAndroid Build Coastguard Worker /// 499*bb4ee6a4SAndroid Build Coastguard Worker /// let task = ex.spawn_local(async { 7 + 13 }); 500*bb4ee6a4SAndroid Build Coastguard Worker /// 501*bb4ee6a4SAndroid Build Coastguard Worker /// let result = ex.run_until(task)?; 502*bb4ee6a4SAndroid Build Coastguard Worker /// assert_eq!(result, 20); 503*bb4ee6a4SAndroid Build Coastguard Worker /// Ok(()) 504*bb4ee6a4SAndroid Build Coastguard Worker /// # } 505*bb4ee6a4SAndroid Build Coastguard Worker /// 506*bb4ee6a4SAndroid Build Coastguard Worker /// # example_spawn_local().unwrap(); 507*bb4ee6a4SAndroid Build Coastguard Worker /// ``` spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,508*bb4ee6a4SAndroid Build Coastguard Worker pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 509*bb4ee6a4SAndroid Build Coastguard Worker where 510*bb4ee6a4SAndroid Build Coastguard Worker F: Future + 'static, 511*bb4ee6a4SAndroid Build Coastguard Worker F::Output: 'static, 512*bb4ee6a4SAndroid Build Coastguard Worker { 513*bb4ee6a4SAndroid Build Coastguard Worker match self { 514*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 515*bb4ee6a4SAndroid Build Coastguard Worker Executor::Fd(ex) => ex.spawn_local(f), 516*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 517*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(ex) => ex.spawn_local(f), 518*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 519*bb4ee6a4SAndroid Build Coastguard Worker Executor::Handle(ex) => ex.spawn_local(f), 520*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 521*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(ex) => ex.spawn_local(f), 522*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 523*bb4ee6a4SAndroid Build Coastguard Worker Executor::Tokio(ex) => ex.spawn_local(f), 524*bb4ee6a4SAndroid Build Coastguard Worker } 525*bb4ee6a4SAndroid Build Coastguard Worker } 526*bb4ee6a4SAndroid Build Coastguard Worker 527*bb4ee6a4SAndroid Build Coastguard Worker /// Run the provided closure on a dedicated thread where blocking is allowed. 528*bb4ee6a4SAndroid Build Coastguard Worker /// 529*bb4ee6a4SAndroid Build Coastguard Worker /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping 530*bb4ee6a4SAndroid Build Coastguard Worker /// the returned `TaskHandle` may not cancel the operation if it was already started on a 531*bb4ee6a4SAndroid Build Coastguard Worker /// worker thread. 532*bb4ee6a4SAndroid Build Coastguard Worker /// 533*bb4ee6a4SAndroid Build Coastguard Worker /// # Panics 534*bb4ee6a4SAndroid Build Coastguard Worker /// 535*bb4ee6a4SAndroid Build Coastguard Worker /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not 536*bb4ee6a4SAndroid Build Coastguard Worker /// already completed. 537*bb4ee6a4SAndroid Build Coastguard Worker /// 538*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples 539*bb4ee6a4SAndroid Build Coastguard Worker /// 540*bb4ee6a4SAndroid Build Coastguard Worker /// ```edition2018 541*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::Executor; 542*bb4ee6a4SAndroid Build Coastguard Worker /// 543*bb4ee6a4SAndroid Build Coastguard Worker /// # async fn do_it(ex: &Executor) { 544*bb4ee6a4SAndroid Build Coastguard Worker /// let res = ex.spawn_blocking(move || { 545*bb4ee6a4SAndroid Build Coastguard Worker /// // Do some CPU-intensive or blocking work here. 546*bb4ee6a4SAndroid Build Coastguard Worker /// 547*bb4ee6a4SAndroid Build Coastguard Worker /// 42 548*bb4ee6a4SAndroid Build Coastguard Worker /// }).await; 549*bb4ee6a4SAndroid Build Coastguard Worker /// 550*bb4ee6a4SAndroid Build Coastguard Worker /// assert_eq!(res, 42); 551*bb4ee6a4SAndroid Build Coastguard Worker /// # } 552*bb4ee6a4SAndroid Build Coastguard Worker /// 553*bb4ee6a4SAndroid Build Coastguard Worker /// # let ex = Executor::new().unwrap(); 554*bb4ee6a4SAndroid Build Coastguard Worker /// # ex.run_until(do_it(&ex)).unwrap(); 555*bb4ee6a4SAndroid Build Coastguard Worker /// ``` spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,556*bb4ee6a4SAndroid Build Coastguard Worker pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 557*bb4ee6a4SAndroid Build Coastguard Worker where 558*bb4ee6a4SAndroid Build Coastguard Worker F: FnOnce() -> R + Send + 'static, 559*bb4ee6a4SAndroid Build Coastguard Worker R: Send + 'static, 560*bb4ee6a4SAndroid Build Coastguard Worker { 561*bb4ee6a4SAndroid Build Coastguard Worker match self { 562*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 563*bb4ee6a4SAndroid Build Coastguard Worker Executor::Fd(ex) => ex.spawn_blocking(f), 564*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 565*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(ex) => ex.spawn_blocking(f), 566*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 567*bb4ee6a4SAndroid Build Coastguard Worker Executor::Handle(ex) => ex.spawn_blocking(f), 568*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 569*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(ex) => ex.spawn_blocking(f), 570*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 571*bb4ee6a4SAndroid Build Coastguard Worker Executor::Tokio(ex) => ex.spawn_blocking(f), 572*bb4ee6a4SAndroid Build Coastguard Worker } 573*bb4ee6a4SAndroid Build Coastguard Worker } 574*bb4ee6a4SAndroid Build Coastguard Worker 575*bb4ee6a4SAndroid Build Coastguard Worker /// Run the executor indefinitely, driving all spawned futures to completion. This method will 576*bb4ee6a4SAndroid Build Coastguard Worker /// block the current thread and only return in the case of an error. 577*bb4ee6a4SAndroid Build Coastguard Worker /// 578*bb4ee6a4SAndroid Build Coastguard Worker /// # Panics 579*bb4ee6a4SAndroid Build Coastguard Worker /// 580*bb4ee6a4SAndroid Build Coastguard Worker /// Once this method has been called on a thread, it may only be called on that thread from that 581*bb4ee6a4SAndroid Build Coastguard Worker /// point on. Attempting to call it from another thread will panic. 582*bb4ee6a4SAndroid Build Coastguard Worker /// 583*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples 584*bb4ee6a4SAndroid Build Coastguard Worker /// 585*bb4ee6a4SAndroid Build Coastguard Worker /// ``` 586*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::AsyncResult; 587*bb4ee6a4SAndroid Build Coastguard Worker /// # fn example_run() -> AsyncResult<()> { 588*bb4ee6a4SAndroid Build Coastguard Worker /// use std::thread; 589*bb4ee6a4SAndroid Build Coastguard Worker /// 590*bb4ee6a4SAndroid Build Coastguard Worker /// use cros_async::Executor; 591*bb4ee6a4SAndroid Build Coastguard Worker /// use futures::executor::block_on; 592*bb4ee6a4SAndroid Build Coastguard Worker /// 593*bb4ee6a4SAndroid Build Coastguard Worker /// let ex = Executor::new()?; 594*bb4ee6a4SAndroid Build Coastguard Worker /// 595*bb4ee6a4SAndroid Build Coastguard Worker /// // Spawn a thread that runs the executor. 596*bb4ee6a4SAndroid Build Coastguard Worker /// let ex2 = ex.clone(); 597*bb4ee6a4SAndroid Build Coastguard Worker /// thread::spawn(move || ex2.run()); 598*bb4ee6a4SAndroid Build Coastguard Worker /// 599*bb4ee6a4SAndroid Build Coastguard Worker /// let task = ex.spawn(async { 7 + 13 }); 600*bb4ee6a4SAndroid Build Coastguard Worker /// 601*bb4ee6a4SAndroid Build Coastguard Worker /// let result = block_on(task); 602*bb4ee6a4SAndroid Build Coastguard Worker /// assert_eq!(result, 20); 603*bb4ee6a4SAndroid Build Coastguard Worker /// # Ok(()) 604*bb4ee6a4SAndroid Build Coastguard Worker /// # } 605*bb4ee6a4SAndroid Build Coastguard Worker /// 606*bb4ee6a4SAndroid Build Coastguard Worker /// # example_run().unwrap(); 607*bb4ee6a4SAndroid Build Coastguard Worker /// ``` run(&self) -> AsyncResult<()>608*bb4ee6a4SAndroid Build Coastguard Worker pub fn run(&self) -> AsyncResult<()> { 609*bb4ee6a4SAndroid Build Coastguard Worker self.run_until(std::future::pending()) 610*bb4ee6a4SAndroid Build Coastguard Worker } 611*bb4ee6a4SAndroid Build Coastguard Worker 612*bb4ee6a4SAndroid Build Coastguard Worker /// Drive all futures spawned in this executor until `f` completes. This method will block the 613*bb4ee6a4SAndroid Build Coastguard Worker /// current thread only until `f` is complete and there may still be unfinished futures in the 614*bb4ee6a4SAndroid Build Coastguard Worker /// executor. 615*bb4ee6a4SAndroid Build Coastguard Worker /// 616*bb4ee6a4SAndroid Build Coastguard Worker /// # Panics 617*bb4ee6a4SAndroid Build Coastguard Worker /// 618*bb4ee6a4SAndroid Build Coastguard Worker /// Once this method has been called on a thread, from then onwards it may only be called on 619*bb4ee6a4SAndroid Build Coastguard Worker /// that thread. Attempting to call it from another thread will panic. 620*bb4ee6a4SAndroid Build Coastguard Worker /// 621*bb4ee6a4SAndroid Build Coastguard Worker /// # Examples 622*bb4ee6a4SAndroid Build Coastguard Worker /// 623*bb4ee6a4SAndroid Build Coastguard Worker /// ``` 624*bb4ee6a4SAndroid Build Coastguard Worker /// # use cros_async::AsyncResult; 625*bb4ee6a4SAndroid Build Coastguard Worker /// # fn example_run_until() -> AsyncResult<()> { 626*bb4ee6a4SAndroid Build Coastguard Worker /// use cros_async::Executor; 627*bb4ee6a4SAndroid Build Coastguard Worker /// 628*bb4ee6a4SAndroid Build Coastguard Worker /// let ex = Executor::new()?; 629*bb4ee6a4SAndroid Build Coastguard Worker /// 630*bb4ee6a4SAndroid Build Coastguard Worker /// let task = ex.spawn_local(async { 7 + 13 }); 631*bb4ee6a4SAndroid Build Coastguard Worker /// 632*bb4ee6a4SAndroid Build Coastguard Worker /// let result = ex.run_until(task)?; 633*bb4ee6a4SAndroid Build Coastguard Worker /// assert_eq!(result, 20); 634*bb4ee6a4SAndroid Build Coastguard Worker /// # Ok(()) 635*bb4ee6a4SAndroid Build Coastguard Worker /// # } 636*bb4ee6a4SAndroid Build Coastguard Worker /// 637*bb4ee6a4SAndroid Build Coastguard Worker /// # example_run_until().unwrap(); 638*bb4ee6a4SAndroid Build Coastguard Worker /// ``` run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>639*bb4ee6a4SAndroid Build Coastguard Worker pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> { 640*bb4ee6a4SAndroid Build Coastguard Worker match self { 641*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 642*bb4ee6a4SAndroid Build Coastguard Worker Executor::Fd(ex) => ex.run_until(f), 643*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 644*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(ex) => ex.run_until(f), 645*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 646*bb4ee6a4SAndroid Build Coastguard Worker Executor::Handle(ex) => ex.run_until(f), 647*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(windows)] 648*bb4ee6a4SAndroid Build Coastguard Worker Executor::Overlapped(ex) => ex.run_until(f), 649*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 650*bb4ee6a4SAndroid Build Coastguard Worker Executor::Tokio(ex) => ex.run_until(f), 651*bb4ee6a4SAndroid Build Coastguard Worker } 652*bb4ee6a4SAndroid Build Coastguard Worker } 653*bb4ee6a4SAndroid Build Coastguard Worker } 654*bb4ee6a4SAndroid Build Coastguard Worker 655*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(any(target_os = "android", target_os = "linux"))] 656*bb4ee6a4SAndroid Build Coastguard Worker impl AsRawDescriptors for Executor { as_raw_descriptors(&self) -> Vec<RawDescriptor>657*bb4ee6a4SAndroid Build Coastguard Worker fn as_raw_descriptors(&self) -> Vec<RawDescriptor> { 658*bb4ee6a4SAndroid Build Coastguard Worker match self { 659*bb4ee6a4SAndroid Build Coastguard Worker Executor::Fd(ex) => ex.as_raw_descriptors(), 660*bb4ee6a4SAndroid Build Coastguard Worker Executor::Uring(ex) => ex.as_raw_descriptors(), 661*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(feature = "tokio")] 662*bb4ee6a4SAndroid Build Coastguard Worker Executor::Tokio(ex) => ex.as_raw_descriptors(), 663*bb4ee6a4SAndroid Build Coastguard Worker } 664*bb4ee6a4SAndroid Build Coastguard Worker } 665*bb4ee6a4SAndroid Build Coastguard Worker } 666