tokio/task/join_set.rs
1//! A collection of tasks spawned on a Tokio runtime.
2//!
3//! This module provides the [`JoinSet`] type, a collection which stores a set
4//! of spawned tasks and allows asynchronously awaiting the output of those
5//! tasks as they complete. See the documentation for the [`JoinSet`] type for
6//! details.
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::{fmt, panic};
11
12use crate::runtime::Handle;
13use crate::task::Id;
14use crate::task::{unconstrained, AbortHandle, JoinError, JoinHandle, LocalSet};
15use crate::util::IdleNotifiedSet;
16
17/// A collection of tasks spawned on a Tokio runtime.
18///
19/// A `JoinSet` can be used to await the completion of some or all of the tasks
20/// in the set. The set is not ordered, and the tasks will be returned in the
21/// order they complete.
22///
23/// All of the tasks must have the same return type `T`.
24///
25/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
26///
27/// # Examples
28///
29/// Spawn multiple tasks and wait for them.
30///
31/// ```
32/// use tokio::task::JoinSet;
33///
34/// # #[tokio::main(flavor = "current_thread")]
35/// # async fn main() {
36/// let mut set = JoinSet::new();
37///
38/// for i in 0..10 {
39/// set.spawn(async move { i });
40/// }
41///
42/// let mut seen = [false; 10];
43/// while let Some(res) = set.join_next().await {
44/// let idx = res.unwrap();
45/// seen[idx] = true;
46/// }
47///
48/// for i in 0..10 {
49/// assert!(seen[i]);
50/// }
51/// # }
52/// ```
53///
54/// # Task ID guarantees
55///
56/// While a task is tracked in a `JoinSet`, that task's ID is unique relative
57/// to all other running tasks in Tokio. For this purpose, tracking a task in a
58/// `JoinSet` is equivalent to holding a [`JoinHandle`] to it. See the [task ID]
59/// documentation for more info.
60///
61/// [`JoinHandle`]: crate::task::JoinHandle
62/// [task ID]: crate::task::Id
63#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
64pub struct JoinSet<T> {
65 inner: IdleNotifiedSet<JoinHandle<T>>,
66}
67
68/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
69/// than on the current default runtime.
70///
71/// [`task::Builder`]: crate::task::Builder
72#[cfg(all(tokio_unstable, feature = "tracing"))]
73#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
74#[must_use = "builders do nothing unless used to spawn a task"]
75pub struct Builder<'a, T> {
76 joinset: &'a mut JoinSet<T>,
77 builder: super::Builder<'a>,
78}
79
80impl<T> JoinSet<T> {
81 /// Create a new `JoinSet`.
82 pub fn new() -> Self {
83 Self {
84 inner: IdleNotifiedSet::new(),
85 }
86 }
87
88 /// Returns the number of tasks currently in the `JoinSet`.
89 pub fn len(&self) -> usize {
90 self.inner.len()
91 }
92
93 /// Returns whether the `JoinSet` is empty.
94 pub fn is_empty(&self) -> bool {
95 self.inner.is_empty()
96 }
97}
98
99impl<T: 'static> JoinSet<T> {
100 /// Returns a [`Builder`] that can be used to configure a task prior to
101 /// spawning it on this `JoinSet`.
102 ///
103 /// # Examples
104 ///
105 /// ```
106 /// use tokio::task::JoinSet;
107 ///
108 /// #[tokio::main]
109 /// async fn main() -> std::io::Result<()> {
110 /// let mut set = JoinSet::new();
111 ///
112 /// // Use the builder to configure a task's name before spawning it.
113 /// set.build_task()
114 /// .name("my_task")
115 /// .spawn(async { /* ... */ })?;
116 ///
117 /// Ok(())
118 /// }
119 /// ```
120 #[cfg(all(tokio_unstable, feature = "tracing"))]
121 #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
122 pub fn build_task(&mut self) -> Builder<'_, T> {
123 Builder {
124 builder: super::Builder::new(),
125 joinset: self,
126 }
127 }
128
129 /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
130 /// that can be used to remotely cancel the task.
131 ///
132 /// The provided future will start running in the background immediately
133 /// when this method is called, even if you don't await anything on this
134 /// `JoinSet`.
135 ///
136 /// # Panics
137 ///
138 /// This method panics if called outside of a Tokio runtime.
139 ///
140 /// [`AbortHandle`]: crate::task::AbortHandle
141 #[track_caller]
142 pub fn spawn<F>(&mut self, task: F) -> AbortHandle
143 where
144 F: Future<Output = T>,
145 F: Send + 'static,
146 T: Send,
147 {
148 self.insert(crate::spawn(task))
149 }
150
151 /// Spawn the provided task on the provided runtime and store it in this
152 /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely
153 /// cancel the task.
154 ///
155 /// The provided future will start running in the background immediately
156 /// when this method is called, even if you don't await anything on this
157 /// `JoinSet`.
158 ///
159 /// [`AbortHandle`]: crate::task::AbortHandle
160 #[track_caller]
161 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
162 where
163 F: Future<Output = T>,
164 F: Send + 'static,
165 T: Send,
166 {
167 self.insert(handle.spawn(task))
168 }
169
170 /// Spawn the provided task on the current [`LocalSet`] or [`LocalRuntime`]
171 /// and store it in this `JoinSet`, returning an [`AbortHandle`] that can
172 /// be used to remotely cancel the task.
173 ///
174 /// The provided future will start running in the background immediately
175 /// when this method is called, even if you don't await anything on this
176 /// `JoinSet`.
177 ///
178 /// # Panics
179 ///
180 /// This method panics if it is called outside of a `LocalSet`or `LocalRuntime`.
181 ///
182 /// [`LocalSet`]: crate::task::LocalSet
183 /// [`LocalRuntime`]: crate::runtime::LocalRuntime
184 /// [`AbortHandle`]: crate::task::AbortHandle
185 #[track_caller]
186 pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
187 where
188 F: Future<Output = T>,
189 F: 'static,
190 {
191 self.insert(crate::task::spawn_local(task))
192 }
193
194 /// Spawn the provided task on the provided [`LocalSet`] and store it in
195 /// this `JoinSet`, returning an [`AbortHandle`] that can be used to
196 /// remotely cancel the task.
197 ///
198 /// Unlike the [`spawn_local`] method, this method may be used to spawn local
199 /// tasks on a `LocalSet` that is _not_ currently running. The provided
200 /// future will start running whenever the `LocalSet` is next started.
201 ///
202 /// [`LocalSet`]: crate::task::LocalSet
203 /// [`AbortHandle`]: crate::task::AbortHandle
204 /// [`spawn_local`]: Self::spawn_local
205 #[track_caller]
206 pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
207 where
208 F: Future<Output = T>,
209 F: 'static,
210 {
211 self.insert(local_set.spawn_local(task))
212 }
213
214 /// Spawn the blocking code on the blocking threadpool and store
215 /// it in this `JoinSet`, returning an [`AbortHandle`] that can be
216 /// used to remotely cancel the task.
217 ///
218 /// # Examples
219 ///
220 /// Spawn multiple blocking tasks and wait for them.
221 ///
222 /// ```
223 /// # #[cfg(not(target_family = "wasm"))]
224 /// # {
225 /// use tokio::task::JoinSet;
226 ///
227 /// #[tokio::main]
228 /// async fn main() {
229 /// let mut set = JoinSet::new();
230 ///
231 /// for i in 0..10 {
232 /// set.spawn_blocking(move || { i });
233 /// }
234 ///
235 /// let mut seen = [false; 10];
236 /// while let Some(res) = set.join_next().await {
237 /// let idx = res.unwrap();
238 /// seen[idx] = true;
239 /// }
240 ///
241 /// for i in 0..10 {
242 /// assert!(seen[i]);
243 /// }
244 /// }
245 /// # }
246 /// ```
247 ///
248 /// # Panics
249 ///
250 /// This method panics if called outside of a Tokio runtime.
251 ///
252 /// [`AbortHandle`]: crate::task::AbortHandle
253 #[track_caller]
254 pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
255 where
256 F: FnOnce() -> T,
257 F: Send + 'static,
258 T: Send,
259 {
260 self.insert(crate::runtime::spawn_blocking(f))
261 }
262
263 /// Spawn the blocking code on the blocking threadpool of the
264 /// provided runtime and store it in this `JoinSet`, returning an
265 /// [`AbortHandle`] that can be used to remotely cancel the task.
266 ///
267 /// [`AbortHandle`]: crate::task::AbortHandle
268 #[track_caller]
269 pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
270 where
271 F: FnOnce() -> T,
272 F: Send + 'static,
273 T: Send,
274 {
275 self.insert(handle.spawn_blocking(f))
276 }
277
278 fn insert(&mut self, jh: JoinHandle<T>) -> AbortHandle {
279 let abort = jh.abort_handle();
280 let mut entry = self.inner.insert_idle(jh);
281
282 // Set the waker that is notified when the task completes.
283 entry.with_value_and_context(|jh, ctx| jh.set_join_waker(ctx.waker()));
284 abort
285 }
286
287 /// Waits until one of the tasks in the set completes and returns its output.
288 ///
289 /// Returns `None` if the set is empty.
290 ///
291 /// # Cancel Safety
292 ///
293 /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
294 /// statement and some other branch completes first, it is guaranteed that no tasks were
295 /// removed from this `JoinSet`.
296 pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
297 std::future::poll_fn(|cx| self.poll_join_next(cx)).await
298 }
299
300 /// Waits until one of the tasks in the set completes and returns its
301 /// output, along with the [task ID] of the completed task.
302 ///
303 /// Returns `None` if the set is empty.
304 ///
305 /// When this method returns an error, then the id of the task that failed can be accessed
306 /// using the [`JoinError::id`] method.
307 ///
308 /// # Cancel Safety
309 ///
310 /// This method is cancel safe. If `join_next_with_id` is used as the event in a `tokio::select!`
311 /// statement and some other branch completes first, it is guaranteed that no tasks were
312 /// removed from this `JoinSet`.
313 ///
314 /// [task ID]: crate::task::Id
315 /// [`JoinError::id`]: fn@crate::task::JoinError::id
316 pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
317 std::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
318 }
319
320 /// Tries to join one of the tasks in the set that has completed and return its output.
321 ///
322 /// Returns `None` if there are no completed tasks, or if the set is empty.
323 pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> {
324 // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
325 loop {
326 let mut entry = self.inner.try_pop_notified()?;
327
328 let res = entry.with_value_and_context(|jh, ctx| {
329 // Since this function is not async and cannot be forced to yield, we should
330 // disable budgeting when we want to check for the `JoinHandle` readiness.
331 Pin::new(&mut unconstrained(jh)).poll(ctx)
332 });
333
334 if let Poll::Ready(res) = res {
335 let _entry = entry.remove();
336
337 return Some(res);
338 }
339 }
340 }
341
342 /// Tries to join one of the tasks in the set that has completed and return its output,
343 /// along with the [task ID] of the completed task.
344 ///
345 /// Returns `None` if there are no completed tasks, or if the set is empty.
346 ///
347 /// When this method returns an error, then the id of the task that failed can be accessed
348 /// using the [`JoinError::id`] method.
349 ///
350 /// [task ID]: crate::task::Id
351 /// [`JoinError::id`]: fn@crate::task::JoinError::id
352 pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
353 // Loop over all notified `JoinHandle`s to find one that's ready, or until none are left.
354 loop {
355 let mut entry = self.inner.try_pop_notified()?;
356
357 let res = entry.with_value_and_context(|jh, ctx| {
358 // Since this function is not async and cannot be forced to yield, we should
359 // disable budgeting when we want to check for the `JoinHandle` readiness.
360 Pin::new(&mut unconstrained(jh)).poll(ctx)
361 });
362
363 if let Poll::Ready(res) = res {
364 let entry = entry.remove();
365
366 return Some(res.map(|output| (entry.id(), output)));
367 }
368 }
369 }
370
371 /// Aborts all tasks and waits for them to finish shutting down.
372 ///
373 /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
374 /// a loop until it returns `None`.
375 ///
376 /// This method ignores any panics in the tasks shutting down. When this call returns, the
377 /// `JoinSet` will be empty.
378 ///
379 /// [`abort_all`]: fn@Self::abort_all
380 /// [`join_next`]: fn@Self::join_next
381 pub async fn shutdown(&mut self) {
382 self.abort_all();
383 while self.join_next().await.is_some() {}
384 }
385
386 /// Awaits the completion of all tasks in this `JoinSet`, returning a vector of their results.
387 ///
388 /// The results will be stored in the order they completed not the order they were spawned.
389 /// This is a convenience method that is equivalent to calling [`join_next`] in
390 /// a loop. If any tasks on the `JoinSet` fail with an [`JoinError`], then this call
391 /// to `join_all` will panic and all remaining tasks on the `JoinSet` are
392 /// cancelled. To handle errors in any other way, manually call [`join_next`]
393 /// in a loop.
394 ///
395 /// # Examples
396 ///
397 /// Spawn multiple tasks and `join_all` them.
398 ///
399 /// ```
400 /// use tokio::task::JoinSet;
401 /// use std::time::Duration;
402 ///
403 /// # #[tokio::main(flavor = "current_thread")]
404 /// # async fn main() {
405 /// let mut set = JoinSet::new();
406 ///
407 /// for i in 0..3 {
408 /// set.spawn(async move {
409 /// tokio::time::sleep(Duration::from_secs(3 - i)).await;
410 /// i
411 /// });
412 /// }
413 ///
414 /// let output = set.join_all().await;
415 /// assert_eq!(output, vec![2, 1, 0]);
416 /// # }
417 /// ```
418 ///
419 /// Equivalent implementation of `join_all`, using [`join_next`] and loop.
420 ///
421 /// ```
422 /// use tokio::task::JoinSet;
423 /// use std::panic;
424 ///
425 /// # #[tokio::main(flavor = "current_thread")]
426 /// # async fn main() {
427 /// let mut set = JoinSet::new();
428 ///
429 /// for i in 0..3 {
430 /// set.spawn(async move {i});
431 /// }
432 ///
433 /// let mut output = Vec::new();
434 /// while let Some(res) = set.join_next().await{
435 /// match res {
436 /// Ok(t) => output.push(t),
437 /// Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
438 /// Err(err) => panic!("{err}"),
439 /// }
440 /// }
441 /// assert_eq!(output.len(),3);
442 /// # }
443 /// ```
444 /// [`join_next`]: fn@Self::join_next
445 /// [`JoinError::id`]: fn@crate::task::JoinError::id
446 pub async fn join_all(mut self) -> Vec<T> {
447 let mut output = Vec::with_capacity(self.len());
448
449 while let Some(res) = self.join_next().await {
450 match res {
451 Ok(t) => output.push(t),
452 Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
453 Err(err) => panic!("{err}"),
454 }
455 }
456 output
457 }
458
459 /// Aborts all tasks on this `JoinSet`.
460 ///
461 /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete
462 /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty.
463 pub fn abort_all(&mut self) {
464 self.inner.for_each(|jh| jh.abort());
465 }
466
467 /// Removes all tasks from this `JoinSet` without aborting them.
468 ///
469 /// The tasks removed by this call will continue to run in the background even if the `JoinSet`
470 /// is dropped.
471 pub fn detach_all(&mut self) {
472 self.inner.drain(drop);
473 }
474
475 /// Polls for one of the tasks in the set to complete.
476 ///
477 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
478 ///
479 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
480 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
481 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
482 /// scheduled to receive a wakeup.
483 ///
484 /// # Returns
485 ///
486 /// This function returns:
487 ///
488 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
489 /// available right now.
490 /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
491 /// The `value` is the return value of one of the tasks that completed.
492 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
493 /// aborted. The `err` is the `JoinError` from the panicked/aborted task.
494 /// * `Poll::Ready(None)` if the `JoinSet` is empty.
495 ///
496 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
497 /// This can happen if the [coop budget] is reached.
498 ///
499 /// [coop budget]: crate::task::coop#cooperative-scheduling
500 pub fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
501 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
502 // the `notified` list if the waker is notified in the `poll` call below.
503 let mut entry = match self.inner.pop_notified(cx.waker()) {
504 Some(entry) => entry,
505 None => {
506 if self.is_empty() {
507 return Poll::Ready(None);
508 } else {
509 // The waker was set by `pop_notified`.
510 return Poll::Pending;
511 }
512 }
513 };
514
515 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
516
517 if let Poll::Ready(res) = res {
518 let _entry = entry.remove();
519 Poll::Ready(Some(res))
520 } else {
521 // A JoinHandle generally won't emit a wakeup without being ready unless
522 // the coop limit has been reached. We yield to the executor in this
523 // case.
524 cx.waker().wake_by_ref();
525 Poll::Pending
526 }
527 }
528
529 /// Polls for one of the tasks in the set to complete.
530 ///
531 /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
532 ///
533 /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
534 /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
535 /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
536 /// scheduled to receive a wakeup.
537 ///
538 /// # Returns
539 ///
540 /// This function returns:
541 ///
542 /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
543 /// available right now.
544 /// * `Poll::Ready(Some(Ok((id, value))))` if one of the tasks in this `JoinSet` has completed.
545 /// The `value` is the return value of one of the tasks that completed, and
546 /// `id` is the [task ID] of that task.
547 /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
548 /// aborted. The `err` is the `JoinError` from the panicked/aborted task.
549 /// * `Poll::Ready(None)` if the `JoinSet` is empty.
550 ///
551 /// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
552 /// This can happen if the [coop budget] is reached.
553 ///
554 /// [coop budget]: crate::task::coop#cooperative-scheduling
555 /// [task ID]: crate::task::Id
556 pub fn poll_join_next_with_id(
557 &mut self,
558 cx: &mut Context<'_>,
559 ) -> Poll<Option<Result<(Id, T), JoinError>>> {
560 // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
561 // the `notified` list if the waker is notified in the `poll` call below.
562 let mut entry = match self.inner.pop_notified(cx.waker()) {
563 Some(entry) => entry,
564 None => {
565 if self.is_empty() {
566 return Poll::Ready(None);
567 } else {
568 // The waker was set by `pop_notified`.
569 return Poll::Pending;
570 }
571 }
572 };
573
574 let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));
575
576 if let Poll::Ready(res) = res {
577 let entry = entry.remove();
578 // If the task succeeded, add the task ID to the output. Otherwise, the
579 // `JoinError` will already have the task's ID.
580 Poll::Ready(Some(res.map(|output| (entry.id(), output))))
581 } else {
582 // A JoinHandle generally won't emit a wakeup without being ready unless
583 // the coop limit has been reached. We yield to the executor in this
584 // case.
585 cx.waker().wake_by_ref();
586 Poll::Pending
587 }
588 }
589}
590
591impl<T> Drop for JoinSet<T> {
592 fn drop(&mut self) {
593 self.inner.drain(|join_handle| join_handle.abort());
594 }
595}
596
597impl<T> fmt::Debug for JoinSet<T> {
598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599 f.debug_struct("JoinSet").field("len", &self.len()).finish()
600 }
601}
602
603impl<T> Default for JoinSet<T> {
604 fn default() -> Self {
605 Self::new()
606 }
607}
608
609/// Collect an iterator of futures into a [`JoinSet`].
610///
611/// This is equivalent to calling [`JoinSet::spawn`] on each element of the iterator.
612///
613/// # Examples
614///
615/// The main example from [`JoinSet`]'s documentation can also be written using [`collect`]:
616///
617/// ```
618/// use tokio::task::JoinSet;
619///
620/// # #[tokio::main(flavor = "current_thread")]
621/// # async fn main() {
622/// let mut set: JoinSet<_> = (0..10).map(|i| async move { i }).collect();
623///
624/// let mut seen = [false; 10];
625/// while let Some(res) = set.join_next().await {
626/// let idx = res.unwrap();
627/// seen[idx] = true;
628/// }
629///
630/// for i in 0..10 {
631/// assert!(seen[i]);
632/// }
633/// # }
634/// ```
635///
636/// [`collect`]: std::iter::Iterator::collect
637impl<T, F> std::iter::FromIterator<F> for JoinSet<T>
638where
639 F: Future<Output = T>,
640 F: Send + 'static,
641 T: Send + 'static,
642{
643 fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
644 let mut set = Self::new();
645 iter.into_iter().for_each(|task| {
646 set.spawn(task);
647 });
648 set
649 }
650}
651
652// === impl Builder ===
653
654#[cfg(all(tokio_unstable, feature = "tracing"))]
655#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
656impl<'a, T: 'static> Builder<'a, T> {
657 /// Assigns a name to the task which will be spawned.
658 pub fn name(self, name: &'a str) -> Self {
659 let builder = self.builder.name(name);
660 Self { builder, ..self }
661 }
662
663 /// Spawn the provided task with this builder's settings and store it in the
664 /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
665 /// cancel the task.
666 ///
667 /// # Returns
668 ///
669 /// An [`AbortHandle`] that can be used to remotely cancel the task.
670 ///
671 /// # Panics
672 ///
673 /// This method panics if called outside of a Tokio runtime.
674 ///
675 /// [`AbortHandle`]: crate::task::AbortHandle
676 #[track_caller]
677 pub fn spawn<F>(self, future: F) -> std::io::Result<AbortHandle>
678 where
679 F: Future<Output = T>,
680 F: Send + 'static,
681 T: Send,
682 {
683 Ok(self.joinset.insert(self.builder.spawn(future)?))
684 }
685
686 /// Spawn the provided task on the provided [runtime handle] with this
687 /// builder's settings, and store it in the [`JoinSet`].
688 ///
689 /// # Returns
690 ///
691 /// An [`AbortHandle`] that can be used to remotely cancel the task.
692 ///
693 ///
694 /// [`AbortHandle`]: crate::task::AbortHandle
695 /// [runtime handle]: crate::runtime::Handle
696 #[track_caller]
697 pub fn spawn_on<F>(self, future: F, handle: &Handle) -> std::io::Result<AbortHandle>
698 where
699 F: Future<Output = T>,
700 F: Send + 'static,
701 T: Send,
702 {
703 Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?))
704 }
705
706 /// Spawn the blocking code on the blocking threadpool with this builder's
707 /// settings, and store it in the [`JoinSet`].
708 ///
709 /// # Returns
710 ///
711 /// An [`AbortHandle`] that can be used to remotely cancel the task.
712 ///
713 /// # Panics
714 ///
715 /// This method panics if called outside of a Tokio runtime.
716 ///
717 /// [`JoinSet`]: crate::task::JoinSet
718 /// [`AbortHandle`]: crate::task::AbortHandle
719 #[track_caller]
720 pub fn spawn_blocking<F>(self, f: F) -> std::io::Result<AbortHandle>
721 where
722 F: FnOnce() -> T,
723 F: Send + 'static,
724 T: Send,
725 {
726 Ok(self.joinset.insert(self.builder.spawn_blocking(f)?))
727 }
728
729 /// Spawn the blocking code on the blocking threadpool of the provided
730 /// runtime handle with this builder's settings, and store it in the
731 /// [`JoinSet`].
732 ///
733 /// # Returns
734 ///
735 /// An [`AbortHandle`] that can be used to remotely cancel the task.
736 ///
737 /// [`JoinSet`]: crate::task::JoinSet
738 /// [`AbortHandle`]: crate::task::AbortHandle
739 #[track_caller]
740 pub fn spawn_blocking_on<F>(self, f: F, handle: &Handle) -> std::io::Result<AbortHandle>
741 where
742 F: FnOnce() -> T,
743 F: Send + 'static,
744 T: Send,
745 {
746 Ok(self
747 .joinset
748 .insert(self.builder.spawn_blocking_on(f, handle)?))
749 }
750
751 /// Spawn the provided task on the current [`LocalSet`] with this builder's
752 /// settings, and store it in the [`JoinSet`].
753 ///
754 /// # Returns
755 ///
756 /// An [`AbortHandle`] that can be used to remotely cancel the task.
757 ///
758 /// # Panics
759 ///
760 /// This method panics if it is called outside of a `LocalSet`.
761 ///
762 /// [`LocalSet`]: crate::task::LocalSet
763 /// [`AbortHandle`]: crate::task::AbortHandle
764 #[track_caller]
765 pub fn spawn_local<F>(self, future: F) -> std::io::Result<AbortHandle>
766 where
767 F: Future<Output = T>,
768 F: 'static,
769 {
770 Ok(self.joinset.insert(self.builder.spawn_local(future)?))
771 }
772
773 /// Spawn the provided task on the provided [`LocalSet`] with this builder's
774 /// settings, and store it in the [`JoinSet`].
775 ///
776 /// # Returns
777 ///
778 /// An [`AbortHandle`] that can be used to remotely cancel the task.
779 ///
780 /// [`LocalSet`]: crate::task::LocalSet
781 /// [`AbortHandle`]: crate::task::AbortHandle
782 #[track_caller]
783 pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> std::io::Result<AbortHandle>
784 where
785 F: Future<Output = T>,
786 F: 'static,
787 {
788 Ok(self
789 .joinset
790 .insert(self.builder.spawn_local_on(future, local_set)?))
791 }
792}
793
794// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
795// `Debug`.
796#[cfg(all(tokio_unstable, feature = "tracing"))]
797#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
798impl<'a, T> fmt::Debug for Builder<'a, T> {
799 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800 f.debug_struct("join_set::Builder")
801 .field("joinset", &self.joinset)
802 .field("builder", &self.builder)
803 .finish()
804 }
805}