1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::future::Future; 6 use std::pin::Pin; 7 use std::sync::Arc; 8 9 #[cfg(any(target_os = "android", target_os = "linux"))] 10 use base::warn; 11 #[cfg(any(target_os = "android", target_os = "linux"))] 12 use base::AsRawDescriptors; 13 #[cfg(any(target_os = "android", target_os = "linux"))] 14 use base::RawDescriptor; 15 use once_cell::sync::OnceCell; 16 use serde::Deserialize; 17 use serde_keyvalue::argh::FromArgValue; 18 use serde_keyvalue::ErrorKind; 19 use serde_keyvalue::KeyValueDeserializer; 20 21 use crate::common_executor; 22 use crate::common_executor::RawExecutor; 23 #[cfg(any(target_os = "android", target_os = "linux"))] 24 use crate::sys::linux; 25 #[cfg(windows)] 26 use crate::sys::windows; 27 use crate::sys::ExecutorKindSys; 28 use crate::AsyncResult; 29 use crate::IntoAsync; 30 use crate::IoSource; 31 32 cfg_if::cfg_if! { 33 if #[cfg(feature = "tokio")] { 34 use crate::tokio_executor::TokioExecutor; 35 use crate::tokio_executor::TokioTaskHandle; 36 } 37 } 38 39 #[derive(Clone, Copy, Debug, PartialEq, Eq)] 40 pub enum ExecutorKind { 41 SysVariants(ExecutorKindSys), 42 #[cfg(feature = "tokio")] 43 Tokio, 44 } 45 46 impl From<ExecutorKindSys> for ExecutorKind { from(e: ExecutorKindSys) -> ExecutorKind47 fn from(e: ExecutorKindSys) -> ExecutorKind { 48 ExecutorKind::SysVariants(e) 49 } 50 } 51 52 /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`. 53 /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and 54 /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value. 55 static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new(); 56 57 impl Default for ExecutorKind { default() -> Self58 fn default() -> Self { 59 #[cfg(any(target_os = "android", target_os = "linux"))] 60 let default_fn = || ExecutorKindSys::Fd.into(); 61 #[cfg(windows)] 62 let default_fn = || ExecutorKindSys::Handle.into(); 63 *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn) 64 } 65 } 66 67 /// The error type for [`Executor::set_default_executor_kind()`]. 68 #[derive(thiserror::Error, Debug)] 69 pub enum SetDefaultExecutorKindError { 70 /// The default executor kind is set more than once. 71 #[error("The default executor kind is already set to {0:?}")] 72 SetMoreThanOnce(ExecutorKind), 73 74 #[cfg(any(target_os = "android", target_os = "linux"))] 75 /// io_uring is unavailable. The reason might be the lack of the kernel support, 76 /// but is not limited to that. 77 #[error("io_uring is unavailable: {0}")] 78 UringUnavailable(linux::uring_executor::Error), 79 } 80 81 impl FromArgValue for ExecutorKind { from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String>82 fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> { 83 // `from_arg_value` returns a `String` as error, but our deserializer API defines its own 84 // error type. Perform parsing from a closure so we can easily map returned errors. 85 let builder = move || { 86 let mut des = KeyValueDeserializer::from(value); 87 88 let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) { 89 #[cfg(any(target_os = "android", target_os = "linux"))] 90 ("epoll", None) => ExecutorKindSys::Fd.into(), 91 #[cfg(any(target_os = "android", target_os = "linux"))] 92 ("uring", None) => ExecutorKindSys::Uring.into(), 93 #[cfg(windows)] 94 ("handle", None) => ExecutorKindSys::Handle.into(), 95 #[cfg(windows)] 96 ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(), 97 #[cfg(windows)] 98 ("overlapped", Some(',')) => { 99 if des.parse_identifier()? != "concurrency" { 100 let kind = ErrorKind::SerdeError("expected `concurrency`".to_string()); 101 return Err(des.error_here(kind)); 102 } 103 if des.next_char() != Some('=') { 104 return Err(des.error_here(ErrorKind::ExpectedEqual)); 105 } 106 let concurrency = des.parse_number()?; 107 ExecutorKindSys::Overlapped { 108 concurrency: Some(concurrency), 109 } 110 .into() 111 } 112 #[cfg(feature = "tokio")] 113 ("tokio", None) => ExecutorKind::Tokio, 114 (_identifier, _next) => { 115 let kind = ErrorKind::SerdeError("unexpected kind".to_string()); 116 return Err(des.error_here(kind)); 117 } 118 }; 119 des.finish()?; 120 Ok(kind) 121 }; 122 123 builder().map_err(|e| e.to_string()) 124 } 125 } 126 127 impl serde::Serialize for ExecutorKind { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer,128 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 129 where 130 S: serde::Serializer, 131 { 132 match self { 133 ExecutorKind::SysVariants(sv) => sv.serialize(serializer), 134 #[cfg(feature = "tokio")] 135 ExecutorKind::Tokio => "tokio".serialize(serializer), 136 } 137 } 138 } 139 140 impl<'de> Deserialize<'de> for ExecutorKind { deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> where D: serde::Deserializer<'de>,141 fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> 142 where 143 D: serde::Deserializer<'de>, 144 { 145 let string = String::deserialize(deserializer)?; 146 ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom) 147 } 148 } 149 150 /// Reference to a task managed by the executor. 151 /// 152 /// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to 153 /// continue running the background. 154 /// 155 /// `await`ing the `TaskHandle` waits for the task to finish and yields its result. 156 pub enum TaskHandle<R> { 157 #[cfg(any(target_os = "android", target_os = "linux"))] 158 Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>), 159 #[cfg(any(target_os = "android", target_os = "linux"))] 160 Uring(common_executor::RawTaskHandle<linux::UringReactor, R>), 161 #[cfg(windows)] 162 Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>), 163 #[cfg(feature = "tokio")] 164 Tokio(TokioTaskHandle<R>), 165 } 166 167 impl<R: Send + 'static> TaskHandle<R> { detach(self)168 pub fn detach(self) { 169 match self { 170 #[cfg(any(target_os = "android", target_os = "linux"))] 171 TaskHandle::Fd(f) => f.detach(), 172 #[cfg(any(target_os = "android", target_os = "linux"))] 173 TaskHandle::Uring(u) => u.detach(), 174 #[cfg(windows)] 175 TaskHandle::Handle(h) => h.detach(), 176 #[cfg(feature = "tokio")] 177 TaskHandle::Tokio(t) => t.detach(), 178 } 179 } 180 181 // Cancel the task and wait for it to stop. Returns the result of the task if it was already 182 // finished. cancel(self) -> Option<R>183 pub async fn cancel(self) -> Option<R> { 184 match self { 185 #[cfg(any(target_os = "android", target_os = "linux"))] 186 TaskHandle::Fd(f) => f.cancel().await, 187 #[cfg(any(target_os = "android", target_os = "linux"))] 188 TaskHandle::Uring(u) => u.cancel().await, 189 #[cfg(windows)] 190 TaskHandle::Handle(h) => h.cancel().await, 191 #[cfg(feature = "tokio")] 192 TaskHandle::Tokio(t) => t.cancel().await, 193 } 194 } 195 } 196 197 impl<R: 'static> Future for TaskHandle<R> { 198 type Output = R; 199 poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>200 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> { 201 match self.get_mut() { 202 #[cfg(any(target_os = "android", target_os = "linux"))] 203 TaskHandle::Fd(f) => Pin::new(f).poll(cx), 204 #[cfg(any(target_os = "android", target_os = "linux"))] 205 TaskHandle::Uring(u) => Pin::new(u).poll(cx), 206 #[cfg(windows)] 207 TaskHandle::Handle(h) => Pin::new(h).poll(cx), 208 #[cfg(feature = "tokio")] 209 TaskHandle::Tokio(t) => Pin::new(t).poll(cx), 210 } 211 } 212 } 213 214 pub(crate) trait ExecutorTrait { async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>215 fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>; 216 spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static217 fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 218 where 219 F: Future + Send + 'static, 220 F::Output: Send + 'static; 221 spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static222 fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 223 where 224 F: FnOnce() -> R + Send + 'static, 225 R: Send + 'static; 226 spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static227 fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 228 where 229 F: Future + 'static, 230 F::Output: 'static; 231 run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>232 fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>; 233 } 234 235 /// An executor for scheduling tasks that poll futures to completion. 236 /// 237 /// All asynchronous operations must run within an executor, which is capable of spawning futures as 238 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations. 239 /// 240 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only 241 /// create a new reference, not a new executor. 242 /// 243 /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be 244 /// represented on the POSIX side as an enum, rather than a trait. This leads to some code & 245 /// interface duplication, but as far as we understand that is unavoidable. 246 /// 247 /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75> 248 /// for further details. 249 /// 250 /// # Examples 251 /// 252 /// Concurrently wait for multiple files to become readable/writable and then read/write the data. 253 /// 254 /// ``` 255 /// use std::cmp::min; 256 /// use std::error::Error; 257 /// use std::fs::{File, OpenOptions}; 258 /// 259 /// use cros_async::{AsyncResult, Executor, IoSource, complete3}; 260 /// const CHUNK_SIZE: usize = 32; 261 /// 262 /// // Write all bytes from `data` to `f`. 263 /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> { 264 /// while data.len() > 0 { 265 /// let (count, mut buf) = f.write_from_vec(None, data).await?; 266 /// 267 /// data = buf.split_off(count); 268 /// } 269 /// 270 /// Ok(()) 271 /// } 272 /// 273 /// // Transfer `len` bytes of data from `from` to `to`. 274 /// async fn transfer_data( 275 /// from: IoSource<File>, 276 /// to: IoSource<File>, 277 /// len: usize, 278 /// ) -> AsyncResult<usize> { 279 /// let mut rem = len; 280 /// 281 /// while rem > 0 { 282 /// let buf = vec![0u8; min(rem, CHUNK_SIZE)]; 283 /// let (count, mut data) = from.read_to_vec(None, buf).await?; 284 /// 285 /// if count == 0 { 286 /// // End of file. Return the number of bytes transferred. 287 /// return Ok(len - rem); 288 /// } 289 /// 290 /// data.truncate(count); 291 /// write_file(&to, data).await?; 292 /// 293 /// rem = rem.saturating_sub(count); 294 /// } 295 /// 296 /// Ok(len) 297 /// } 298 /// 299 /// #[cfg(any(target_os = "android", target_os = "linux"))] 300 /// # fn do_it() -> Result<(), Box<dyn Error>> { 301 /// let ex = Executor::new()?; 302 /// 303 /// let (rx, tx) = base::linux::pipe()?; 304 /// let zero = File::open("/dev/zero")?; 305 /// let zero_bytes = CHUNK_SIZE * 7; 306 /// let zero_to_pipe = transfer_data( 307 /// ex.async_from(zero)?, 308 /// ex.async_from(tx.try_clone()?)?, 309 /// zero_bytes, 310 /// ); 311 /// 312 /// let rand = File::open("/dev/urandom")?; 313 /// let rand_bytes = CHUNK_SIZE * 19; 314 /// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes); 315 /// 316 /// let null = OpenOptions::new().write(true).open("/dev/null")?; 317 /// let null_bytes = zero_bytes + rand_bytes; 318 /// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes); 319 /// 320 /// ex.run_until(complete3( 321 /// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) }, 322 /// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) }, 323 /// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) }, 324 /// ))?; 325 /// 326 /// # Ok(()) 327 /// # } 328 /// #[cfg(any(target_os = "android", target_os = "linux"))] 329 /// # do_it().unwrap(); 330 /// ``` 331 #[derive(Clone)] 332 pub enum Executor { 333 #[cfg(any(target_os = "android", target_os = "linux"))] 334 Fd(Arc<RawExecutor<linux::EpollReactor>>), 335 #[cfg(any(target_os = "android", target_os = "linux"))] 336 Uring(Arc<RawExecutor<linux::UringReactor>>), 337 #[cfg(windows)] 338 Handle(Arc<RawExecutor<windows::HandleReactor>>), 339 #[cfg(windows)] 340 Overlapped(Arc<RawExecutor<windows::HandleReactor>>), 341 #[cfg(feature = "tokio")] 342 Tokio(TokioExecutor), 343 } 344 345 impl Executor { 346 /// Create a new `Executor`. new() -> AsyncResult<Self>347 pub fn new() -> AsyncResult<Self> { 348 Executor::with_executor_kind(ExecutorKind::default()) 349 } 350 351 /// Create a new `Executor` of the given `ExecutorKind`. with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>352 pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> { 353 Ok(match kind { 354 #[cfg(any(target_os = "android", target_os = "linux"))] 355 ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?), 356 #[cfg(any(target_os = "android", target_os = "linux"))] 357 ExecutorKind::SysVariants(ExecutorKindSys::Uring) => { 358 Executor::Uring(RawExecutor::new()?) 359 } 360 #[cfg(windows)] 361 ExecutorKind::SysVariants(ExecutorKindSys::Handle) => { 362 Executor::Handle(RawExecutor::new()?) 363 } 364 #[cfg(windows)] 365 ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => { 366 let reactor = match concurrency { 367 Some(concurrency) => windows::HandleReactor::new_with(concurrency)?, 368 None => windows::HandleReactor::new()?, 369 }; 370 Executor::Overlapped(RawExecutor::new_with(reactor)?) 371 } 372 #[cfg(feature = "tokio")] 373 ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?), 374 }) 375 } 376 377 /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once. set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>378 pub fn set_default_executor_kind( 379 executor_kind: ExecutorKind, 380 ) -> Result<(), SetDefaultExecutorKindError> { 381 #[cfg(any(target_os = "android", target_os = "linux"))] 382 if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) { 383 linux::uring_executor::check_uring_availability() 384 .map_err(SetDefaultExecutorKindError::UringUnavailable)?; 385 if !crate::is_uring_stable() { 386 warn!( 387 "Enabling io_uring executor on the kernel version where io_uring is unstable" 388 ); 389 } 390 } 391 DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_| 392 // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set. 393 SetDefaultExecutorKindError::SetMoreThanOnce( 394 *DEFAULT_EXECUTOR_KIND 395 .get() 396 .expect("Failed to get DEFAULT_EXECUTOR_KIND"), 397 )) 398 } 399 400 /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned 401 /// `IoSource` to directly start async operations without needing a separate reference to the 402 /// executor. async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>403 pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 404 match self { 405 #[cfg(any(target_os = "android", target_os = "linux"))] 406 Executor::Fd(ex) => ex.async_from(f), 407 #[cfg(any(target_os = "android", target_os = "linux"))] 408 Executor::Uring(ex) => ex.async_from(f), 409 #[cfg(windows)] 410 Executor::Handle(ex) => ex.async_from(f), 411 #[cfg(windows)] 412 Executor::Overlapped(ex) => ex.async_from(f), 413 #[cfg(feature = "tokio")] 414 Executor::Tokio(ex) => ex.async_from(f), 415 } 416 } 417 418 /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the 419 /// If the executor is not overlapped, then Handle source is returned. 420 /// returned `IoSource` to directly start async operations without needing a separate reference 421 /// to the executor. 422 #[cfg(windows)] async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>423 pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 424 match self { 425 Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new( 426 f, ex, false, 427 )?)), 428 _ => self.async_from(f), 429 } 430 } 431 432 /// Spawn a new future for this executor to run to completion. Callers may use the returned 433 /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel 434 /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the 435 /// future associated with it use `TaskHandle::detach`. 436 /// 437 /// # Examples 438 /// 439 /// ``` 440 /// # use cros_async::AsyncResult; 441 /// # fn example_spawn() -> AsyncResult<()> { 442 /// # use std::thread; 443 /// 444 /// # use cros_async::Executor; 445 /// use futures::executor::block_on; 446 /// 447 /// # let ex = Executor::new()?; 448 /// 449 /// # // Spawn a thread that runs the executor. 450 /// # let ex2 = ex.clone(); 451 /// # thread::spawn(move || ex2.run()); 452 /// 453 /// let task = ex.spawn(async { 7 + 13 }); 454 /// 455 /// let result = block_on(task); 456 /// assert_eq!(result, 20); 457 /// # Ok(()) 458 /// # } 459 /// 460 /// # example_spawn().unwrap(); 461 /// ``` spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,462 pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 463 where 464 F: Future + Send + 'static, 465 F::Output: Send + 'static, 466 { 467 match self { 468 #[cfg(any(target_os = "android", target_os = "linux"))] 469 Executor::Fd(ex) => ex.spawn(f), 470 #[cfg(any(target_os = "android", target_os = "linux"))] 471 Executor::Uring(ex) => ex.spawn(f), 472 #[cfg(windows)] 473 Executor::Handle(ex) => ex.spawn(f), 474 #[cfg(windows)] 475 Executor::Overlapped(ex) => ex.spawn(f), 476 #[cfg(feature = "tokio")] 477 Executor::Tokio(ex) => ex.spawn(f), 478 } 479 } 480 481 /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without 482 /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same 483 /// thread where `run()` or `run_until()` is called. 484 /// 485 /// # Panics 486 /// 487 /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was 488 /// added by calling `spawn_local` from a different thread. 489 /// 490 /// # Examples 491 /// 492 /// ``` 493 /// # use cros_async::AsyncResult; 494 /// # fn example_spawn_local() -> AsyncResult<()> { 495 /// # use cros_async::Executor; 496 /// 497 /// # let ex = Executor::new()?; 498 /// 499 /// let task = ex.spawn_local(async { 7 + 13 }); 500 /// 501 /// let result = ex.run_until(task)?; 502 /// assert_eq!(result, 20); 503 /// Ok(()) 504 /// # } 505 /// 506 /// # example_spawn_local().unwrap(); 507 /// ``` spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,508 pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 509 where 510 F: Future + 'static, 511 F::Output: 'static, 512 { 513 match self { 514 #[cfg(any(target_os = "android", target_os = "linux"))] 515 Executor::Fd(ex) => ex.spawn_local(f), 516 #[cfg(any(target_os = "android", target_os = "linux"))] 517 Executor::Uring(ex) => ex.spawn_local(f), 518 #[cfg(windows)] 519 Executor::Handle(ex) => ex.spawn_local(f), 520 #[cfg(windows)] 521 Executor::Overlapped(ex) => ex.spawn_local(f), 522 #[cfg(feature = "tokio")] 523 Executor::Tokio(ex) => ex.spawn_local(f), 524 } 525 } 526 527 /// Run the provided closure on a dedicated thread where blocking is allowed. 528 /// 529 /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping 530 /// the returned `TaskHandle` may not cancel the operation if it was already started on a 531 /// worker thread. 532 /// 533 /// # Panics 534 /// 535 /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not 536 /// already completed. 537 /// 538 /// # Examples 539 /// 540 /// ```edition2018 541 /// # use cros_async::Executor; 542 /// 543 /// # async fn do_it(ex: &Executor) { 544 /// let res = ex.spawn_blocking(move || { 545 /// // Do some CPU-intensive or blocking work here. 546 /// 547 /// 42 548 /// }).await; 549 /// 550 /// assert_eq!(res, 42); 551 /// # } 552 /// 553 /// # let ex = Executor::new().unwrap(); 554 /// # ex.run_until(do_it(&ex)).unwrap(); 555 /// ``` spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,556 pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 557 where 558 F: FnOnce() -> R + Send + 'static, 559 R: Send + 'static, 560 { 561 match self { 562 #[cfg(any(target_os = "android", target_os = "linux"))] 563 Executor::Fd(ex) => ex.spawn_blocking(f), 564 #[cfg(any(target_os = "android", target_os = "linux"))] 565 Executor::Uring(ex) => ex.spawn_blocking(f), 566 #[cfg(windows)] 567 Executor::Handle(ex) => ex.spawn_blocking(f), 568 #[cfg(windows)] 569 Executor::Overlapped(ex) => ex.spawn_blocking(f), 570 #[cfg(feature = "tokio")] 571 Executor::Tokio(ex) => ex.spawn_blocking(f), 572 } 573 } 574 575 /// Run the executor indefinitely, driving all spawned futures to completion. This method will 576 /// block the current thread and only return in the case of an error. 577 /// 578 /// # Panics 579 /// 580 /// Once this method has been called on a thread, it may only be called on that thread from that 581 /// point on. Attempting to call it from another thread will panic. 582 /// 583 /// # Examples 584 /// 585 /// ``` 586 /// # use cros_async::AsyncResult; 587 /// # fn example_run() -> AsyncResult<()> { 588 /// use std::thread; 589 /// 590 /// use cros_async::Executor; 591 /// use futures::executor::block_on; 592 /// 593 /// let ex = Executor::new()?; 594 /// 595 /// // Spawn a thread that runs the executor. 596 /// let ex2 = ex.clone(); 597 /// thread::spawn(move || ex2.run()); 598 /// 599 /// let task = ex.spawn(async { 7 + 13 }); 600 /// 601 /// let result = block_on(task); 602 /// assert_eq!(result, 20); 603 /// # Ok(()) 604 /// # } 605 /// 606 /// # example_run().unwrap(); 607 /// ``` run(&self) -> AsyncResult<()>608 pub fn run(&self) -> AsyncResult<()> { 609 self.run_until(std::future::pending()) 610 } 611 612 /// Drive all futures spawned in this executor until `f` completes. This method will block the 613 /// current thread only until `f` is complete and there may still be unfinished futures in the 614 /// executor. 615 /// 616 /// # Panics 617 /// 618 /// Once this method has been called on a thread, from then onwards it may only be called on 619 /// that thread. Attempting to call it from another thread will panic. 620 /// 621 /// # Examples 622 /// 623 /// ``` 624 /// # use cros_async::AsyncResult; 625 /// # fn example_run_until() -> AsyncResult<()> { 626 /// use cros_async::Executor; 627 /// 628 /// let ex = Executor::new()?; 629 /// 630 /// let task = ex.spawn_local(async { 7 + 13 }); 631 /// 632 /// let result = ex.run_until(task)?; 633 /// assert_eq!(result, 20); 634 /// # Ok(()) 635 /// # } 636 /// 637 /// # example_run_until().unwrap(); 638 /// ``` run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>639 pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> { 640 match self { 641 #[cfg(any(target_os = "android", target_os = "linux"))] 642 Executor::Fd(ex) => ex.run_until(f), 643 #[cfg(any(target_os = "android", target_os = "linux"))] 644 Executor::Uring(ex) => ex.run_until(f), 645 #[cfg(windows)] 646 Executor::Handle(ex) => ex.run_until(f), 647 #[cfg(windows)] 648 Executor::Overlapped(ex) => ex.run_until(f), 649 #[cfg(feature = "tokio")] 650 Executor::Tokio(ex) => ex.run_until(f), 651 } 652 } 653 } 654 655 #[cfg(any(target_os = "android", target_os = "linux"))] 656 impl AsRawDescriptors for Executor { as_raw_descriptors(&self) -> Vec<RawDescriptor>657 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> { 658 match self { 659 Executor::Fd(ex) => ex.as_raw_descriptors(), 660 Executor::Uring(ex) => ex.as_raw_descriptors(), 661 #[cfg(feature = "tokio")] 662 Executor::Tokio(ex) => ex.as_raw_descriptors(), 663 } 664 } 665 } 666