1 //! A collection of tasks spawned on a Tokio runtime. 2 //! 3 //! This module provides the [`JoinSet`] type, a collection which stores a set 4 //! of spawned tasks and allows asynchronously awaiting the output of those 5 //! tasks as they complete. See the documentation for the [`JoinSet`] type for 6 //! details. 7 use std::future::Future; 8 use std::pin::Pin; 9 use std::task::{Context, Poll}; 10 use std::{fmt, panic}; 11 12 use crate::runtime::Handle; 13 use crate::task::Id; 14 use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet}; 15 use crate::util::IdleNotifiedSet; 16 17 /// A collection of tasks spawned on a Tokio runtime. 18 /// 19 /// A `JoinSet` can be used to await the completion of some or all of the tasks 20 /// in the set. The set is not ordered, and the tasks will be returned in the 21 /// order they complete. 22 /// 23 /// All of the tasks must have the same return type `T`. 24 /// 25 /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted. 26 /// 27 /// # Examples 28 /// 29 /// Spawn multiple tasks and wait for them. 30 /// 31 /// ``` 32 /// use tokio::task::JoinSet; 33 /// 34 /// #[tokio::main] 35 /// async fn main() { 36 /// let mut set = JoinSet::new(); 37 /// 38 /// for i in 0..10 { 39 /// set.spawn(async move { i }); 40 /// } 41 /// 42 /// let mut seen = [false; 10]; 43 /// while let Some(res) = set.join_next().await { 44 /// let idx = res.unwrap(); 45 /// seen[idx] = true; 46 /// } 47 /// 48 /// for i in 0..10 { 49 /// assert!(seen[i]); 50 /// } 51 /// } 52 /// ``` 53 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] 54 pub struct JoinSet<T> { 55 inner: IdleNotifiedSet<JoinHandle<T>>, 56 } 57 58 /// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather 59 /// than on the current default runtime. 60 /// 61 /// [`task::Builder`]: crate::task::Builder 62 #[cfg(all(tokio_unstable, feature = "tracing"))] 63 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] 64 #[must_use = "builders do nothing unless used to spawn a task"] 65 pub struct Builder<'a, T> { 66 joinset: &'a mut JoinSet<T>, 67 builder: super::Builder<'a>, 68 } 69 70 impl<T> JoinSet<T> { 71 /// Create a new `JoinSet`. new() -> Self72 pub fn new() -> Self { 73 Self { 74 inner: IdleNotifiedSet::new(), 75 } 76 } 77 78 /// Returns the number of tasks currently in the `JoinSet`. len(&self) -> usize79 pub fn len(&self) -> usize { 80 self.inner.len() 81 } 82 83 /// Returns whether the `JoinSet` is empty. is_empty(&self) -> bool84 pub fn is_empty(&self) -> bool { 85 self.inner.is_empty() 86 } 87 } 88 89 impl<T: 'static> JoinSet<T> { 90 /// Returns a [`Builder`] that can be used to configure a task prior to 91 /// spawning it on this `JoinSet`. 92 /// 93 /// # Examples 94 /// 95 /// ``` 96 /// use tokio::task::JoinSet; 97 /// 98 /// #[tokio::main] 99 /// async fn main() -> std::io::Result<()> { 100 /// let mut set = JoinSet::new(); 101 /// 102 /// // Use the builder to configure a task's name before spawning it. 103 /// set.build_task() 104 /// .name("my_task") 105 /// .spawn(async { /* ... */ })?; 106 /// 107 /// Ok(()) 108 /// } 109 /// ``` 110 #[cfg(all(tokio_unstable, feature = "tracing"))] 111 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] build_task(&mut self) -> Builder<'_, T>112 pub fn build_task(&mut self) -> Builder<'_, T> { 113 Builder { 114 builder: super::Builder::new(), 115 joinset: self, 116 } 117 } 118 119 /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] 120 /// that can be used to remotely cancel the task. 121 /// 122 /// The provided future will start running in the background immediately 123 /// when this method is called, even if you don't await anything on this 124 /// `JoinSet`. 125 /// 126 /// # Panics 127 /// 128 /// This method panics if called outside of a Tokio runtime. 129 /// 130 /// [`AbortHandle`]: crate::task::AbortHandle 131 #[track_caller] spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,132 pub fn spawn<F>(&mut self, task: F) -> AbortHandle 133 where 134 F: Future<Output = T>, 135 F: Send + 'static, 136 T: Send, 137 { 138 self.insert(crate::spawn(task)) 139 } 140 141 /// Spawn the provided task on the provided runtime and store it in this 142 /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely 143 /// cancel the task. 144 /// 145 /// The provided future will start running in the background immediately 146 /// when this method is called, even if you don't await anything on this 147 /// `JoinSet`. 148 /// 149 /// [`AbortHandle`]: crate::task::AbortHandle 150 #[track_caller] spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,151 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle 152 where 153 F: Future<Output = T>, 154 F: Send + 'static, 155 T: Send, 156 { 157 self.insert(handle.spawn(task)) 158 } 159 160 /// Spawn the provided task on the current [`LocalSet`] and store it in this 161 /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely 162 /// cancel the task. 163 /// 164 /// The provided future will start running in the background immediately 165 /// when this method is called, even if you don't await anything on this 166 /// `JoinSet`. 167 /// 168 /// # Panics 169 /// 170 /// This method panics if it is called outside of a `LocalSet`. 171 /// 172 /// [`LocalSet`]: crate::task::LocalSet 173 /// [`AbortHandle`]: crate::task::AbortHandle 174 #[track_caller] spawn_local<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: 'static,175 pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle 176 where 177 F: Future<Output = T>, 178 F: 'static, 179 { 180 self.insert(crate::task::spawn_local(task)) 181 } 182 183 /// Spawn the provided task on the provided [`LocalSet`] and store it in 184 /// this `JoinSet`, returning an [`AbortHandle`] that can be used to 185 /// remotely cancel the task. 186 /// 187 /// Unlike the [`spawn_local`] method, this method may be used to spawn local 188 /// tasks on a `LocalSet` that is _not_ currently running. The provided 189 /// future will start running whenever the `LocalSet` is next started. 190 /// 191 /// [`LocalSet`]: crate::task::LocalSet 192 /// [`AbortHandle`]: crate::task::AbortHandle 193 /// [`spawn_local`]: Self::spawn_local 194 #[track_caller] spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle where F: Future<Output = T>, F: 'static,195 pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle 196 where 197 F: Future<Output = T>, 198 F: 'static, 199 { 200 self.insert(local_set.spawn_local(task)) 201 } 202 203 /// Spawn the blocking code on the blocking threadpool and store 204 /// it in this `JoinSet`, returning an [`AbortHandle`] that can be 205 /// used to remotely cancel the task. 206 /// 207 /// # Examples 208 /// 209 /// Spawn multiple blocking tasks and wait for them. 210 /// 211 /// ``` 212 /// use tokio::task::JoinSet; 213 /// 214 /// #[tokio::main] 215 /// async fn main() { 216 /// let mut set = JoinSet::new(); 217 /// 218 /// for i in 0..10 { 219 /// set.spawn_blocking(move || { i }); 220 /// } 221 /// 222 /// let mut seen = [false; 10]; 223 /// while let Some(res) = set.join_next().await { 224 /// let idx = res.unwrap(); 225 /// seen[idx] = true; 226 /// } 227 /// 228 /// for i in 0..10 { 229 /// assert!(seen[i]); 230 /// } 231 /// } 232 /// ``` 233 /// 234 /// # Panics 235 /// 236 /// This method panics if called outside of a Tokio runtime. 237 /// 238 /// [`AbortHandle`]: crate::task::AbortHandle 239 #[track_caller] spawn_blocking<F>(&mut self, f: F) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,240 pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle 241 where 242 F: FnOnce() -> T, 243 F: Send + 'static, 244 T: Send, 245 { 246 self.insert(crate::runtime::spawn_blocking(f)) 247 } 248 249 /// Spawn the blocking code on the blocking threadpool of the 250 /// provided runtime and store it in this `JoinSet`, returning an 251 /// [`AbortHandle`] that can be used to remotely cancel the task. 252 /// 253 /// [`AbortHandle`]: crate::task::AbortHandle 254 #[track_caller] spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,255 pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle 256 where 257 F: FnOnce() -> T, 258 F: Send + 'static, 259 T: Send, 260 { 261 self.insert(handle.spawn_blocking(f)) 262 } 263 insert(&mut self, jh: JoinHandle<T>) -> AbortHandle264 fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle { 265 let abort = jh.abort_handle(); 266 let mut entry = self.inner.insert_idle(jh); 267 268 // Set the waker that is notified when the task completes. 269 entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker())); 270 abort 271 } 272 273 /// Waits until one of the tasks in the set completes and returns its output. 274 /// 275 /// Returns `None` if the set is empty. 276 /// 277 /// # Cancel Safety 278 /// 279 /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` 280 /// statement and some other branch completes first, it is guaranteed that no tasks were 281 /// removed from this `JoinSet`. join_next(&mut self) -> Option<Result<T, JoinError>>282 pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> { 283 std::future::poll_fn(|cx| self.poll_join_next(cx)).await 284 } 285 286 /// Waits until one of the tasks in the set completes and returns its 287 /// output, along with the [task ID] of the completed task. 288 /// 289 /// Returns `None` if the set is empty. 290 /// 291 /// When this method returns an error, then the id of the task that failed can be accessed 292 /// using the [`JoinError::id`] method. 293 /// 294 /// # Cancel Safety 295 /// 296 /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!` 297 /// statement and some other branch completes first, it is guaranteed that no tasks were 298 /// removed from this `JoinSet`. 299 /// 300 /// [task ID]: crate::task::Id 301 /// [`JoinError::id`]: fn@crate::task::JoinError::id join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>302 pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> { 303 std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await 304 } 305 306 /// Tries to join one of the tasks in the set that has completed and return its output. 307 /// 308 /// Returns `None` if there are no completed tasks, or if the set is empty. try_join_next(&mut self) -> Option<Result<T, JoinError>>309 pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> { 310 // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left. 311 loop { 312 let mut entry = self.inner.try_pop_notified()?; 313 314 let res = entry.with_value_and_context(|jh, ctx| { 315 // Since this function is not async and cannot be forced to yield, we should 316 // disable budgeting when we want to check for the `JoinHandle` readiness. 317 Pin::new(&mut unconstrained(jh)).poll(ctx) 318 }); 319 320 if let Poll::Ready(res) = res { 321 let _entry = entry.remove(); 322 323 return Some(res); 324 } 325 } 326 } 327 328 /// Tries to join one of the tasks in the set that has completed and return its output, 329 /// along with the [task ID] of the completed task. 330 /// 331 /// Returns `None` if there are no completed tasks, or if the set is empty. 332 /// 333 /// When this method returns an error, then the id of the task that failed can be accessed 334 /// using the [`JoinError::id`] method. 335 /// 336 /// [task ID]: crate::task::Id 337 /// [`JoinError::id`]: fn@crate::task::JoinError::id try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>338 pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> { 339 // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left. 340 loop { 341 let mut entry = self.inner.try_pop_notified()?; 342 343 let res = entry.with_value_and_context(|jh, ctx| { 344 // Since this function is not async and cannot be forced to yield, we should 345 // disable budgeting when we want to check for the `JoinHandle` readiness. 346 Pin::new(&mut unconstrained(jh)).poll(ctx) 347 }); 348 349 if let Poll::Ready(res) = res { 350 let entry = entry.remove(); 351 352 return Some(res.map(|output| (entry.id(), output))); 353 } 354 } 355 } 356 357 /// Aborts all tasks and waits for them to finish shutting down. 358 /// 359 /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in 360 /// a loop until it returns `None`. 361 /// 362 /// This method ignores any panics in the tasks shutting down. When this call returns, the 363 /// `JoinSet` will be empty. 364 /// 365 /// [`abort_all`]: fn@Self::abort_all 366 /// [`join_next`]: fn@Self::join_next shutdown(&mut self)367 pub async fn shutdown(&mut self) { 368 self.abort_all(); 369 while self.join_next().await.is_some() {} 370 } 371 372 /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results. 373 /// 374 /// The results will be stored in the order they completed not the order they were spawned. 375 /// This is a convenience method that is equivalent to calling [`join_next`] in 376 /// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call 377 /// to `join_all` will panic and all remaining tasks on the `JoinSet` are 378 /// cancelled. To handle errors in any other way, manually call [`join_next`] 379 /// in a loop. 380 /// 381 /// # Examples 382 /// 383 /// Spawn multiple tasks and `join_all` them. 384 /// 385 /// ``` 386 /// use tokio::task::JoinSet; 387 /// use std::time::Duration; 388 /// 389 /// #[tokio::main] 390 /// async fn main() { 391 /// let mut set = JoinSet::new(); 392 /// 393 /// for i in 0..3 { 394 /// set.spawn(async move { 395 /// tokio::time::sleep(Duration::from_secs(3 - i)).await; 396 /// i 397 /// }); 398 /// } 399 /// 400 /// let output = set.join_all().await; 401 /// assert_eq!(output, vec![2, 1, 0]); 402 /// } 403 /// ``` 404 /// 405 /// Equivalent implementation of `join_all`, using [`join_next`] and loop. 406 /// 407 /// ``` 408 /// use tokio::task::JoinSet; 409 /// use std::panic; 410 /// 411 /// #[tokio::main] 412 /// async fn main() { 413 /// let mut set = JoinSet::new(); 414 /// 415 /// for i in 0..3 { 416 /// set.spawn(async move {i}); 417 /// } 418 /// 419 /// let mut output = Vec::new(); 420 /// while let Some(res) = set.join_next().await{ 421 /// match res { 422 /// Ok(t) => output.push(t), 423 /// Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()), 424 /// Err(err) => panic!("{err}"), 425 /// } 426 /// } 427 /// assert_eq!(output.len(),3); 428 /// } 429 /// ``` 430 /// [`join_next`]: fn@Self::join_next 431 /// [`JoinError::id`]: fn@crate::task::JoinError::id join_all(mut self) -> Vec<T>432 pub async fn join_all(mut self) -> Vec<T> { 433 let mut output = Vec::with_capacity(self.len()); 434 435 while let Some(res) = self.join_next().await { 436 match res { 437 Ok(t) => output.push(t), 438 Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()), 439 Err(err) => panic!("{err}"), 440 } 441 } 442 output 443 } 444 445 /// Aborts all tasks on this `JoinSet`. 446 /// 447 /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete 448 /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty. abort_all(&mut self)449 pub fn abort_all(&mut self) { 450 self.inner.for_each(|jh| jh.abort()); 451 } 452 453 /// Removes all tasks from this `JoinSet` without aborting them. 454 /// 455 /// The tasks removed by this call will continue to run in the background even if the `JoinSet` 456 /// is dropped. detach_all(&mut self)457 pub fn detach_all(&mut self) { 458 self.inner.drain(drop); 459 } 460 461 /// Polls for one of the tasks in the set to complete. 462 /// 463 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. 464 /// 465 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled 466 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to 467 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is 468 /// scheduled to receive a wakeup. 469 /// 470 /// # Returns 471 /// 472 /// This function returns: 473 /// 474 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is 475 /// available right now. 476 /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. 477 /// The `value` is the return value of one of the tasks that completed. 478 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been 479 /// aborted. The `err` is the `JoinError` from the panicked/aborted task. 480 /// * `Poll::Ready(None)` if the `JoinSet` is empty. 481 /// 482 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. 483 /// This can happen if the [coop budget] is reached. 484 /// 485 /// [coop budget]: crate::task#cooperative-scheduling poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>>486 pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> { 487 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to 488 // the `notified` list if the waker is notified in the `poll` call below. 489 let mut entry = match self.inner.pop_notified(cx.waker()) { 490 Some(entry) => entry, 491 None => { 492 if self.is_empty() { 493 return Poll::Ready(None); 494 } else { 495 // The waker was set by `pop_notified`. 496 return Poll::Pending; 497 } 498 } 499 }; 500 501 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); 502 503 if let Poll::Ready(res) = res { 504 let _entry = entry.remove(); 505 Poll::Ready(Some(res)) 506 } else { 507 // A JoinHandle generally won't emit a wakeup without being ready unless 508 // the coop limit has been reached. We yield to the executor in this 509 // case. 510 cx.waker().wake_by_ref(); 511 Poll::Pending 512 } 513 } 514 515 /// Polls for one of the tasks in the set to complete. 516 /// 517 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. 518 /// 519 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled 520 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to 521 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is 522 /// scheduled to receive a wakeup. 523 /// 524 /// # Returns 525 /// 526 /// This function returns: 527 /// 528 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is 529 /// available right now. 530 /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed. 531 /// The `value` is the return value of one of the tasks that completed, and 532 /// `id` is the [task ID] of that task. 533 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been 534 /// aborted. The `err` is the `JoinError` from the panicked/aborted task. 535 /// * `Poll::Ready(None)` if the `JoinSet` is empty. 536 /// 537 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. 538 /// This can happen if the [coop budget] is reached. 539 /// 540 /// [coop budget]: crate::task#cooperative-scheduling 541 /// [task ID]: crate::task::Id poll_join_next_with_id( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Id, T), JoinError>>>542 pub fn poll_join_next_with_id( 543 &mut self, 544 cx: &mut Context<'_>, 545 ) -> Poll<Option<Result<(Id, T), JoinError>>> { 546 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to 547 // the `notified` list if the waker is notified in the `poll` call below. 548 let mut entry = match self.inner.pop_notified(cx.waker()) { 549 Some(entry) => entry, 550 None => { 551 if self.is_empty() { 552 return Poll::Ready(None); 553 } else { 554 // The waker was set by `pop_notified`. 555 return Poll::Pending; 556 } 557 } 558 }; 559 560 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); 561 562 if let Poll::Ready(res) = res { 563 let entry = entry.remove(); 564 // If the task succeeded, add the task ID to the output. Otherwise, the 565 // `JoinError` will already have the task's ID. 566 Poll::Ready(Some(res.map(|output| (entry.id(), output)))) 567 } else { 568 // A JoinHandle generally won't emit a wakeup without being ready unless 569 // the coop limit has been reached. We yield to the executor in this 570 // case. 571 cx.waker().wake_by_ref(); 572 Poll::Pending 573 } 574 } 575 } 576 577 impl<T> Drop for JoinSet<T> { drop(&mut self)578 fn drop(&mut self) { 579 self.inner.drain(|join_handle| join_handle.abort()); 580 } 581 } 582 583 impl<T> fmt::Debug for JoinSet<T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result584 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 585 f.debug_struct("JoinSet").field("len", &self.len()).finish() 586 } 587 } 588 589 impl<T> Default for JoinSet<T> { default() -> Self590 fn default() -> Self { 591 Self::new() 592 } 593 } 594 595 /// Collect an iterator of futures into a [`JoinSet`]. 596 /// 597 /// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator. 598 /// 599 /// # Examples 600 /// 601 /// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]: 602 /// 603 /// ``` 604 /// use tokio::task::JoinSet; 605 /// 606 /// #[tokio::main] 607 /// async fn main() { 608 /// let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect(); 609 /// 610 /// let mut seen = [false; 10]; 611 /// while let Some(res) = set.join_next().await { 612 /// let idx = res.unwrap(); 613 /// seen[idx] = true; 614 /// } 615 /// 616 /// for i in 0..10 { 617 /// assert!(seen[i]); 618 /// } 619 /// } 620 /// ``` 621 /// 622 /// [`collect`]: std::iter::Iterator::collect 623 impl<T, F> std::iter::FromIterator<F> for JoinSet<T> 624 where 625 F: Future<Output = T>, 626 F: Send + 'static, 627 T: Send + 'static, 628 { from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self629 fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self { 630 let mut set = Self::new(); 631 iter.into_iter().for_each(|task| { 632 set.spawn(task); 633 }); 634 set 635 } 636 } 637 638 // === impl Builder === 639 640 #[cfg(all(tokio_unstable, feature = "tracing"))] 641 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] 642 impl<'a, T: 'static> Builder<'a, T> { 643 /// Assigns a name to the task which will be spawned. name(self, name: &'a str) -> Self644 pub fn name(self, name: &'a str) -> Self { 645 let builder = self.builder.name(name); 646 Self { builder, ..self } 647 } 648 649 /// Spawn the provided task with this builder's settings and store it in the 650 /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely 651 /// cancel the task. 652 /// 653 /// # Returns 654 /// 655 /// An [`AbortHandle`] that can be used to remotely cancel the task. 656 /// 657 /// # Panics 658 /// 659 /// This method panics if called outside of a Tokio runtime. 660 /// 661 /// [`AbortHandle`]: crate::task::AbortHandle 662 #[track_caller] spawn<F>(self, future: F) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: Send + 'static, T: Send,663 pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle> 664 where 665 F: Future<Output = T>, 666 F: Send + 'static, 667 T: Send, 668 { 669 Ok(self.joinset.insert(self.builder.spawn(future)?)) 670 } 671 672 /// Spawn the provided task on the provided [runtime handle] with this 673 /// builder's settings, and store it in the [`JoinSet`]. 674 /// 675 /// # Returns 676 /// 677 /// An [`AbortHandle`] that can be used to remotely cancel the task. 678 /// 679 /// 680 /// [`AbortHandle`]: crate::task::AbortHandle 681 /// [runtime handle]: crate::runtime::Handle 682 #[track_caller] spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: Send + 'static, T: Send,683 pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> 684 where 685 F: Future<Output = T>, 686 F: Send + 'static, 687 T: Send, 688 { 689 Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?)) 690 } 691 692 /// Spawn the blocking code on the blocking threadpool with this builder's 693 /// settings, and store it in the [`JoinSet`]. 694 /// 695 /// # Returns 696 /// 697 /// An [`AbortHandle`] that can be used to remotely cancel the task. 698 /// 699 /// # Panics 700 /// 701 /// This method panics if called outside of a Tokio runtime. 702 /// 703 /// [`JoinSet`]: crate::task::JoinSet 704 /// [`AbortHandle`]: crate::task::AbortHandle 705 #[track_caller] spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle> where F: FnOnce() -> T, F: Send + 'static, T: Send,706 pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle> 707 where 708 F: FnOnce() -> T, 709 F: Send + 'static, 710 T: Send, 711 { 712 Ok(self.joinset.insert(self.builder.spawn_blocking(f)?)) 713 } 714 715 /// Spawn the blocking code on the blocking threadpool of the provided 716 /// runtime handle with this builder's settings, and store it in the 717 /// [`JoinSet`]. 718 /// 719 /// # Returns 720 /// 721 /// An [`AbortHandle`] that can be used to remotely cancel the task. 722 /// 723 /// [`JoinSet`]: crate::task::JoinSet 724 /// [`AbortHandle`]: crate::task::AbortHandle 725 #[track_caller] spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle> where F: FnOnce() -> T, F: Send + 'static, T: Send,726 pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle> 727 where 728 F: FnOnce() -> T, 729 F: Send + 'static, 730 T: Send, 731 { 732 Ok(self 733 .joinset 734 .insert(self.builder.spawn_blocking_on(f, handle)?)) 735 } 736 737 /// Spawn the provided task on the current [`LocalSet`] with this builder's 738 /// settings, and store it in the [`JoinSet`]. 739 /// 740 /// # Returns 741 /// 742 /// An [`AbortHandle`] that can be used to remotely cancel the task. 743 /// 744 /// # Panics 745 /// 746 /// This method panics if it is called outside of a `LocalSet`. 747 /// 748 /// [`LocalSet`]: crate::task::LocalSet 749 /// [`AbortHandle`]: crate::task::AbortHandle 750 #[track_caller] spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: 'static,751 pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> 752 where 753 F: Future<Output = T>, 754 F: 'static, 755 { 756 Ok(self.joinset.insert(self.builder.spawn_local(future)?)) 757 } 758 759 /// Spawn the provided task on the provided [`LocalSet`] with this builder's 760 /// settings, and store it in the [`JoinSet`]. 761 /// 762 /// # Returns 763 /// 764 /// An [`AbortHandle`] that can be used to remotely cancel the task. 765 /// 766 /// [`LocalSet`]: crate::task::LocalSet 767 /// [`AbortHandle`]: crate::task::AbortHandle 768 #[track_caller] spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: 'static,769 pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> 770 where 771 F: Future<Output = T>, 772 F: 'static, 773 { 774 Ok(self 775 .joinset 776 .insert(self.builder.spawn_local_on(future, local_set)?)) 777 } 778 } 779 780 // Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is 781 // `Debug`. 782 #[cfg(all(tokio_unstable, feature = "tracing"))] 783 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] 784 impl<'a, T> fmt::Debug for Builder<'a, T> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result785 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 786 f.debug_struct("join_set::Builder") 787 .field("joinset", &self.joinset) 788 .field("builder", &self.builder) 789 .finish() 790 } 791 } 792