tokio/runtime/task/
join.rs

1use crate::runtime::task::{Header, RawTask};
2
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::pin::Pin;
8use std::task::{ready, Context, Poll, Waker};
9
10cfg_rt! {
11    /// An owned permission to join on a task (await its termination).
12    ///
13    /// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
14    /// for a Tokio task rather than a thread. Note that the background task
15    /// associated with this `JoinHandle` started running immediately when you
16    /// called spawn, even if you have not yet awaited the `JoinHandle`.
17    ///
18    /// A `JoinHandle` *detaches* the associated task when it is dropped, which
19    /// means that there is no longer any handle to the task, and no way to `join`
20    /// on it.
21    ///
22    /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
23    /// functions.
24    ///
25    /// # Cancel safety
26    ///
27    /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event
28    /// in a `tokio::select!` statement and some other branch completes first,
29    /// then it is guaranteed that the output of the task is not lost.
30    ///
31    /// If a `JoinHandle` is dropped, then the task continues running in the
32    /// background and its return value is lost.
33    ///
34    /// # Examples
35    ///
36    /// Creation from [`task::spawn`]:
37    ///
38    /// ```
39    /// use tokio::task;
40    ///
41    /// # async fn doc() {
42    /// let join_handle: task::JoinHandle<_> = task::spawn(async {
43    ///     // some work here
44    /// });
45    /// # }
46    /// ```
47    ///
48    /// Creation from [`task::spawn_blocking`]:
49    ///
50    /// ```
51    /// use tokio::task;
52    ///
53    /// # async fn doc() {
54    /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
55    ///     // some blocking work here
56    /// });
57    /// # }
58    /// ```
59    ///
60    /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
61    /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`:
62    ///
63    /// ```
64    /// use tokio::task;
65    ///
66    /// # async fn doc() {
67    /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
68    ///     5 + 3
69    /// });
70    /// # }
71    ///
72    /// ```
73    ///
74    /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
75    ///
76    /// ```
77    /// use tokio::task;
78    ///
79    /// # async fn doc() {
80    /// let join_handle: task::JoinHandle<()> = task::spawn(async {
81    ///     println!("I return nothing.");
82    /// });
83    /// # }
84    /// ```
85    ///
86    /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
87    /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
88    /// to be double chained to extract the returned value:
89    ///
90    /// ```
91    /// use tokio::task;
92    /// use std::io;
93    ///
94    /// # #[tokio::main(flavor = "current_thread")]
95    /// # async fn main() -> io::Result<()> {
96    /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
97    ///     Ok(5 + 3)
98    /// });
99    ///
100    /// let result = join_handle.await??;
101    /// assert_eq!(result, 8);
102    /// Ok(())
103    /// # }
104    /// ```
105    ///
106    /// If the task panics, the error is a [`JoinError`] that contains the panic:
107    ///
108    /// ```
109    /// # #[cfg(not(target_family = "wasm"))]
110    /// # {
111    /// use tokio::task;
112    /// use std::io;
113    /// use std::panic;
114    ///
115    /// #[tokio::main]
116    /// async fn main() -> io::Result<()> {
117    ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
118    ///         panic!("boom");
119    ///     });
120    ///
121    ///     let err = join_handle.await.unwrap_err();
122    ///     assert!(err.is_panic());
123    ///     Ok(())
124    /// }
125    /// # }
126    /// ```
127    /// Child being detached and outliving its parent:
128    ///
129    /// ```no_run
130    /// use tokio::task;
131    /// use tokio::time;
132    /// use std::time::Duration;
133    ///
134    /// # #[tokio::main(flavor = "current_thread")]
135    /// # async fn main() {
136    /// let original_task = task::spawn(async {
137    ///     let _detached_task = task::spawn(async {
138    ///         // Here we sleep to make sure that the first task returns before.
139    ///         time::sleep(Duration::from_millis(10)).await;
140    ///         // This will be called, even though the JoinHandle is dropped.
141    ///         println!("♫ Still alive ♫");
142    ///     });
143    /// });
144    ///
145    /// original_task.await.expect("The task being joined has panicked");
146    /// println!("Original task is joined.");
147    ///
148    /// // We make sure that the new task has time to run, before the main
149    /// // task returns.
150    ///
151    /// time::sleep(Duration::from_millis(1000)).await;
152    /// # }
153    /// ```
154    ///
155    /// [`task::spawn`]: crate::task::spawn()
156    /// [`task::spawn_blocking`]: crate::task::spawn_blocking
157    /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
158    /// [`JoinError`]: crate::task::JoinError
159    pub struct JoinHandle<T> {
160        raw: RawTask,
161        _p: PhantomData<T>,
162    }
163}
164
165unsafe impl<T: Send> Send for JoinHandle<T> {}
166unsafe impl<T: Send> Sync for JoinHandle<T> {}
167
168impl<T> UnwindSafe for JoinHandle<T> {}
169impl<T> RefUnwindSafe for JoinHandle<T> {}
170
171impl<T> JoinHandle<T> {
172    pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
173        JoinHandle {
174            raw,
175            _p: PhantomData,
176        }
177    }
178
179    /// Abort the task associated with the handle.
180    ///
181    /// Awaiting a cancelled task might complete as usual if the task was
182    /// already completed at the time it was cancelled, but most likely it
183    /// will fail with a [cancelled] `JoinError`.
184    ///
185    /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
186    /// because they are not async. If you call `abort` on a `spawn_blocking`
187    /// task, then this *will not have any effect*, and the task will continue
188    /// running normally. The exception is if the task has not started running
189    /// yet; in that case, calling `abort` may prevent the task from starting.
190    ///
191    /// See also [the module level docs] for more information on cancellation.
192    ///
193    /// ```rust
194    /// use tokio::time;
195    ///
196    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
197    /// # async fn main() {
198    /// let mut handles = Vec::new();
199    ///
200    /// handles.push(tokio::spawn(async {
201    ///    time::sleep(time::Duration::from_secs(10)).await;
202    ///    true
203    /// }));
204    ///
205    /// handles.push(tokio::spawn(async {
206    ///    time::sleep(time::Duration::from_secs(10)).await;
207    ///    false
208    /// }));
209    ///
210    /// for handle in &handles {
211    ///     handle.abort();
212    /// }
213    ///
214    /// for handle in handles {
215    ///     assert!(handle.await.unwrap_err().is_cancelled());
216    /// }
217    /// # }
218    /// ```
219    ///
220    /// [cancelled]: method@super::error::JoinError::is_cancelled
221    /// [the module level docs]: crate::task#cancellation
222    /// [`spawn_blocking`]: crate::task::spawn_blocking
223    pub fn abort(&self) {
224        self.raw.remote_abort();
225    }
226
227    /// Checks if the task associated with this `JoinHandle` has finished.
228    ///
229    /// Please note that this method can return `false` even if [`abort`] has been
230    /// called on the task. This is because the cancellation process may take
231    /// some time, and this method does not return `true` until it has
232    /// completed.
233    ///
234    /// ```rust
235    /// use tokio::time;
236    ///
237    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
238    /// # async fn main() {
239    /// let handle1 = tokio::spawn(async {
240    ///     // do some stuff here
241    /// });
242    /// let handle2 = tokio::spawn(async {
243    ///     // do some other stuff here
244    ///     time::sleep(time::Duration::from_secs(10)).await;
245    /// });
246    /// // Wait for the task to finish
247    /// handle2.abort();
248    /// time::sleep(time::Duration::from_secs(1)).await;
249    /// assert!(handle1.is_finished());
250    /// assert!(handle2.is_finished());
251    /// # }
252    /// ```
253    /// [`abort`]: method@JoinHandle::abort
254    pub fn is_finished(&self) -> bool {
255        let state = self.raw.header().state.load();
256        state.is_complete()
257    }
258
259    /// Set the waker that is notified when the task completes.
260    pub(crate) fn set_join_waker(&mut self, waker: &Waker) {
261        if self.raw.try_set_join_waker(waker) {
262            // In this case the task has already completed. We wake the waker immediately.
263            waker.wake_by_ref();
264        }
265    }
266
267    /// Returns a new `AbortHandle` that can be used to remotely abort this task.
268    ///
269    /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was
270    /// already completed at the time it was cancelled, but most likely it
271    /// will fail with a [cancelled] `JoinError`.
272    ///
273    /// ```rust
274    /// use tokio::{time, task};
275    ///
276    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
277    /// # async fn main() {
278    /// let mut handles = Vec::new();
279    ///
280    /// handles.push(tokio::spawn(async {
281    ///    time::sleep(time::Duration::from_secs(10)).await;
282    ///    true
283    /// }));
284    ///
285    /// handles.push(tokio::spawn(async {
286    ///    time::sleep(time::Duration::from_secs(10)).await;
287    ///    false
288    /// }));
289    ///
290    /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();
291    ///
292    /// for handle in abort_handles {
293    ///     handle.abort();
294    /// }
295    ///
296    /// for handle in handles {
297    ///     assert!(handle.await.unwrap_err().is_cancelled());
298    /// }
299    /// # }
300    /// ```
301    /// [cancelled]: method@super::error::JoinError::is_cancelled
302    #[must_use = "abort handles do nothing unless `.abort` is called"]
303    pub fn abort_handle(&self) -> super::AbortHandle {
304        self.raw.ref_inc();
305        super::AbortHandle::new(self.raw)
306    }
307
308    /// Returns a [task ID] that uniquely identifies this task relative to other
309    /// currently spawned tasks.
310    ///
311    /// [task ID]: crate::task::Id
312    pub fn id(&self) -> super::Id {
313        // Safety: The header pointer is valid.
314        unsafe { Header::get_id(self.raw.header_ptr()) }
315    }
316}
317
318impl<T> Unpin for JoinHandle<T> {}
319
320impl<T> Future for JoinHandle<T> {
321    type Output = super::Result<T>;
322
323    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
324        ready!(crate::trace::trace_leaf(cx));
325        let mut ret = Poll::Pending;
326
327        // Keep track of task budget
328        let coop = ready!(crate::task::coop::poll_proceed(cx));
329
330        // Try to read the task output. If the task is not yet complete, the
331        // waker is stored and is notified once the task does complete.
332        //
333        // The function must go via the vtable, which requires erasing generic
334        // types. To do this, the function "return" is placed on the stack
335        // **before** calling the function and is passed into the function using
336        // `*mut ()`.
337        //
338        // Safety:
339        //
340        // The type of `T` must match the task's output type.
341        unsafe {
342            self.raw
343                .try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
344        }
345
346        if ret.is_ready() {
347            coop.made_progress();
348        }
349
350        ret
351    }
352}
353
354impl<T> Drop for JoinHandle<T> {
355    fn drop(&mut self) {
356        if self.raw.state().drop_join_handle_fast().is_ok() {
357            return;
358        }
359
360        self.raw.drop_join_handle_slow();
361    }
362}
363
364impl<T> fmt::Debug for JoinHandle<T>
365where
366    T: fmt::Debug,
367{
368    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
369        // Safety: The header pointer is valid.
370        let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
371        let id = unsafe { id_ptr.as_ref() };
372        fmt.debug_struct("JoinHandle").field("id", id).finish()
373    }
374}