1 //! A collection of tasks spawned on a Tokio runtime.
2 //!
3 //! This module provides the [`JoinSet`] type, a collection which stores a set
4 //! of spawned tasks and allows asynchronously awaiting the output of those
5 //! tasks as they complete. See the documentation for the [`JoinSet`] type for
6 //! details.
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10 use std::{fmt, panic};
11 
12 use crate::runtime::Handle;
13 use crate::task::Id;
14 use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet};
15 use crate::util::IdleNotifiedSet;
16 
17 /// A collection of tasks spawned on a Tokio runtime.
18 ///
19 /// A `JoinSet` can be used to await the completion of some or all of the tasks
20 /// in the set. The set is not ordered, and the tasks will be returned in the
21 /// order they complete.
22 ///
23 /// All of the tasks must have the same return type `T`.
24 ///
25 /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
26 ///
27 /// # Examples
28 ///
29 /// Spawn multiple tasks and wait for them.
30 ///
31 /// ```
32 /// use tokio::task::JoinSet;
33 ///
34 /// #[tokio::main]
35 /// async fn main() {
36 ///     let mut set = JoinSet::new();
37 ///
38 ///     for i in 0..10 {
39 ///         set.spawn(async move { i });
40 ///     }
41 ///
42 ///     let mut seen = [false; 10];
43 ///     while let Some(res) = set.join_next().await {
44 ///         let idx = res.unwrap();
45 ///         seen[idx] = true;
46 ///     }
47 ///
48 ///     for i in 0..10 {
49 ///         assert!(seen[i]);
50 ///     }
51 /// }
52 /// ```
53 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
54 pub struct JoinSet<T> {
55     inner: IdleNotifiedSet<JoinHandle<T>>,
56 }
57 
58 /// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
59 /// than on the current default runtime.
60 ///
61 /// [`task::Builder`]: crate::task::Builder
62 #[cfg(all(tokio_unstable, feature = "tracing"))]
63 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
64 #[must_use = "builders do nothing unless used to spawn a task"]
65 pub struct Builder<'a, T> {
66     joinset: &'a mut JoinSet<T>,
67     builder: super::Builder<'a>,
68 }
69 
70 impl<T> JoinSet<T> {
71     /// Create a new `JoinSet`.
new() -> Self72     pub fn new() -> Self {
73         Self {
74             inner: IdleNotifiedSet::new(),
75         }
76     }
77 
78     /// Returns the number of tasks currently in the `JoinSet`.
len(&self) -> usize79     pub fn len(&self) -> usize {
80         self.inner.len()
81     }
82 
83     /// Returns whether the `JoinSet` is empty.
is_empty(&self) -> bool84     pub fn is_empty(&self) -> bool {
85         self.inner.is_empty()
86     }
87 }
88 
89 impl<T: 'static> JoinSet<T> {
90     /// Returns a [`Builder`] that can be used to configure a task prior to
91     /// spawning it on this `JoinSet`.
92     ///
93     /// # Examples
94     ///
95     /// ```
96     /// use tokio::task::JoinSet;
97     ///
98     /// #[tokio::main]
99     /// async fn main() -> std::io::Result<()> {
100     ///     let mut set = JoinSet::new();
101     ///
102     ///     // Use the builder to configure a task's name before spawning it.
103     ///     set.build_task()
104     ///         .name("my_task")
105     ///         .spawn(async { /* ... */ })?;
106     ///
107     ///     Ok(())
108     /// }
109     /// ```
110     #[cfg(all(tokio_unstable, feature = "tracing"))]
111     #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
build_task(&mut self) -> Builder<'_, T>112     pub fn build_task(&mut self) -> Builder<'_, T> {
113         Builder {
114             builder: super::Builder::new(),
115             joinset: self,
116         }
117     }
118 
119     /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
120     /// that can be used to remotely cancel the task.
121     ///
122     /// The provided future will start running in the background immediately
123     /// when this method is called, even if you don't await anything on this
124     /// `JoinSet`.
125     ///
126     /// # Panics
127     ///
128     /// This method panics if called outside of a Tokio runtime.
129     ///
130     /// [`AbortHandle`]: crate::task::AbortHandle
131     #[track_caller]
spawn<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,132     pub fn spawn<F>(&mut self, task: F) -> AbortHandle
133     where
134         F: Future<Output = T>,
135         F: Send + 'static,
136         T: Send,
137     {
138         self.insert(crate::spawn(task))
139     }
140 
141     /// Spawn the provided task on the provided runtime and store it in this
142     /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely
143     /// cancel the task.
144     ///
145     /// The provided future will start running in the background immediately
146     /// when this method is called, even if you don't await anything on this
147     /// `JoinSet`.
148     ///
149     /// [`AbortHandle`]: crate::task::AbortHandle
150     #[track_caller]
spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle where F: Future<Output = T>, F: Send + 'static, T: Send,151     pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
152     where
153         F: Future<Output = T>,
154         F: Send + 'static,
155         T: Send,
156     {
157         self.insert(handle.spawn(task))
158     }
159 
160     /// Spawn the provided task on the current [`LocalSet`] and store it in this
161     /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
162     /// cancel the task.
163     ///
164     /// The provided future will start running in the background immediately
165     /// when this method is called, even if you don't await anything on this
166     /// `JoinSet`.
167     ///
168     /// # Panics
169     ///
170     /// This method panics if it is called outside of a `LocalSet`.
171     ///
172     /// [`LocalSet`]: crate::task::LocalSet
173     /// [`AbortHandle`]: crate::task::AbortHandle
174     #[track_caller]
spawn_local<F>(&mut self, task: F) -> AbortHandle where F: Future<Output = T>, F: 'static,175     pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
176     where
177         F: Future<Output = T>,
178         F: 'static,
179     {
180         self.insert(crate::task::spawn_local(task))
181     }
182 
183     /// Spawn the provided task on the provided [`LocalSet`] and store it in
184     /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
185     /// remotely cancel the task.
186     ///
187     /// Unlike the [`spawn_local`] method, this method may be used to spawn local
188     /// tasks on a `LocalSet` that is _not_ currently running. The provided
189     /// future will start running whenever the `LocalSet` is next started.
190     ///
191     /// [`LocalSet`]: crate::task::LocalSet
192     /// [`AbortHandle`]: crate::task::AbortHandle
193     /// [`spawn_local`]: Self::spawn_local
194     #[track_caller]
spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle where F: Future<Output = T>, F: 'static,195     pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
196     where
197         F: Future<Output = T>,
198         F: 'static,
199     {
200         self.insert(local_set.spawn_local(task))
201     }
202 
203     /// Spawn the blocking code on the blocking threadpool and store
204     /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
205     /// used to remotely cancel the task.
206     ///
207     /// # Examples
208     ///
209     /// Spawn multiple blocking tasks and wait for them.
210     ///
211     /// ```
212     /// use tokio::task::JoinSet;
213     ///
214     /// #[tokio::main]
215     /// async fn main() {
216     ///     let mut set = JoinSet::new();
217     ///
218     ///     for i in 0..10 {
219     ///         set.spawn_blocking(move || { i });
220     ///     }
221     ///
222     ///     let mut seen = [false; 10];
223     ///     while let Some(res) = set.join_next().await {
224     ///         let idx = res.unwrap();
225     ///         seen[idx] = true;
226     ///     }
227     ///
228     ///     for i in 0..10 {
229     ///         assert!(seen[i]);
230     ///     }
231     /// }
232     /// ```
233     ///
234     /// # Panics
235     ///
236     /// This method panics if called outside of a Tokio runtime.
237     ///
238     /// [`AbortHandle`]: crate::task::AbortHandle
239     #[track_caller]
spawn_blocking<F>(&mut self, f: F) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,240     pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
241     where
242         F: FnOnce() -> T,
243         F: Send + 'static,
244         T: Send,
245     {
246         self.insert(crate::runtime::spawn_blocking(f))
247     }
248 
249     /// Spawn the blocking code on the blocking threadpool of the
250     /// provided runtime and store it in this `JoinSet`, returning an
251     /// [`AbortHandle`] that can be used to remotely cancel the task.
252     ///
253     /// [`AbortHandle`]: crate::task::AbortHandle
254     #[track_caller]
spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle where F: FnOnce() -> T, F: Send + 'static, T: Send,255     pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
256     where
257         F: FnOnce() -> T,
258         F: Send + 'static,
259         T: Send,
260     {
261         self.insert(handle.spawn_blocking(f))
262     }
263 
insert(&mut self, jh: JoinHandle<T>) -> AbortHandle264     fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
265         let abort = jh.abort_handle();
266         let mut entry = self.inner.insert_idle(jh);
267 
268         // Set the waker that is notified when the task completes.
269         entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
270         abort
271     }
272 
273     /// Waits until one of the tasks in the set completes and returns its output.
274     ///
275     /// Returns `None` if the set is empty.
276     ///
277     /// # Cancel Safety
278     ///
279     /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
280     /// statement and some other branch completes first, it is guaranteed that no tasks were
281     /// removed from this `JoinSet`.
join_next(&mut self) -> Option<Result<T, JoinError>>282     pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
283         std::future::poll_fn(|cx| self.poll_join_next(cx)).await
284     }
285 
286     /// Waits until one of the tasks in the set completes and returns its
287     /// output, along with the [task ID] of the completed task.
288     ///
289     /// Returns `None` if the set is empty.
290     ///
291     /// When this method returns an error, then the id of the task that failed can be accessed
292     /// using the [`JoinError::id`] method.
293     ///
294     /// # Cancel Safety
295     ///
296     /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
297     /// statement and some other branch completes first, it is guaranteed that no tasks were
298     /// removed from this `JoinSet`.
299     ///
300     /// [task ID]: crate::task::Id
301     /// [`JoinError::id`]: fn@crate::task::JoinError::id
join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>302     pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
303         std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
304     }
305 
306     /// Tries to join one of the tasks in the set that has completed and return its output.
307     ///
308     /// Returns `None` if there are no completed tasks, or if the set is empty.
try_join_next(&mut self) -> Option<Result<T, JoinError>>309     pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> {
310         // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
311         loop {
312             let mut entry = self.inner.try_pop_notified()?;
313 
314             let res = entry.with_value_and_context(|jh, ctx| {
315                 // Since this function is not async and cannot be forced to yield, we should
316                 // disable budgeting when we want to check for the `JoinHandle` readiness.
317                 Pin::new(&mut unconstrained(jh)).poll(ctx)
318             });
319 
320             if let Poll::Ready(res) = res {
321                 let _entry = entry.remove();
322 
323                 return Some(res);
324             }
325         }
326     }
327 
328     /// Tries to join one of the tasks in the set that has completed and return its output,
329     /// along with the [task ID] of the completed task.
330     ///
331     /// Returns `None` if there are no completed tasks, or if the set is empty.
332     ///
333     /// When this method returns an error, then the id of the task that failed can be accessed
334     /// using the [`JoinError::id`] method.
335     ///
336     /// [task ID]: crate::task::Id
337     /// [`JoinError::id`]: fn@crate::task::JoinError::id
try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>>338     pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
339         // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
340         loop {
341             let mut entry = self.inner.try_pop_notified()?;
342 
343             let res = entry.with_value_and_context(|jh, ctx| {
344                 // Since this function is not async and cannot be forced to yield, we should
345                 // disable budgeting when we want to check for the `JoinHandle` readiness.
346                 Pin::new(&mut unconstrained(jh)).poll(ctx)
347             });
348 
349             if let Poll::Ready(res) = res {
350                 let entry = entry.remove();
351 
352                 return Some(res.map(|output| (entry.id(), output)));
353             }
354         }
355     }
356 
357     /// Aborts all tasks and waits for them to finish shutting down.
358     ///
359     /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
360     /// a loop until it returns `None`.
361     ///
362     /// This method ignores any panics in the tasks shutting down. When this call returns, the
363     /// `JoinSet` will be empty.
364     ///
365     /// [`abort_all`]: fn@Self::abort_all
366     /// [`join_next`]: fn@Self::join_next
shutdown(&mut self)367     pub async fn shutdown(&mut self) {
368         self.abort_all();
369         while self.join_next().await.is_some() {}
370     }
371 
372     /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results.
373     ///
374     /// The results will be stored in the order they completed not the order they were spawned.
375     /// This is a convenience method that is equivalent to calling [`join_next`] in
376     /// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call
377     /// to `join_all` will panic and all remaining tasks on the `JoinSet` are
378     /// cancelled. To handle errors in any other way, manually call [`join_next`]
379     /// in a loop.
380     ///
381     /// # Examples
382     ///
383     /// Spawn multiple tasks and `join_all` them.
384     ///
385     /// ```
386     /// use tokio::task::JoinSet;
387     /// use std::time::Duration;
388     ///
389     /// #[tokio::main]
390     /// async fn main() {
391     ///     let mut set = JoinSet::new();
392     ///
393     ///     for i in 0..3 {
394     ///        set.spawn(async move {
395     ///            tokio::time::sleep(Duration::from_secs(3 - i)).await;
396     ///            i
397     ///        });
398     ///     }
399     ///
400     ///     let output = set.join_all().await;
401     ///     assert_eq!(output, vec![2, 1, 0]);
402     /// }
403     /// ```
404     ///
405     /// Equivalent implementation of `join_all`, using [`join_next`] and loop.
406     ///
407     /// ```
408     /// use tokio::task::JoinSet;
409     /// use std::panic;
410     ///
411     /// #[tokio::main]
412     /// async fn main() {
413     ///     let mut set = JoinSet::new();
414     ///
415     ///     for i in 0..3 {
416     ///        set.spawn(async move {i});
417     ///     }
418     ///
419     ///     let mut output = Vec::new();
420     ///     while let Some(res) = set.join_next().await{
421     ///         match res {
422     ///             Ok(t) => output.push(t),
423     ///             Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
424     ///             Err(err) => panic!("{err}"),
425     ///         }
426     ///     }
427     ///     assert_eq!(output.len(),3);
428     /// }
429     /// ```
430     /// [`join_next`]: fn@Self::join_next
431     /// [`JoinError::id`]: fn@crate::task::JoinError::id
join_all(mut self) -> Vec<T>432     pub async fn join_all(mut self) -> Vec<T> {
433         let mut output = Vec::with_capacity(self.len());
434 
435         while let Some(res) = self.join_next().await {
436             match res {
437                 Ok(t) => output.push(t),
438                 Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
439                 Err(err) => panic!("{err}"),
440             }
441         }
442         output
443     }
444 
445     /// Aborts all tasks on this `JoinSet`.
446     ///
447     /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
448     /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
abort_all(&mut self)449     pub fn abort_all(&mut self) {
450         self.inner.for_each(|jh| jh.abort());
451     }
452 
453     /// Removes all tasks from this `JoinSet` without aborting them.
454     ///
455     /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
456     /// is dropped.
detach_all(&mut self)457     pub fn detach_all(&mut self) {
458         self.inner.drain(drop);
459     }
460 
461     /// Polls for one of the tasks in the set to complete.
462     ///
463     /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
464     ///
465     /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
466     /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
467     /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
468     /// scheduled to receive a wakeup.
469     ///
470     /// # Returns
471     ///
472     /// This function returns:
473     ///
474     ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
475     ///     available right now.
476     ///  * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
477     ///     The `value` is the return value of one of the tasks that completed.
478     ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
479     ///     aborted. The `err` is the `JoinError` from the panicked/aborted task.
480     ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
481     ///
482     /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
483     /// This can happen if the [coop budget] is reached.
484     ///
485     /// [coop budget]: crate::task#cooperative-scheduling
poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>>486     pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
487         // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
488         // the `notified` list if the waker is notified in the `poll` call below.
489         let mut entry = match self.inner.pop_notified(cx.waker()) {
490             Some(entry) => entry,
491             None => {
492                 if self.is_empty() {
493                     return Poll::Ready(None);
494                 } else {
495                     // The waker was set by `pop_notified`.
496                     return Poll::Pending;
497                 }
498             }
499         };
500 
501         let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
502 
503         if let Poll::Ready(res) = res {
504             let _entry = entry.remove();
505             Poll::Ready(Some(res))
506         } else {
507             // A JoinHandle generally won't emit a wakeup without being ready unless
508             // the coop limit has been reached. We yield to the executor in this
509             // case.
510             cx.waker().wake_by_ref();
511             Poll::Pending
512         }
513     }
514 
515     /// Polls for one of the tasks in the set to complete.
516     ///
517     /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
518     ///
519     /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
520     /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
521     /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
522     /// scheduled to receive a wakeup.
523     ///
524     /// # Returns
525     ///
526     /// This function returns:
527     ///
528     ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
529     ///     available right now.
530     ///  * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
531     ///     The `value` is the return value of one of the tasks that completed, and
532     ///    `id` is the [task ID] of that task.
533     ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
534     ///     aborted. The `err` is the `JoinError` from the panicked/aborted task.
535     ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
536     ///
537     /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
538     /// This can happen if the [coop budget] is reached.
539     ///
540     /// [coop budget]: crate::task#cooperative-scheduling
541     /// [task ID]: crate::task::Id
poll_join_next_with_id( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Id, T), JoinError>>>542     pub fn poll_join_next_with_id(
543         &mut self,
544         cx: &mut Context<'_>,
545     ) -> Poll<Option<Result<(Id, T), JoinError>>> {
546         // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
547         // the `notified` list if the waker is notified in the `poll` call below.
548         let mut entry = match self.inner.pop_notified(cx.waker()) {
549             Some(entry) => entry,
550             None => {
551                 if self.is_empty() {
552                     return Poll::Ready(None);
553                 } else {
554                     // The waker was set by `pop_notified`.
555                     return Poll::Pending;
556                 }
557             }
558         };
559 
560         let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
561 
562         if let Poll::Ready(res) = res {
563             let entry = entry.remove();
564             // If the task succeeded, add the task ID to the output. Otherwise, the
565             // `JoinError` will already have the task's ID.
566             Poll::Ready(Some(res.map(|output| (entry.id(), output))))
567         } else {
568             // A JoinHandle generally won't emit a wakeup without being ready unless
569             // the coop limit has been reached. We yield to the executor in this
570             // case.
571             cx.waker().wake_by_ref();
572             Poll::Pending
573         }
574     }
575 }
576 
577 impl<T> Drop for JoinSet<T> {
drop(&mut self)578     fn drop(&mut self) {
579         self.inner.drain(|join_handle| join_handle.abort());
580     }
581 }
582 
583 impl<T> fmt::Debug for JoinSet<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result584     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585         f.debug_struct("JoinSet").field("len", &self.len()).finish()
586     }
587 }
588 
589 impl<T> Default for JoinSet<T> {
default() -> Self590     fn default() -> Self {
591         Self::new()
592     }
593 }
594 
595 /// Collect an iterator of futures into a [`JoinSet`].
596 ///
597 /// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator.
598 ///
599 /// # Examples
600 ///
601 /// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]:
602 ///
603 /// ```
604 /// use tokio::task::JoinSet;
605 ///
606 /// #[tokio::main]
607 /// async fn main() {
608 ///     let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect();
609 ///
610 ///     let mut seen = [false; 10];
611 ///     while let Some(res) = set.join_next().await {
612 ///         let idx = res.unwrap();
613 ///         seen[idx] = true;
614 ///     }
615 ///
616 ///     for i in 0..10 {
617 ///         assert!(seen[i]);
618 ///     }
619 /// }
620 /// ```
621 ///
622 /// [`collect`]: std::iter::Iterator::collect
623 impl<T, F> std::iter::FromIterator<F> for JoinSet<T>
624 where
625     F: Future<Output = T>,
626     F: Send + 'static,
627     T: Send + 'static,
628 {
from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self629     fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
630         let mut set = Self::new();
631         iter.into_iter().for_each(|task| {
632             set.spawn(task);
633         });
634         set
635     }
636 }
637 
638 // === impl Builder ===
639 
640 #[cfg(all(tokio_unstable, feature = "tracing"))]
641 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
642 impl<'a, T: 'static> Builder<'a, T> {
643     /// Assigns a name to the task which will be spawned.
name(self, name: &'a str) -> Self644     pub fn name(self, name: &'a str) -> Self {
645         let builder = self.builder.name(name);
646         Self { builder, ..self }
647     }
648 
649     /// Spawn the provided task with this builder's settings and store it in the
650     /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
651     /// cancel the task.
652     ///
653     /// # Returns
654     ///
655     /// An [`AbortHandle`] that can be used to remotely cancel the task.
656     ///
657     /// # Panics
658     ///
659     /// This method panics if called outside of a Tokio runtime.
660     ///
661     /// [`AbortHandle`]: crate::task::AbortHandle
662     #[track_caller]
spawn<F>(self, future: F) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: Send + 'static, T: Send,663     pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
664     where
665         F: Future<Output = T>,
666         F: Send + 'static,
667         T: Send,
668     {
669         Ok(self.joinset.insert(self.builder.spawn(future)?))
670     }
671 
672     /// Spawn the provided task on the provided [runtime handle] with this
673     /// builder's settings, and store it in the [`JoinSet`].
674     ///
675     /// # Returns
676     ///
677     /// An [`AbortHandle`] that can be used to remotely cancel the task.
678     ///
679     ///
680     /// [`AbortHandle`]: crate::task::AbortHandle
681     /// [runtime handle]: crate::runtime::Handle
682     #[track_caller]
spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: Send + 'static, T: Send,683     pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
684     where
685         F: Future<Output = T>,
686         F: Send + 'static,
687         T: Send,
688     {
689         Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?))
690     }
691 
692     /// Spawn the blocking code on the blocking threadpool with this builder's
693     /// settings, and store it in the [`JoinSet`].
694     ///
695     /// # Returns
696     ///
697     /// An [`AbortHandle`] that can be used to remotely cancel the task.
698     ///
699     /// # Panics
700     ///
701     /// This method panics if called outside of a Tokio runtime.
702     ///
703     /// [`JoinSet`]: crate::task::JoinSet
704     /// [`AbortHandle`]: crate::task::AbortHandle
705     #[track_caller]
spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle> where F: FnOnce() -> T, F: Send + 'static, T: Send,706     pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle>
707     where
708         F: FnOnce() -> T,
709         F: Send + 'static,
710         T: Send,
711     {
712         Ok(self.joinset.insert(self.builder.spawn_blocking(f)?))
713     }
714 
715     /// Spawn the blocking code on the blocking threadpool of the provided
716     /// runtime handle with this builder's settings, and store it in the
717     /// [`JoinSet`].
718     ///
719     /// # Returns
720     ///
721     /// An [`AbortHandle`] that can be used to remotely cancel the task.
722     ///
723     /// [`JoinSet`]: crate::task::JoinSet
724     /// [`AbortHandle`]: crate::task::AbortHandle
725     #[track_caller]
spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle> where F: FnOnce() -> T, F: Send + 'static, T: Send,726     pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle>
727     where
728         F: FnOnce() -> T,
729         F: Send + 'static,
730         T: Send,
731     {
732         Ok(self
733             .joinset
734             .insert(self.builder.spawn_blocking_on(f, handle)?))
735     }
736 
737     /// Spawn the provided task on the current [`LocalSet`] with this builder's
738     /// settings, and store it in the [`JoinSet`].
739     ///
740     /// # Returns
741     ///
742     /// An [`AbortHandle`] that can be used to remotely cancel the task.
743     ///
744     /// # Panics
745     ///
746     /// This method panics if it is called outside of a `LocalSet`.
747     ///
748     /// [`LocalSet`]: crate::task::LocalSet
749     /// [`AbortHandle`]: crate::task::AbortHandle
750     #[track_caller]
spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: 'static,751     pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
752     where
753         F: Future<Output = T>,
754         F: 'static,
755     {
756         Ok(self.joinset.insert(self.builder.spawn_local(future)?))
757     }
758 
759     /// Spawn the provided task on the provided [`LocalSet`] with this builder's
760     /// settings, and store it in the [`JoinSet`].
761     ///
762     /// # Returns
763     ///
764     /// An [`AbortHandle`] that can be used to remotely cancel the task.
765     ///
766     /// [`LocalSet`]: crate::task::LocalSet
767     /// [`AbortHandle`]: crate::task::AbortHandle
768     #[track_caller]
spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle> where F: Future<Output = T>, F: 'static,769     pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
770     where
771         F: Future<Output = T>,
772         F: 'static,
773     {
774         Ok(self
775             .joinset
776             .insert(self.builder.spawn_local_on(future, local_set)?))
777     }
778 }
779 
780 // Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
781 // `Debug`.
782 #[cfg(all(tokio_unstable, feature = "tracing"))]
783 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
784 impl<'a, T> fmt::Debug for Builder<'a, T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result785     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
786         f.debug_struct("join_set::Builder")
787             .field("joinset", &self.joinset)
788             .field("builder", &self.builder)
789             .finish()
790     }
791 }
792