tokio/runtime/task/join.rs
1use crate::runtime::task::{Header, RawTask};
2
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::pin::Pin;
8use std::task::{ready, Context, Poll, Waker};
9
10cfg_rt! {
11 /// An owned permission to join on a task (await its termination).
12 ///
13 /// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
14 /// for a Tokio task rather than a thread. Note that the background task
15 /// associated with this `JoinHandle` started running immediately when you
16 /// called spawn, even if you have not yet awaited the `JoinHandle`.
17 ///
18 /// A `JoinHandle` *detaches* the associated task when it is dropped, which
19 /// means that there is no longer any handle to the task, and no way to `join`
20 /// on it.
21 ///
22 /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
23 /// functions.
24 ///
25 /// # Cancel safety
26 ///
27 /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event
28 /// in a `tokio::select!` statement and some other branch completes first,
29 /// then it is guaranteed that the output of the task is not lost.
30 ///
31 /// If a `JoinHandle` is dropped, then the task continues running in the
32 /// background and its return value is lost.
33 ///
34 /// # Examples
35 ///
36 /// Creation from [`task::spawn`]:
37 ///
38 /// ```
39 /// use tokio::task;
40 ///
41 /// # async fn doc() {
42 /// let join_handle: task::JoinHandle<_> = task::spawn(async {
43 /// // some work here
44 /// });
45 /// # }
46 /// ```
47 ///
48 /// Creation from [`task::spawn_blocking`]:
49 ///
50 /// ```
51 /// use tokio::task;
52 ///
53 /// # async fn doc() {
54 /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
55 /// // some blocking work here
56 /// });
57 /// # }
58 /// ```
59 ///
60 /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
61 /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`:
62 ///
63 /// ```
64 /// use tokio::task;
65 ///
66 /// # async fn doc() {
67 /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
68 /// 5 + 3
69 /// });
70 /// # }
71 ///
72 /// ```
73 ///
74 /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
75 ///
76 /// ```
77 /// use tokio::task;
78 ///
79 /// # async fn doc() {
80 /// let join_handle: task::JoinHandle<()> = task::spawn(async {
81 /// println!("I return nothing.");
82 /// });
83 /// # }
84 /// ```
85 ///
86 /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
87 /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
88 /// to be double chained to extract the returned value:
89 ///
90 /// ```
91 /// use tokio::task;
92 /// use std::io;
93 ///
94 /// # #[tokio::main(flavor = "current_thread")]
95 /// # async fn main() -> io::Result<()> {
96 /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
97 /// Ok(5 + 3)
98 /// });
99 ///
100 /// let result = join_handle.await??;
101 /// assert_eq!(result, 8);
102 /// Ok(())
103 /// # }
104 /// ```
105 ///
106 /// If the task panics, the error is a [`JoinError`] that contains the panic:
107 ///
108 /// ```
109 /// # #[cfg(not(target_family = "wasm"))]
110 /// # {
111 /// use tokio::task;
112 /// use std::io;
113 /// use std::panic;
114 ///
115 /// #[tokio::main]
116 /// async fn main() -> io::Result<()> {
117 /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
118 /// panic!("boom");
119 /// });
120 ///
121 /// let err = join_handle.await.unwrap_err();
122 /// assert!(err.is_panic());
123 /// Ok(())
124 /// }
125 /// # }
126 /// ```
127 /// Child being detached and outliving its parent:
128 ///
129 /// ```no_run
130 /// use tokio::task;
131 /// use tokio::time;
132 /// use std::time::Duration;
133 ///
134 /// # #[tokio::main(flavor = "current_thread")]
135 /// # async fn main() {
136 /// let original_task = task::spawn(async {
137 /// let _detached_task = task::spawn(async {
138 /// // Here we sleep to make sure that the first task returns before.
139 /// time::sleep(Duration::from_millis(10)).await;
140 /// // This will be called, even though the JoinHandle is dropped.
141 /// println!("♫ Still alive ♫");
142 /// });
143 /// });
144 ///
145 /// original_task.await.expect("The task being joined has panicked");
146 /// println!("Original task is joined.");
147 ///
148 /// // We make sure that the new task has time to run, before the main
149 /// // task returns.
150 ///
151 /// time::sleep(Duration::from_millis(1000)).await;
152 /// # }
153 /// ```
154 ///
155 /// [`task::spawn`]: crate::task::spawn()
156 /// [`task::spawn_blocking`]: crate::task::spawn_blocking
157 /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
158 /// [`JoinError`]: crate::task::JoinError
159 pub struct JoinHandle<T> {
160 raw: RawTask,
161 _p: PhantomData<T>,
162 }
163}
164
165unsafe impl<T: Send> Send for JoinHandle<T> {}
166unsafe impl<T: Send> Sync for JoinHandle<T> {}
167
168impl<T> UnwindSafe for JoinHandle<T> {}
169impl<T> RefUnwindSafe for JoinHandle<T> {}
170
171impl<T> JoinHandle<T> {
172 pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
173 JoinHandle {
174 raw,
175 _p: PhantomData,
176 }
177 }
178
179 /// Abort the task associated with the handle.
180 ///
181 /// Awaiting a cancelled task might complete as usual if the task was
182 /// already completed at the time it was cancelled, but most likely it
183 /// will fail with a [cancelled] `JoinError`.
184 ///
185 /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
186 /// because they are not async. If you call `abort` on a `spawn_blocking`
187 /// task, then this *will not have any effect*, and the task will continue
188 /// running normally. The exception is if the task has not started running
189 /// yet; in that case, calling `abort` may prevent the task from starting.
190 ///
191 /// See also [the module level docs] for more information on cancellation.
192 ///
193 /// ```rust
194 /// use tokio::time;
195 ///
196 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
197 /// # async fn main() {
198 /// let mut handles = Vec::new();
199 ///
200 /// handles.push(tokio::spawn(async {
201 /// time::sleep(time::Duration::from_secs(10)).await;
202 /// true
203 /// }));
204 ///
205 /// handles.push(tokio::spawn(async {
206 /// time::sleep(time::Duration::from_secs(10)).await;
207 /// false
208 /// }));
209 ///
210 /// for handle in &handles {
211 /// handle.abort();
212 /// }
213 ///
214 /// for handle in handles {
215 /// assert!(handle.await.unwrap_err().is_cancelled());
216 /// }
217 /// # }
218 /// ```
219 ///
220 /// [cancelled]: method@super::error::JoinError::is_cancelled
221 /// [the module level docs]: crate::task#cancellation
222 /// [`spawn_blocking`]: crate::task::spawn_blocking
223 pub fn abort(&self) {
224 self.raw.remote_abort();
225 }
226
227 /// Checks if the task associated with this `JoinHandle` has finished.
228 ///
229 /// Please note that this method can return `false` even if [`abort`] has been
230 /// called on the task. This is because the cancellation process may take
231 /// some time, and this method does not return `true` until it has
232 /// completed.
233 ///
234 /// ```rust
235 /// use tokio::time;
236 ///
237 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
238 /// # async fn main() {
239 /// let handle1 = tokio::spawn(async {
240 /// // do some stuff here
241 /// });
242 /// let handle2 = tokio::spawn(async {
243 /// // do some other stuff here
244 /// time::sleep(time::Duration::from_secs(10)).await;
245 /// });
246 /// // Wait for the task to finish
247 /// handle2.abort();
248 /// time::sleep(time::Duration::from_secs(1)).await;
249 /// assert!(handle1.is_finished());
250 /// assert!(handle2.is_finished());
251 /// # }
252 /// ```
253 /// [`abort`]: method@JoinHandle::abort
254 pub fn is_finished(&self) -> bool {
255 let state = self.raw.header().state.load();
256 state.is_complete()
257 }
258
259 /// Set the waker that is notified when the task completes.
260 pub(crate) fn set_join_waker(&mut self, waker: &Waker) {
261 if self.raw.try_set_join_waker(waker) {
262 // In this case the task has already completed. We wake the waker immediately.
263 waker.wake_by_ref();
264 }
265 }
266
267 /// Returns a new `AbortHandle` that can be used to remotely abort this task.
268 ///
269 /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was
270 /// already completed at the time it was cancelled, but most likely it
271 /// will fail with a [cancelled] `JoinError`.
272 ///
273 /// ```rust
274 /// use tokio::{time, task};
275 ///
276 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
277 /// # async fn main() {
278 /// let mut handles = Vec::new();
279 ///
280 /// handles.push(tokio::spawn(async {
281 /// time::sleep(time::Duration::from_secs(10)).await;
282 /// true
283 /// }));
284 ///
285 /// handles.push(tokio::spawn(async {
286 /// time::sleep(time::Duration::from_secs(10)).await;
287 /// false
288 /// }));
289 ///
290 /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();
291 ///
292 /// for handle in abort_handles {
293 /// handle.abort();
294 /// }
295 ///
296 /// for handle in handles {
297 /// assert!(handle.await.unwrap_err().is_cancelled());
298 /// }
299 /// # }
300 /// ```
301 /// [cancelled]: method@super::error::JoinError::is_cancelled
302 #[must_use = "abort handles do nothing unless `.abort` is called"]
303 pub fn abort_handle(&self) -> super::AbortHandle {
304 self.raw.ref_inc();
305 super::AbortHandle::new(self.raw)
306 }
307
308 /// Returns a [task ID] that uniquely identifies this task relative to other
309 /// currently spawned tasks.
310 ///
311 /// [task ID]: crate::task::Id
312 pub fn id(&self) -> super::Id {
313 // Safety: The header pointer is valid.
314 unsafe { Header::get_id(self.raw.header_ptr()) }
315 }
316}
317
318impl<T> Unpin for JoinHandle<T> {}
319
320impl<T> Future for JoinHandle<T> {
321 type Output = super::Result<T>;
322
323 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
324 ready!(crate::trace::trace_leaf(cx));
325 let mut ret = Poll::Pending;
326
327 // Keep track of task budget
328 let coop = ready!(crate::task::coop::poll_proceed(cx));
329
330 // Try to read the task output. If the task is not yet complete, the
331 // waker is stored and is notified once the task does complete.
332 //
333 // The function must go via the vtable, which requires erasing generic
334 // types. To do this, the function "return" is placed on the stack
335 // **before** calling the function and is passed into the function using
336 // `*mut ()`.
337 //
338 // Safety:
339 //
340 // The type of `T` must match the task's output type.
341 unsafe {
342 self.raw
343 .try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
344 }
345
346 if ret.is_ready() {
347 coop.made_progress();
348 }
349
350 ret
351 }
352}
353
354impl<T> Drop for JoinHandle<T> {
355 fn drop(&mut self) {
356 if self.raw.state().drop_join_handle_fast().is_ok() {
357 return;
358 }
359
360 self.raw.drop_join_handle_slow();
361 }
362}
363
364impl<T> fmt::Debug for JoinHandle<T>
365where
366 T: fmt::Debug,
367{
368 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
369 // Safety: The header pointer is valid.
370 let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
371 let id = unsafe { id_ptr.as_ref() };
372 fmt.debug_struct("JoinHandle").field("id", id).finish()
373 }
374}