tokio/task/
join_set.rs

1//! A collection of tasks spawned on a Tokio runtime.
2//!
3//! This module provides the [`JoinSet`] type, a collection which stores a set
4//! of spawned tasks and allows asynchronously awaiting the output of those
5//! tasks as they complete. See the documentation for the [`JoinSet`] type for
6//! details.
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::{fmt, panic};
11
12use crate::runtime::Handle;
13use crate::task::Id;
14use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet};
15use crate::util::IdleNotifiedSet;
16
17/// A collection of tasks spawned on a Tokio runtime.
18///
19/// A `JoinSet` can be used to await the completion of some or all of the tasks
20/// in the set. The set is not ordered, and the tasks will be returned in the
21/// order they complete.
22///
23/// All of the tasks must have the same return type `T`.
24///
25/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
26///
27/// # Examples
28///
29/// Spawn multiple tasks and wait for them.
30///
31/// ```
32/// use tokio::task::JoinSet;
33///
34/// # #[tokio::main(flavor = "current_thread")]
35/// # async fn main() {
36/// let mut set = JoinSet::new();
37///
38/// for i in 0..10 {
39///     set.spawn(async move { i });
40/// }
41///
42/// let mut seen = [false; 10];
43/// while let Some(res) = set.join_next().await {
44///     let idx = res.unwrap();
45///     seen[idx] = true;
46/// }
47///
48/// for i in 0..10 {
49///     assert!(seen[i]);
50/// }
51/// # }
52/// ```
53///
54/// # Task ID guarantees
55///
56/// While a task is tracked in a `JoinSet`, that task's ID is unique relative
57/// to all other running tasks in Tokio. For this purpose, tracking a task in a
58/// `JoinSet` is equivalent to holding a [`JoinHandle`] to it. See the [task ID]
59/// documentation for more info.
60///
61/// [`JoinHandle`]: crate::task::JoinHandle
62/// [task ID]: crate::task::Id
63#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
64pub struct JoinSet<T> {
65    inner: IdleNotifiedSet<JoinHandle<T>>,
66}
67
/// A [`task::Builder`] counterpart whose tasks are spawned onto a
/// [`JoinSet`] instead of the current default runtime.
///
/// [`task::Builder`]: crate::task::Builder
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
    // The set that will track whatever task this builder spawns.
    joinset: &'a mut JoinSet<T>,
    // The underlying task builder carrying the configured settings.
    builder: super::Builder<'a>,
}
79
80impl<T> JoinSet<T> {
81    /// Create a new `JoinSet`.
82    pub fn new() -> Self {
83        Self {
84            inner: IdleNotifiedSet::new(),
85        }
86    }
87
88    /// Returns the number of tasks currently in the `JoinSet`.
89    pub fn len(&self) -> usize {
90        self.inner.len()
91    }
92
93    /// Returns whether the `JoinSet` is empty.
94    pub fn is_empty(&self) -> bool {
95        self.inner.is_empty()
96    }
97}
98
99impl<T: 'static> JoinSet<T> {
100    /// Returns a [`Builder`] that can be used to configure a task prior to
101    /// spawning it on this `JoinSet`.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use tokio::task::JoinSet;
107    ///
108    /// #[tokio::main]
109    /// async fn main() -> std::io::Result<()> {
110    ///     let mut set = JoinSet::new();
111    ///
112    ///     // Use the builder to configure a task's name before spawning it.
113    ///     set.build_task()
114    ///         .name("my_task")
115    ///         .spawn(async { /* ... */ })?;
116    ///
117    ///     Ok(())
118    /// }
119    /// ```
120    #[cfg(all(tokio_unstable, feature = "tracing"))]
121    #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
122    pub fn build_task(&mut self) -> Builder<'_, T> {
123        Builder {
124            builder: super::Builder::new(),
125            joinset: self,
126        }
127    }
128
129    /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
130    /// that can be used to remotely cancel the task.
131    ///
132    /// The provided future will start running in the background immediately
133    /// when this method is called, even if you don't await anything on this
134    /// `JoinSet`.
135    ///
136    /// # Panics
137    ///
138    /// This method panics if called outside of a Tokio runtime.
139    ///
140    /// [`AbortHandle`]: crate::task::AbortHandle
141    #[track_caller]
142    pub fn spawn<F>(&mut self, task: F) -> AbortHandle
143    where
144        F: Future<Output = T>,
145        F: Send + 'static,
146        T: Send,
147    {
148        self.insert(crate::spawn(task))
149    }
150
151    /// Spawn the provided task on the provided runtime and store it in this
152    /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely
153    /// cancel the task.
154    ///
155    /// The provided future will start running in the background immediately
156    /// when this method is called, even if you don't await anything on this
157    /// `JoinSet`.
158    ///
159    /// [`AbortHandle`]: crate::task::AbortHandle
160    #[track_caller]
161    pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
162    where
163        F: Future<Output = T>,
164        F: Send + 'static,
165        T: Send,
166    {
167        self.insert(handle.spawn(task))
168    }
169
170    /// Spawn the provided task on the current [`LocalSet`] or [`LocalRuntime`]
171    /// and store it in this `JoinSet`, returning an [`AbortHandle`] that can
172    /// be used to remotely cancel the task.
173    ///
174    /// The provided future will start running in the background immediately
175    /// when this method is called, even if you don't await anything on this
176    /// `JoinSet`.
177    ///
178    /// # Panics
179    ///
180    /// This method panics if it is called outside of a `LocalSet`or `LocalRuntime`.
181    ///
182    /// [`LocalSet`]: crate::task::LocalSet
183    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
184    /// [`AbortHandle`]: crate::task::AbortHandle
185    #[track_caller]
186    pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
187    where
188        F: Future<Output = T>,
189        F: 'static,
190    {
191        self.insert(crate::task::spawn_local(task))
192    }
193
194    /// Spawn the provided task on the provided [`LocalSet`] and store it in
195    /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
196    /// remotely cancel the task.
197    ///
198    /// Unlike the [`spawn_local`] method, this method may be used to spawn local
199    /// tasks on a `LocalSet` that is _not_ currently running. The provided
200    /// future will start running whenever the `LocalSet` is next started.
201    ///
202    /// [`LocalSet`]: crate::task::LocalSet
203    /// [`AbortHandle`]: crate::task::AbortHandle
204    /// [`spawn_local`]: Self::spawn_local
205    #[track_caller]
206    pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
207    where
208        F: Future<Output = T>,
209        F: 'static,
210    {
211        self.insert(local_set.spawn_local(task))
212    }
213
214    /// Spawn the blocking code on the blocking threadpool and store
215    /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
216    /// used to remotely cancel the task.
217    ///
218    /// # Examples
219    ///
220    /// Spawn multiple blocking tasks and wait for them.
221    ///
222    /// ```
223    /// # #[cfg(not(target_family = "wasm"))]
224    /// # {
225    /// use tokio::task::JoinSet;
226    ///
227    /// #[tokio::main]
228    /// async fn main() {
229    ///     let mut set = JoinSet::new();
230    ///
231    ///     for i in 0..10 {
232    ///         set.spawn_blocking(move || { i });
233    ///     }
234    ///
235    ///     let mut seen = [false; 10];
236    ///     while let Some(res) = set.join_next().await {
237    ///         let idx = res.unwrap();
238    ///         seen[idx] = true;
239    ///     }
240    ///
241    ///     for i in 0..10 {
242    ///         assert!(seen[i]);
243    ///     }
244    /// }
245    /// # }
246    /// ```
247    ///
248    /// # Panics
249    ///
250    /// This method panics if called outside of a Tokio runtime.
251    ///
252    /// [`AbortHandle`]: crate::task::AbortHandle
253    #[track_caller]
254    pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
255    where
256        F: FnOnce() -> T,
257        F: Send + 'static,
258        T: Send,
259    {
260        self.insert(crate::runtime::spawn_blocking(f))
261    }
262
263    /// Spawn the blocking code on the blocking threadpool of the
264    /// provided runtime and store it in this `JoinSet`, returning an
265    /// [`AbortHandle`] that can be used to remotely cancel the task.
266    ///
267    /// [`AbortHandle`]: crate::task::AbortHandle
268    #[track_caller]
269    pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
270    where
271        F: FnOnce() -> T,
272        F: Send + 'static,
273        T: Send,
274    {
275        self.insert(handle.spawn_blocking(f))
276    }
277
278    fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
279        let abort = jh.abort_handle();
280        let mut entry = self.inner.insert_idle(jh);
281
282        // Set the waker that is notified when the task completes.
283        entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
284        abort
285    }
286
287    /// Waits until one of the tasks in the set completes and returns its output.
288    ///
289    /// Returns `None` if the set is empty.
290    ///
291    /// # Cancel Safety
292    ///
293    /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
294    /// statement and some other branch completes first, it is guaranteed that no tasks were
295    /// removed from this `JoinSet`.
296    pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
297        std::future::poll_fn(|cx| self.poll_join_next(cx)).await
298    }
299
300    /// Waits until one of the tasks in the set completes and returns its
301    /// output, along with the [task ID] of the completed task.
302    ///
303    /// Returns `None` if the set is empty.
304    ///
305    /// When this method returns an error, then the id of the task that failed can be accessed
306    /// using the [`JoinError::id`] method.
307    ///
308    /// # Cancel Safety
309    ///
310    /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
311    /// statement and some other branch completes first, it is guaranteed that no tasks were
312    /// removed from this `JoinSet`.
313    ///
314    /// [task ID]: crate::task::Id
315    /// [`JoinError::id`]: fn@crate::task::JoinError::id
316    pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
317        std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
318    }
319
320    /// Tries to join one of the tasks in the set that has completed and return its output.
321    ///
322    /// Returns `None` if there are no completed tasks, or if the set is empty.
323    pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> {
324        // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
325        loop {
326            let mut entry = self.inner.try_pop_notified()?;
327
328            let res = entry.with_value_and_context(|jh, ctx| {
329                // Since this function is not async and cannot be forced to yield, we should
330                // disable budgeting when we want to check for the `JoinHandle` readiness.
331                Pin::new(&mut unconstrained(jh)).poll(ctx)
332            });
333
334            if let Poll::Ready(res) = res {
335                let _entry = entry.remove();
336
337                return Some(res);
338            }
339        }
340    }
341
342    /// Tries to join one of the tasks in the set that has completed and return its output,
343    /// along with the [task ID] of the completed task.
344    ///
345    /// Returns `None` if there are no completed tasks, or if the set is empty.
346    ///
347    /// When this method returns an error, then the id of the task that failed can be accessed
348    /// using the [`JoinError::id`] method.
349    ///
350    /// [task ID]: crate::task::Id
351    /// [`JoinError::id`]: fn@crate::task::JoinError::id
352    pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
353        // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
354        loop {
355            let mut entry = self.inner.try_pop_notified()?;
356
357            let res = entry.with_value_and_context(|jh, ctx| {
358                // Since this function is not async and cannot be forced to yield, we should
359                // disable budgeting when we want to check for the `JoinHandle` readiness.
360                Pin::new(&mut unconstrained(jh)).poll(ctx)
361            });
362
363            if let Poll::Ready(res) = res {
364                let entry = entry.remove();
365
366                return Some(res.map(|output| (entry.id(), output)));
367            }
368        }
369    }
370
371    /// Aborts all tasks and waits for them to finish shutting down.
372    ///
373    /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
374    /// a loop until it returns `None`.
375    ///
376    /// This method ignores any panics in the tasks shutting down. When this call returns, the
377    /// `JoinSet` will be empty.
378    ///
379    /// [`abort_all`]: fn@Self::abort_all
380    /// [`join_next`]: fn@Self::join_next
381    pub async fn shutdown(&mut self) {
382        self.abort_all();
383        while self.join_next().await.is_some() {}
384    }
385
386    /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results.
387    ///
388    /// The results will be stored in the order they completed not the order they were spawned.
389    /// This is a convenience method that is equivalent to calling [`join_next`] in
390    /// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call
391    /// to `join_all` will panic and all remaining tasks on the `JoinSet` are
392    /// cancelled. To handle errors in any other way, manually call [`join_next`]
393    /// in a loop.
394    ///
395    /// # Examples
396    ///
397    /// Spawn multiple tasks and `join_all` them.
398    ///
399    /// ```
400    /// use tokio::task::JoinSet;
401    /// use std::time::Duration;
402    ///
403    /// # #[tokio::main(flavor = "current_thread")]
404    /// # async fn main() {
405    /// let mut set = JoinSet::new();
406    ///
407    /// for i in 0..3 {
408    ///     set.spawn(async move {
409    ///         tokio::time::sleep(Duration::from_secs(3 - i)).await;
410    ///         i
411    ///     });
412    /// }
413    ///
414    /// let output = set.join_all().await;
415    /// assert_eq!(output, vec![2, 1, 0]);
416    /// # }
417    /// ```
418    ///
419    /// Equivalent implementation of `join_all`, using [`join_next`] and loop.
420    ///
421    /// ```
422    /// use tokio::task::JoinSet;
423    /// use std::panic;
424    ///
425    /// # #[tokio::main(flavor = "current_thread")]
426    /// # async fn main() {
427    /// let mut set = JoinSet::new();
428    ///
429    /// for i in 0..3 {
430    ///     set.spawn(async move {i});
431    /// }
432    ///
433    /// let mut output = Vec::new();
434    /// while let Some(res) = set.join_next().await{
435    ///     match res {
436    ///         Ok(t) => output.push(t),
437    ///         Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
438    ///         Err(err) => panic!("{err}"),
439    ///     }
440    /// }
441    /// assert_eq!(output.len(),3);
442    /// # }
443    /// ```
444    /// [`join_next`]: fn@Self::join_next
445    /// [`JoinError::id`]: fn@crate::task::JoinError::id
446    pub async fn join_all(mut self) -> Vec<T> {
447        let mut output = Vec::with_capacity(self.len());
448
449        while let Some(res) = self.join_next().await {
450            match res {
451                Ok(t) => output.push(t),
452                Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
453                Err(err) => panic!("{err}"),
454            }
455        }
456        output
457    }
458
459    /// Aborts all tasks on this `JoinSet`.
460    ///
461    /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
462    /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
463    pub fn abort_all(&mut self) {
464        self.inner.for_each(|jh| jh.abort());
465    }
466
467    /// Removes all tasks from this `JoinSet` without aborting them.
468    ///
469    /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
470    /// is dropped.
471    pub fn detach_all(&mut self) {
472        self.inner.drain(drop);
473    }
474
475    /// Polls for one of the tasks in the set to complete.
476    ///
477    /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
478    ///
479    /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
480    /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
481    /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
482    /// scheduled to receive a wakeup.
483    ///
484    /// # Returns
485    ///
486    /// This function returns:
487    ///
488    ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
489    ///    available right now.
490    ///  * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
491    ///    The `value` is the return value of one of the tasks that completed.
492    ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
493    ///    aborted. The `err` is the `JoinError` from the panicked/aborted task.
494    ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
495    ///
496    /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
497    /// This can happen if the [coop budget] is reached.
498    ///
499    /// [coop budget]: crate::task::coop#cooperative-scheduling
500    pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
501        // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
502        // the `notified` list if the waker is notified in the `poll` call below.
503        let mut entry = match self.inner.pop_notified(cx.waker()) {
504            Some(entry) => entry,
505            None => {
506                if self.is_empty() {
507                    return Poll::Ready(None);
508                } else {
509                    // The waker was set by `pop_notified`.
510                    return Poll::Pending;
511                }
512            }
513        };
514
515        let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
516
517        if let Poll::Ready(res) = res {
518            let _entry = entry.remove();
519            Poll::Ready(Some(res))
520        } else {
521            // A JoinHandle generally won't emit a wakeup without being ready unless
522            // the coop limit has been reached. We yield to the executor in this
523            // case.
524            cx.waker().wake_by_ref();
525            Poll::Pending
526        }
527    }
528
529    /// Polls for one of the tasks in the set to complete.
530    ///
531    /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
532    ///
533    /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
534    /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
535    /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
536    /// scheduled to receive a wakeup.
537    ///
538    /// # Returns
539    ///
540    /// This function returns:
541    ///
542    ///  * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
543    ///    available right now.
544    ///  * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
545    ///    The `value` is the return value of one of the tasks that completed, and
546    ///    `id` is the [task ID] of that task.
547    ///  * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
548    ///    aborted. The `err` is the `JoinError` from the panicked/aborted task.
549    ///  * `Poll::Ready(None)` if the `JoinSet` is empty.
550    ///
551    /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
552    /// This can happen if the [coop budget] is reached.
553    ///
554    /// [coop budget]: crate::task::coop#cooperative-scheduling
555    /// [task ID]: crate::task::Id
556    pub fn poll_join_next_with_id(
557        &mut self,
558        cx: &mut Context<'_>,
559    ) -> Poll<Option<Result<(Id, T), JoinError>>> {
560        // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
561        // the `notified` list if the waker is notified in the `poll` call below.
562        let mut entry = match self.inner.pop_notified(cx.waker()) {
563            Some(entry) => entry,
564            None => {
565                if self.is_empty() {
566                    return Poll::Ready(None);
567                } else {
568                    // The waker was set by `pop_notified`.
569                    return Poll::Pending;
570                }
571            }
572        };
573
574        let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
575
576        if let Poll::Ready(res) = res {
577            let entry = entry.remove();
578            // If the task succeeded, add the task ID to the output. Otherwise, the
579            // `JoinError` will already have the task's ID.
580            Poll::Ready(Some(res.map(|output| (entry.id(), output))))
581        } else {
582            // A JoinHandle generally won't emit a wakeup without being ready unless
583            // the coop limit has been reached. We yield to the executor in this
584            // case.
585            cx.waker().wake_by_ref();
586            Poll::Pending
587        }
588    }
589}
590
591impl<T> Drop for JoinSet<T> {
592    fn drop(&mut self) {
593        self.inner.drain(|join_handle| join_handle.abort());
594    }
595}
596
597impl<T> fmt::Debug for JoinSet<T> {
598    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599        f.debug_struct("JoinSet").field("len", &self.len()).finish()
600    }
601}
602
603impl<T> Default for JoinSet<T> {
604    fn default() -> Self {
605        Self::new()
606    }
607}
608
609/// Collect an iterator of futures into a [`JoinSet`].
610///
611/// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator.
612///
613/// # Examples
614///
615/// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]:
616///
617/// ```
618/// use tokio::task::JoinSet;
619///
620/// # #[tokio::main(flavor = "current_thread")]
621/// # async fn main() {
622/// let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect();
623///
624/// let mut seen = [false; 10];
625/// while let Some(res) = set.join_next().await {
626///     let idx = res.unwrap();
627///     seen[idx] = true;
628/// }
629///
630/// for i in 0..10 {
631///      assert!(seen[i]);
632/// }
633/// # }
634/// ```
635///
636/// [`collect`]: std::iter::Iterator::collect
637impl<T, F> std::iter::FromIterator<F> for JoinSet<T>
638where
639    F: Future<Output = T>,
640    F: Send + 'static,
641    T: Send + 'static,
642{
643    fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
644        let mut set = Self::new();
645        iter.into_iter().for_each(|task| {
646            set.spawn(task);
647        });
648        set
649    }
650}
651
652// === impl Builder ===
653
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T: 'static> Builder<'a, T> {
    /// Sets the name of the task that will be spawned.
    pub fn name(self, name: &'a str) -> Self {
        let Self { joinset, builder } = self;
        Self {
            joinset,
            builder: builder.name(name),
        }
    }

    /// Spawns `future` using this builder's settings and tracks it in the
    /// [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// # Panics
    ///
    /// This method panics if called outside of a Tokio runtime.
    ///
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        let Self { joinset, builder } = self;
        let join_handle = builder.spawn(future)?;
        Ok(joinset.insert(join_handle))
    }

    /// Spawns `future` on the given [runtime handle] using this builder's
    /// settings and tracks it in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// [`AbortHandle`]: crate::task::AbortHandle
    /// [runtime handle]: crate::runtime::Handle
    #[track_caller]
    pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        let Self { joinset, builder } = self;
        let join_handle = builder.spawn_on(future, handle)?;
        Ok(joinset.insert(join_handle))
    }

    /// Runs the blocking closure `f` on the blocking threadpool using this
    /// builder's settings and tracks it in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// # Panics
    ///
    /// This method panics if called outside of a Tokio runtime.
    ///
    /// [`JoinSet`]: crate::task::JoinSet
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send,
    {
        let Self { joinset, builder } = self;
        let join_handle = builder.spawn_blocking(f)?;
        Ok(joinset.insert(join_handle))
    }

    /// Runs the blocking closure `f` on the blocking threadpool of the given
    /// runtime handle using this builder's settings and tracks it in the
    /// [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// [`JoinSet`]: crate::task::JoinSet
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send,
    {
        let Self { joinset, builder } = self;
        let join_handle = builder.spawn_blocking_on(f, handle)?;
        Ok(joinset.insert(join_handle))
    }

    /// Spawns `future` on the current [`LocalSet`] using this builder's
    /// settings and tracks it in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// # Panics
    ///
    /// This method panics if it is called outside of a `LocalSet`.
    ///
    /// [`LocalSet`]: crate::task::LocalSet
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: 'static,
    {
        let Self { joinset, builder } = self;
        let join_handle = builder.spawn_local(future)?;
        Ok(joinset.insert(join_handle))
    }

    /// Spawns `future` on the given [`LocalSet`] using this builder's
    /// settings and tracks it in the [`JoinSet`].
    ///
    /// # Returns
    ///
    /// An [`AbortHandle`] that can be used to remotely cancel the task.
    ///
    /// [`LocalSet`]: crate::task::LocalSet
    /// [`AbortHandle`]: crate::task::AbortHandle
    #[track_caller]
    pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
    where
        F: Future<Output = T>,
        F: 'static,
    {
        let Self { joinset, builder } = self;
        let join_handle = builder.spawn_local_on(future, local_set)?;
        Ok(joinset.insert(join_handle))
    }
}
793
// `Debug` is implemented by hand so that `Builder` is `Debug` whether or not
// `T` is `Debug`.
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T> fmt::Debug for Builder<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("join_set::Builder");
        repr.field("joinset", &self.joinset);
        repr.field("builder", &self.builder);
        repr.finish()
    }
}