1 use core::fmt; 2 use core::future::Future; 3 use core::marker::PhantomData; 4 use core::mem; 5 use core::pin::Pin; 6 use core::ptr::NonNull; 7 use core::sync::atomic::Ordering; 8 use core::task::{Context, Poll}; 9 10 use crate::header::Header; 11 use crate::raw::Panic; 12 use crate::runnable::ScheduleInfo; 13 use crate::state::*; 14 15 /// A spawned task. 16 /// 17 /// A [`Task`] can be awaited to retrieve the output of its future. 18 /// 19 /// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the 20 /// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a 21 /// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()] 22 /// method. 23 /// 24 /// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor 25 /// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking 26 /// [`run()`][`super::Runnable::run()`]. 27 /// 28 /// # Examples 29 /// 30 /// ``` 31 /// use smol::{future, Executor}; 32 /// use std::thread; 33 /// 34 /// let ex = Executor::new(); 35 /// 36 /// // Spawn a future onto the executor. 37 /// let task = ex.spawn(async { 38 /// println!("Hello from a task!"); 39 /// 1 + 2 40 /// }); 41 /// 42 /// // Run an executor thread. 43 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 44 /// 45 /// // Wait for the task's output. 46 /// assert_eq!(future::block_on(task), 3); 47 /// ``` 48 #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] 49 pub struct Task<T, M = ()> { 50 /// A raw task pointer. 51 pub(crate) ptr: NonNull<()>, 52 53 /// A marker capturing generic types `T` and `M`. 54 pub(crate) _marker: PhantomData<(T, M)>, 55 } 56 57 unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {} 58 unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {} 59 60 impl<T, M> Unpin for Task<T, M> {} 61 62 #[cfg(feature = "std")] 63 impl<T, M> std::panic::UnwindSafe for Task<T, M> {} 64 #[cfg(feature = "std")] 65 impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {} 66 67 impl<T, M> Task<T, M> { 68 /// Detaches the task to let it keep running in the background. 69 /// 70 /// # Examples 71 /// 72 /// ``` 73 /// use smol::{Executor, Timer}; 74 /// use std::time::Duration; 75 /// 76 /// let ex = Executor::new(); 77 /// 78 /// // Spawn a deamon future. 79 /// ex.spawn(async { 80 /// loop { 81 /// println!("I'm a daemon task looping forever."); 82 /// Timer::after(Duration::from_secs(1)).await; 83 /// } 84 /// }) 85 /// .detach(); 86 /// ``` detach(self)87 pub fn detach(self) { 88 let mut this = self; 89 let _out = this.set_detached(); 90 mem::forget(this); 91 } 92 93 /// Cancels the task and waits for it to stop running. 94 /// 95 /// Returns the task's output if it was completed just before it got canceled, or [`None`] if 96 /// it didn't complete. 97 /// 98 /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of 99 /// canceling because it also waits for the task to stop running. 100 /// 101 /// # Examples 102 /// 103 /// ``` 104 /// # if cfg!(miri) { return; } // Miri does not support epoll 105 /// use smol::{future, Executor, Timer}; 106 /// use std::thread; 107 /// use std::time::Duration; 108 /// 109 /// let ex = Executor::new(); 110 /// 111 /// // Spawn a deamon future. 112 /// let task = ex.spawn(async { 113 /// loop { 114 /// println!("Even though I'm in an infinite loop, you can still cancel me!"); 115 /// Timer::after(Duration::from_secs(1)).await; 116 /// } 117 /// }); 118 /// 119 /// // Run an executor thread. 120 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 121 /// 122 /// future::block_on(async { 123 /// Timer::after(Duration::from_secs(3)).await; 124 /// task.cancel().await; 125 /// }); 126 /// ``` cancel(self) -> Option<T>127 pub async fn cancel(self) -> Option<T> { 128 let mut this = self; 129 this.set_canceled(); 130 this.fallible().await 131 } 132 133 /// Converts this task into a [`FallibleTask`]. 134 /// 135 /// Like [`Task`], a fallible task will poll the task's output until it is 136 /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being 137 /// dropped without being run. Resolves to the task's output when completed, 138 /// or [`None`] if it didn't complete. 139 /// 140 /// # Examples 141 /// 142 /// ``` 143 /// use smol::{future, Executor}; 144 /// use std::thread; 145 /// 146 /// let ex = Executor::new(); 147 /// 148 /// // Spawn a future onto the executor. 149 /// let task = ex.spawn(async { 150 /// println!("Hello from a task!"); 151 /// 1 + 2 152 /// }) 153 /// .fallible(); 154 /// 155 /// // Run an executor thread. 156 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 157 /// 158 /// // Wait for the task's output. 159 /// assert_eq!(future::block_on(task), Some(3)); 160 /// ``` 161 /// 162 /// ``` 163 /// use smol::future; 164 /// 165 /// // Schedule function which drops the runnable without running it. 166 /// let schedule = move |runnable| drop(runnable); 167 /// 168 /// // Create a task with the future and the schedule function. 169 /// let (runnable, task) = async_task::spawn(async { 170 /// println!("Hello from a task!"); 171 /// 1 + 2 172 /// }, schedule); 173 /// runnable.schedule(); 174 /// 175 /// // Wait for the task's output. 176 /// assert_eq!(future::block_on(task.fallible()), None); 177 /// ``` fallible(self) -> FallibleTask<T, M>178 pub fn fallible(self) -> FallibleTask<T, M> { 179 FallibleTask { task: self } 180 } 181 182 /// Puts the task in canceled state. set_canceled(&mut self)183 fn set_canceled(&mut self) { 184 let ptr = self.ptr.as_ptr(); 185 let header = ptr as *const Header<M>; 186 187 unsafe { 188 let mut state = (*header).state.load(Ordering::Acquire); 189 190 loop { 191 // If the task has been completed or closed, it can't be canceled. 192 if state & (COMPLETED | CLOSED) != 0 { 193 break; 194 } 195 196 // If the task is not scheduled nor running, we'll need to schedule it. 197 let new = if state & (SCHEDULED | RUNNING) == 0 { 198 (state | SCHEDULED | CLOSED) + REFERENCE 199 } else { 200 state | CLOSED 201 }; 202 203 // Mark the task as closed. 204 match (*header).state.compare_exchange_weak( 205 state, 206 new, 207 Ordering::AcqRel, 208 Ordering::Acquire, 209 ) { 210 Ok(_) => { 211 // If the task is not scheduled nor running, schedule it one more time so 212 // that its future gets dropped by the executor. 213 if state & (SCHEDULED | RUNNING) == 0 { 214 ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); 215 } 216 217 // Notify the awaiter that the task has been closed. 218 if state & AWAITER != 0 { 219 (*header).notify(None); 220 } 221 222 break; 223 } 224 Err(s) => state = s, 225 } 226 } 227 } 228 } 229 230 /// Puts the task in detached state. set_detached(&mut self) -> Option<Result<T, Panic>>231 fn set_detached(&mut self) -> Option<Result<T, Panic>> { 232 let ptr = self.ptr.as_ptr(); 233 let header = ptr as *const Header<M>; 234 235 unsafe { 236 // A place where the output will be stored in case it needs to be dropped. 237 let mut output = None; 238 239 // Optimistically assume the `Task` is being detached just after creating the task. 240 // This is a common case so if the `Task` is datached, the overhead of it is only one 241 // compare-exchange operation. 242 if let Err(mut state) = (*header).state.compare_exchange_weak( 243 SCHEDULED | TASK | REFERENCE, 244 SCHEDULED | REFERENCE, 245 Ordering::AcqRel, 246 Ordering::Acquire, 247 ) { 248 loop { 249 // If the task has been completed but not yet closed, that means its output 250 // must be dropped. 251 if state & COMPLETED != 0 && state & CLOSED == 0 { 252 // Mark the task as closed in order to grab its output. 253 match (*header).state.compare_exchange_weak( 254 state, 255 state | CLOSED, 256 Ordering::AcqRel, 257 Ordering::Acquire, 258 ) { 259 Ok(_) => { 260 // Read the output. 261 output = Some( 262 (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>) 263 .read(), 264 ); 265 266 // Update the state variable because we're continuing the loop. 267 state |= CLOSED; 268 } 269 Err(s) => state = s, 270 } 271 } else { 272 // If this is the last reference to the task and it's not closed, then 273 // close it and schedule one more time so that its future gets dropped by 274 // the executor. 275 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { 276 SCHEDULED | CLOSED | REFERENCE 277 } else { 278 state & !TASK 279 }; 280 281 // Unset the `TASK` flag. 282 match (*header).state.compare_exchange_weak( 283 state, 284 new, 285 Ordering::AcqRel, 286 Ordering::Acquire, 287 ) { 288 Ok(_) => { 289 // If this is the last reference to the task, we need to either 290 // schedule dropping its future or destroy it. 291 if state & !(REFERENCE - 1) == 0 { 292 if state & CLOSED == 0 { 293 ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); 294 } else { 295 ((*header).vtable.destroy)(ptr); 296 } 297 } 298 299 break; 300 } 301 Err(s) => state = s, 302 } 303 } 304 } 305 } 306 307 output 308 } 309 } 310 311 /// Polls the task to retrieve its output. 312 /// 313 /// Returns `Some` if the task has completed or `None` if it was closed. 314 /// 315 /// A task becomes closed in the following cases: 316 /// 317 /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. 318 /// 2. Its output gets awaited by the `Task`. 319 /// 3. It panics while polling the future. 320 /// 4. It is completed and the `Task` gets dropped. poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>321 fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { 322 let ptr = self.ptr.as_ptr(); 323 let header = ptr as *const Header<M>; 324 325 unsafe { 326 let mut state = (*header).state.load(Ordering::Acquire); 327 328 loop { 329 // If the task has been closed, notify the awaiter and return `None`. 330 if state & CLOSED != 0 { 331 // If the task is scheduled or running, we need to wait until its future is 332 // dropped. 333 if state & (SCHEDULED | RUNNING) != 0 { 334 // Replace the waker with one associated with the current task. 335 (*header).register(cx.waker()); 336 337 // Reload the state after registering. It is possible changes occurred just 338 // before registration so we need to check for that. 339 state = (*header).state.load(Ordering::Acquire); 340 341 // If the task is still scheduled or running, we need to wait because its 342 // future is not dropped yet. 343 if state & (SCHEDULED | RUNNING) != 0 { 344 return Poll::Pending; 345 } 346 } 347 348 // Even though the awaiter is most likely the current task, it could also be 349 // another task. 350 (*header).notify(Some(cx.waker())); 351 return Poll::Ready(None); 352 } 353 354 // If the task is not completed, register the current task. 355 if state & COMPLETED == 0 { 356 // Replace the waker with one associated with the current task. 357 (*header).register(cx.waker()); 358 359 // Reload the state after registering. It is possible that the task became 360 // completed or closed just before registration so we need to check for that. 361 state = (*header).state.load(Ordering::Acquire); 362 363 // If the task has been closed, restart. 364 if state & CLOSED != 0 { 365 continue; 366 } 367 368 // If the task is still not completed, we're blocked on it. 369 if state & COMPLETED == 0 { 370 return Poll::Pending; 371 } 372 } 373 374 // Since the task is now completed, mark it as closed in order to grab its output. 375 match (*header).state.compare_exchange( 376 state, 377 state | CLOSED, 378 Ordering::AcqRel, 379 Ordering::Acquire, 380 ) { 381 Ok(_) => { 382 // Notify the awaiter. Even though the awaiter is most likely the current 383 // task, it could also be another task. 384 if state & AWAITER != 0 { 385 (*header).notify(Some(cx.waker())); 386 } 387 388 // Take the output from the task. 389 let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>; 390 let output = output.read(); 391 392 // Propagate the panic if the task panicked. 393 let output = match output { 394 Ok(output) => output, 395 Err(panic) => { 396 #[cfg(feature = "std")] 397 std::panic::resume_unwind(panic); 398 399 #[cfg(not(feature = "std"))] 400 match panic {} 401 } 402 }; 403 404 return Poll::Ready(Some(output)); 405 } 406 Err(s) => state = s, 407 } 408 } 409 } 410 } 411 header(&self) -> &Header<M>412 fn header(&self) -> &Header<M> { 413 let ptr = self.ptr.as_ptr(); 414 let header = ptr as *const Header<M>; 415 unsafe { &*header } 416 } 417 418 /// Returns `true` if the current task is finished. 419 /// 420 /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. is_finished(&self) -> bool421 pub fn is_finished(&self) -> bool { 422 let ptr = self.ptr.as_ptr(); 423 let header = ptr as *const Header<M>; 424 425 unsafe { 426 let state = (*header).state.load(Ordering::Acquire); 427 state & (CLOSED | COMPLETED) != 0 428 } 429 } 430 431 /// Get the metadata associated with this task. 432 /// 433 /// Tasks can be created with a metadata object associated with them; by default, this 434 /// is a `()` value. See the [`Builder::metadata()`] method for more information. metadata(&self) -> &M435 pub fn metadata(&self) -> &M { 436 &self.header().metadata 437 } 438 } 439 440 impl<T, M> Drop for Task<T, M> { drop(&mut self)441 fn drop(&mut self) { 442 self.set_canceled(); 443 self.set_detached(); 444 } 445 } 446 447 impl<T, M> Future for Task<T, M> { 448 type Output = T; 449 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 451 match self.poll_task(cx) { 452 Poll::Ready(t) => Poll::Ready(t.expect("Task polled after completion")), 453 Poll::Pending => Poll::Pending, 454 } 455 } 456 } 457 458 impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result459 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 460 f.debug_struct("Task") 461 .field("header", self.header()) 462 .finish() 463 } 464 } 465 466 /// A spawned task with a fallible response. 467 /// 468 /// This type behaves like [`Task`], however it produces an `Option<T>` when 469 /// polled and will return `None` if the executor dropped its 470 /// [`Runnable`][`super::Runnable`] without being run. 471 /// 472 /// This can be useful to avoid the panic produced when polling the `Task` 473 /// future if the executor dropped its `Runnable`. 474 #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] 475 pub struct FallibleTask<T, M = ()> { 476 task: Task<T, M>, 477 } 478 479 impl<T, M> FallibleTask<T, M> { 480 /// Detaches the task to let it keep running in the background. 481 /// 482 /// # Examples 483 /// 484 /// ``` 485 /// use smol::{Executor, Timer}; 486 /// use std::time::Duration; 487 /// 488 /// let ex = Executor::new(); 489 /// 490 /// // Spawn a deamon future. 491 /// ex.spawn(async { 492 /// loop { 493 /// println!("I'm a daemon task looping forever."); 494 /// Timer::after(Duration::from_secs(1)).await; 495 /// } 496 /// }) 497 /// .fallible() 498 /// .detach(); 499 /// ``` detach(self)500 pub fn detach(self) { 501 self.task.detach() 502 } 503 504 /// Cancels the task and waits for it to stop running. 505 /// 506 /// Returns the task's output if it was completed just before it got canceled, or [`None`] if 507 /// it didn't complete. 508 /// 509 /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of 510 /// canceling because it also waits for the task to stop running. 511 /// 512 /// # Examples 513 /// 514 /// ``` 515 /// # if cfg!(miri) { return; } // Miri does not support epoll 516 /// use smol::{future, Executor, Timer}; 517 /// use std::thread; 518 /// use std::time::Duration; 519 /// 520 /// let ex = Executor::new(); 521 /// 522 /// // Spawn a deamon future. 523 /// let task = ex.spawn(async { 524 /// loop { 525 /// println!("Even though I'm in an infinite loop, you can still cancel me!"); 526 /// Timer::after(Duration::from_secs(1)).await; 527 /// } 528 /// }) 529 /// .fallible(); 530 /// 531 /// // Run an executor thread. 532 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); 533 /// 534 /// future::block_on(async { 535 /// Timer::after(Duration::from_secs(3)).await; 536 /// task.cancel().await; 537 /// }); 538 /// ``` cancel(self) -> Option<T>539 pub async fn cancel(self) -> Option<T> { 540 self.task.cancel().await 541 } 542 543 /// Returns `true` if the current task is finished. 544 /// 545 /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. is_finished(&self) -> bool546 pub fn is_finished(&self) -> bool { 547 self.task.is_finished() 548 } 549 } 550 551 impl<T, M> Future for FallibleTask<T, M> { 552 type Output = Option<T>; 553 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>554 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 555 self.task.poll_task(cx) 556 } 557 } 558 559 impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 561 f.debug_struct("FallibleTask") 562 .field("header", self.task.header()) 563 .finish() 564 } 565 } 566