tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4#[cfg(tokio_unstable)]
5use crate::runtime;
6use crate::runtime::task::{
7 self, JoinHandle, LocalOwnedTasks, SpawnLocation, Task, TaskHarnessScheduleHooks,
8};
9use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
10use crate::sync::AtomicWaker;
11use crate::util::trace::SpawnMeta;
12use crate::util::RcCell;
13
14use std::cell::Cell;
15use std::collections::VecDeque;
16use std::fmt;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::mem;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::task::Poll;
23
24use pin_project_lite::pin_project;
25
26cfg_rt! {
27 /// A set of tasks which are executed on the same thread.
28 ///
29 /// In some cases, it is necessary to run one or more futures that do not
30 /// implement [`Send`] and thus are unsafe to send between threads. In these
31 /// cases, a [local task set] may be used to schedule one or more `!Send`
32 /// futures to run together on the same thread.
33 ///
34 /// For example, the following code will not compile:
35 ///
36 /// ```rust,compile_fail
37 /// use std::rc::Rc;
38 ///
39 /// #[tokio::main]
40 /// async fn main() {
41 /// // `Rc` does not implement `Send`, and thus may not be sent between
42 /// // threads safely.
43 /// let nonsend_data = Rc::new("my nonsend data...");
44 ///
45 /// let nonsend_data = nonsend_data.clone();
46 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
47 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
48 /// // will not compile.
49 /// tokio::spawn(async move {
50 /// println!("{}", nonsend_data);
51 /// // ...
52 /// }).await.unwrap();
53 /// }
54 /// ```
55 ///
56 /// # Use with `run_until`
57 ///
58 /// To spawn `!Send` futures, we can use a local task set to schedule them
59 /// on the thread calling [`Runtime::block_on`]. When running inside of the
60 /// local task set, we can use [`task::spawn_local`], which can spawn
61 /// `!Send` futures. For example:
62 ///
63 /// ```rust
64 /// use std::rc::Rc;
65 /// use tokio::task;
66 ///
67 /// # #[tokio::main(flavor = "current_thread")]
68 /// # async fn main() {
69 /// let nonsend_data = Rc::new("my nonsend data...");
70 ///
71 /// // Construct a local task set that can run `!Send` futures.
72 /// let local = task::LocalSet::new();
73 ///
74 /// // Run the local task set.
75 /// local.run_until(async move {
76 /// let nonsend_data = nonsend_data.clone();
77 /// // `spawn_local` ensures that the future is spawned on the local
78 /// // task set.
79 /// task::spawn_local(async move {
80 /// println!("{}", nonsend_data);
81 /// // ...
82 /// }).await.unwrap();
83 /// }).await;
84 /// # }
85 /// ```
86 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
87 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
88 /// cannot be used inside a task spawned with `tokio::spawn`.
89 ///
90 /// ## Awaiting a `LocalSet`
91 ///
92 /// Additionally, a `LocalSet` itself implements `Future`, completing when
93 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
94 /// several futures on a `LocalSet` and drive the whole set until they
95 /// complete. For example,
96 ///
97 /// ```rust
98 /// use tokio::{task, time};
99 /// use std::rc::Rc;
100 ///
101 /// # #[tokio::main(flavor = "current_thread")]
102 /// # async fn main() {
103 /// let nonsend_data = Rc::new("world");
104 /// let local = task::LocalSet::new();
105 ///
106 /// let nonsend_data2 = nonsend_data.clone();
107 /// local.spawn_local(async move {
108 /// // ...
109 /// println!("hello {}", nonsend_data2)
110 /// });
111 ///
112 /// local.spawn_local(async move {
113 /// time::sleep(time::Duration::from_millis(100)).await;
114 /// println!("goodbye {}", nonsend_data)
115 /// });
116 ///
117 /// // ...
118 ///
119 /// local.await;
120 /// # }
121 /// ```
122 /// **Note:** Awaiting a `LocalSet` can only be done inside
123 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
124 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
125 /// `tokio::spawn`.
126 ///
127 /// ## Use inside `tokio::spawn`
128 ///
129 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
130 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
131 /// something else. The solution is to create the `LocalSet` somewhere else,
132 /// and communicate with it using an [`mpsc`] channel.
133 ///
134 /// The following example puts the `LocalSet` inside a new thread.
135 /// ```
136 /// # #[cfg(not(target_family = "wasm"))]
137 /// # {
138 /// use tokio::runtime::Builder;
139 /// use tokio::sync::{mpsc, oneshot};
140 /// use tokio::task::LocalSet;
141 ///
142 /// // This struct describes the task you want to spawn. Here we include
143 /// // some simple examples. The oneshot channel allows sending a response
144 /// // to the spawner.
145 /// #[derive(Debug)]
146 /// enum Task {
147 /// PrintNumber(u32),
148 /// AddOne(u32, oneshot::Sender<u32>),
149 /// }
150 ///
151 /// #[derive(Clone)]
152 /// struct LocalSpawner {
153 /// send: mpsc::UnboundedSender<Task>,
154 /// }
155 ///
156 /// impl LocalSpawner {
157 /// pub fn new() -> Self {
158 /// let (send, mut recv) = mpsc::unbounded_channel();
159 ///
160 /// let rt = Builder::new_current_thread()
161 /// .enable_all()
162 /// .build()
163 /// .unwrap();
164 ///
165 /// std::thread::spawn(move || {
166 /// let local = LocalSet::new();
167 ///
168 /// local.spawn_local(async move {
169 /// while let Some(new_task) = recv.recv().await {
170 /// tokio::task::spawn_local(run_task(new_task));
171 /// }
172 /// // If the while loop returns, then all the LocalSpawner
173 /// // objects have been dropped.
174 /// });
175 ///
176 /// // This will return once all senders are dropped and all
177 /// // spawned tasks have returned.
178 /// rt.block_on(local);
179 /// });
180 ///
181 /// Self {
182 /// send,
183 /// }
184 /// }
185 ///
186 /// pub fn spawn(&self, task: Task) {
187 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
188 /// }
189 /// }
190 ///
191 /// // This task may do !Send stuff. We use printing a number as an example,
192 /// // but it could be anything.
193 /// //
194 /// // The Task struct is an enum to support spawning many different kinds
195 /// // of operations.
196 /// async fn run_task(task: Task) {
197 /// match task {
198 /// Task::PrintNumber(n) => {
199 /// println!("{}", n);
200 /// },
201 /// Task::AddOne(n, response) => {
202 /// // We ignore failures to send the response.
203 /// let _ = response.send(n + 1);
204 /// },
205 /// }
206 /// }
207 ///
208 /// #[tokio::main]
209 /// async fn main() {
210 /// let spawner = LocalSpawner::new();
211 ///
212 /// let (send, response) = oneshot::channel();
213 /// spawner.spawn(Task::AddOne(10, send));
214 /// let eleven = response.await.unwrap();
215 /// assert_eq!(eleven, 11);
216 /// }
217 /// # }
218 /// ```
219 ///
220 /// [`Send`]: trait@std::marker::Send
221 /// [local task set]: struct@LocalSet
222 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
223 /// [`task::spawn_local`]: fn@spawn_local
224 /// [`mpsc`]: mod@crate::sync::mpsc
225 pub struct LocalSet {
226 /// Current scheduler tick.
227 tick: Cell<u8>,
228
229 /// State available from thread-local.
230 context: Rc<Context>,
231
232 /// This type should not be Send.
233 _not_send: PhantomData<*const ()>,
234 }
235}
236
237/// State available from the thread-local.
238struct Context {
239 /// State shared between threads.
240 shared: Arc<Shared>,
241
242 /// True if a task panicked without being handled and the local set is
243 /// configured to shutdown on unhandled panic.
244 unhandled_panic: Cell<bool>,
245}
246
247/// `LocalSet` state shared between threads.
248struct Shared {
249 /// # Safety
250 ///
251 /// This field must *only* be accessed from the thread that owns the
252 /// `LocalSet` (i.e., `Thread::current().id() == owner`).
253 local_state: LocalState,
254
255 /// Remote run queue sender.
256 queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
257
258 /// Wake the `LocalSet` task.
259 waker: AtomicWaker,
260
261 /// How to respond to unhandled task panics.
262 #[cfg(tokio_unstable)]
263 pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
264}
265
266/// Tracks the `LocalSet` state that must only be accessed from the thread that
267/// created the `LocalSet`.
268struct LocalState {
269 /// The `ThreadId` of the thread that owns the `LocalSet`.
270 owner: ThreadId,
271
272 /// Local run queue sender and receiver.
273 local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,
274
275 /// Collection of all active tasks spawned onto this executor.
276 owned: LocalOwnedTasks<Arc<Shared>>,
277}
278
279pin_project! {
280 #[derive(Debug)]
281 struct RunUntil<'a, F> {
282 local_set: &'a LocalSet,
283 #[pin]
284 future: F,
285 }
286}
287
288tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
289 ctx: RcCell::new(),
290 wake_on_schedule: Cell::new(false),
291} });
292
293struct LocalData {
294 ctx: RcCell<Context>,
295 wake_on_schedule: Cell<bool>,
296}
297
298impl LocalData {
299 /// Should be called except when we call `LocalSet::enter`.
300 /// Especially when we poll a `LocalSet`.
301 #[must_use = "dropping this guard will reset the entered state"]
302 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
303 let ctx = self.ctx.replace(Some(ctx));
304 let wake_on_schedule = self.wake_on_schedule.replace(false);
305 LocalDataEnterGuard {
306 local_data_ref: self,
307 ctx,
308 wake_on_schedule,
309 }
310 }
311}
312
313/// A guard for `LocalData::enter()`
314struct LocalDataEnterGuard<'a> {
315 local_data_ref: &'a LocalData,
316 ctx: Option<Rc<Context>>,
317 wake_on_schedule: bool,
318}
319
320impl<'a> Drop for LocalDataEnterGuard<'a> {
321 fn drop(&mut self) {
322 self.local_data_ref.ctx.set(self.ctx.take());
323 self.local_data_ref
324 .wake_on_schedule
325 .set(self.wake_on_schedule)
326 }
327}
328
329cfg_rt! {
330 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
331 ///
332 /// This is possible when either using one of these types
333 /// explicitly, or (with `tokio_unstable`) by opting to use the
334 /// `"local"` runtime flavor in `tokio::main`:
335 ///
336 /// ```ignore
337 /// #[tokio::main(flavor = "local")]
338 /// ```
339 ///
340 /// The spawned future will run on the same thread that called `spawn_local`.
341 ///
342 /// The provided future will start running in the background immediately
343 /// when `spawn_local` is called, even if you don't await the returned
344 /// `JoinHandle`.
345 ///
346 /// # Panics
347 ///
348 /// This function panics if called outside of a [`LocalSet`] or [`LocalRuntime`].
349 ///
350 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
351 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
352 /// `spawn_local` if you want to stay within the `LocalSet`.
353 ///
354 /// # Examples
355 ///
356 /// ```rust
357 /// use std::rc::Rc;
358 /// use tokio::task;
359 ///
360 /// # #[tokio::main(flavor = "current_thread")]
361 /// # async fn main() {
362 /// let nonsend_data = Rc::new("my nonsend data...");
363 ///
364 /// let local = task::LocalSet::new();
365 ///
366 /// // Run the local task set.
367 /// local.run_until(async move {
368 /// let nonsend_data = nonsend_data.clone();
369 /// task::spawn_local(async move {
370 /// println!("{}", nonsend_data);
371 /// // ...
372 /// }).await.unwrap();
373 /// }).await;
374 /// # }
375 /// ```
376 ///
377 /// [`LocalSet`]: struct@crate::task::LocalSet
378 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
379 /// [`tokio::spawn`]: fn@crate::task::spawn
380 #[track_caller]
381 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
382 where
383 F: Future + 'static,
384 F::Output: 'static,
385 {
386 let fut_size = std::mem::size_of::<F>();
387 if fut_size > BOX_FUTURE_THRESHOLD {
388 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
389 } else {
390 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
391 }
392 }
393
394
395 #[track_caller]
396 pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
397 where F: Future + 'static,
398 F::Output: 'static
399 {
400 use crate::runtime::{context, task};
401
402 let mut future = Some(future);
403
404 let res = context::with_current(|handle| {
405 Some(if handle.is_local() {
406 if !handle.can_spawn_local_on_local_runtime() {
407 return None;
408 }
409
410 let future = future.take().unwrap();
411
412 #[cfg(all(
413 tokio_unstable,
414 feature = "taskdump",
415 feature = "rt",
416 target_os = "linux",
417 any(
418 target_arch = "aarch64",
419 target_arch = "x86",
420 target_arch = "x86_64"
421 )
422 ))]
423 let future = task::trace::Trace::root(future);
424 let id = task::Id::next();
425 let task = crate::util::trace::task(future, "task", meta, id.as_u64());
426
427 // safety: we have verified that this is a `LocalRuntime` owned by the current thread
428 unsafe { handle.spawn_local(task, id, meta.spawned_at) }
429 } else {
430 match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
431 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
432 Some(cx) => cx.spawn(future.take().unwrap(), meta)
433 }
434 })
435 });
436
437 match res {
438 Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
439 Ok(Some(join_handle)) => join_handle,
440 Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
441 None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
442 Some(cx) => cx.spawn(future.unwrap(), meta)
443 }
444 }
445 }
446}
447
448/// Initial queue capacity.
449const INITIAL_CAPACITY: usize = 64;
450
451/// Max number of tasks to poll per tick.
452const MAX_TASKS_PER_TICK: usize = 61;
453
454/// How often it check the remote queue first.
455const REMOTE_FIRST_INTERVAL: u8 = 31;
456
457/// Context guard for `LocalSet`
458pub struct LocalEnterGuard {
459 ctx: Option<Rc<Context>>,
460
461 /// Distinguishes whether the context was entered or being polled.
462 /// When we enter it, the value `wake_on_schedule` is set. In this case
463 /// `spawn_local` refers the context, whereas it is not being polled now.
464 wake_on_schedule: bool,
465}
466
467impl Drop for LocalEnterGuard {
468 fn drop(&mut self) {
469 CURRENT.with(
470 |LocalData {
471 ctx,
472 wake_on_schedule,
473 }| {
474 ctx.set(self.ctx.take());
475 wake_on_schedule.set(self.wake_on_schedule);
476 },
477 );
478 }
479}
480
481impl fmt::Debug for LocalEnterGuard {
482 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483 f.debug_struct("LocalEnterGuard").finish()
484 }
485}
486
487impl LocalSet {
488 /// Returns a new local task set.
489 pub fn new() -> LocalSet {
490 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
491
492 LocalSet {
493 tick: Cell::new(0),
494 context: Rc::new(Context {
495 shared: Arc::new(Shared {
496 local_state: LocalState {
497 owner,
498 owned: LocalOwnedTasks::new(),
499 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
500 },
501 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
502 waker: AtomicWaker::new(),
503 #[cfg(tokio_unstable)]
504 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
505 }),
506 unhandled_panic: Cell::new(false),
507 }),
508 _not_send: PhantomData,
509 }
510 }
511
512 /// Enters the context of this `LocalSet`.
513 ///
514 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
515 /// context you are inside.
516 ///
517 /// [`spawn_local`]: fn@crate::task::spawn_local
518 pub fn enter(&self) -> LocalEnterGuard {
519 CURRENT.with(
520 |LocalData {
521 ctx,
522 wake_on_schedule,
523 ..
524 }| {
525 let ctx = ctx.replace(Some(self.context.clone()));
526 let wake_on_schedule = wake_on_schedule.replace(true);
527 LocalEnterGuard {
528 ctx,
529 wake_on_schedule,
530 }
531 },
532 )
533 }
534
535 /// Spawns a `!Send` task onto the local task set.
536 ///
537 /// This task is guaranteed to be run on the current thread.
538 ///
539 /// Unlike the free function [`spawn_local`], this method may be used to
540 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
541 /// future will start running once the `LocalSet` is next started, even if
542 /// you don't await the returned `JoinHandle`.
543 ///
544 /// # Examples
545 ///
546 /// ```rust
547 /// use tokio::task;
548 ///
549 /// # #[tokio::main(flavor = "current_thread")]
550 /// # async fn main() {
551 /// let local = task::LocalSet::new();
552 ///
553 /// // Spawn a future on the local set. This future will be run when
554 /// // we call `run_until` to drive the task set.
555 /// local.spawn_local(async {
556 /// // ...
557 /// });
558 ///
559 /// // Run the local task set.
560 /// local.run_until(async move {
561 /// // ...
562 /// }).await;
563 ///
564 /// // When `run` finishes, we can spawn _more_ futures, which will
565 /// // run in subsequent calls to `run_until`.
566 /// local.spawn_local(async {
567 /// // ...
568 /// });
569 ///
570 /// local.run_until(async move {
571 /// // ...
572 /// }).await;
573 /// # }
574 /// ```
575 /// [`spawn_local`]: fn@spawn_local
576 #[track_caller]
577 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
578 where
579 F: Future + 'static,
580 F::Output: 'static,
581 {
582 let fut_size = mem::size_of::<F>();
583 if fut_size > BOX_FUTURE_THRESHOLD {
584 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
585 } else {
586 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
587 }
588 }
589
590 /// Runs a future to completion on the provided runtime, driving any local
591 /// futures spawned on this task set on the current thread.
592 ///
593 /// This runs the given future on the runtime, blocking until it is
594 /// complete, and yielding its resolved result. Any tasks or timers which
595 /// the future spawns internally will be executed on the runtime. The future
596 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
597 /// current thread.
598 ///
599 /// This method should not be called from an asynchronous context.
600 ///
601 /// # Panics
602 ///
603 /// This function panics if the executor is at capacity, if the provided
604 /// future panics, or if called within an asynchronous execution context.
605 ///
606 /// # Notes
607 ///
608 /// Since this function internally calls [`Runtime::block_on`], and drives
609 /// futures in the local task set inside that call to `block_on`, the local
610 /// futures may not use [in-place blocking]. If a blocking call needs to be
611 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
612 ///
613 /// For example, this will panic:
614 /// ```should_panic,ignore-wasm
615 /// use tokio::runtime::Runtime;
616 /// use tokio::task;
617 ///
618 /// let rt = Runtime::new().unwrap();
619 /// let local = task::LocalSet::new();
620 /// local.block_on(&rt, async {
621 /// let join = task::spawn_local(async {
622 /// let blocking_result = task::block_in_place(|| {
623 /// // ...
624 /// });
625 /// // ...
626 /// });
627 /// join.await.unwrap();
628 /// })
629 /// ```
630 /// This, however, will not panic:
631 /// ```
632 /// # #[cfg(not(target_family = "wasm"))]
633 /// # {
634 /// use tokio::runtime::Runtime;
635 /// use tokio::task;
636 ///
637 /// let rt = Runtime::new().unwrap();
638 /// let local = task::LocalSet::new();
639 /// local.block_on(&rt, async {
640 /// let join = task::spawn_local(async {
641 /// let blocking_result = task::spawn_blocking(|| {
642 /// // ...
643 /// }).await;
644 /// // ...
645 /// });
646 /// join.await.unwrap();
647 /// })
648 /// # }
649 /// ```
650 ///
651 /// [`spawn_local`]: fn@spawn_local
652 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
653 /// [in-place blocking]: fn@crate::task::block_in_place
654 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
655 #[track_caller]
656 #[cfg(feature = "rt")]
657 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
658 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
659 where
660 F: Future,
661 {
662 rt.block_on(self.run_until(future))
663 }
664
665 /// Runs a future to completion on the local set, returning its output.
666 ///
667 /// This returns a future that runs the given future with a local set,
668 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
669 /// Any local futures spawned on the local set will be driven in the
670 /// background until the future passed to `run_until` completes. When the future
671 /// passed to `run_until` finishes, any local futures which have not completed
672 /// will remain on the local set, and will be driven on subsequent calls to
673 /// `run_until` or when [awaiting the local set] itself.
674 ///
675 /// # Cancel safety
676 ///
677 /// This method is cancel safe when `future` is cancel safe.
678 ///
679 /// # Examples
680 ///
681 /// ```rust
682 /// use tokio::task;
683 ///
684 /// # #[tokio::main(flavor = "current_thread")]
685 /// # async fn main() {
686 /// task::LocalSet::new().run_until(async {
687 /// task::spawn_local(async move {
688 /// // ...
689 /// }).await.unwrap();
690 /// // ...
691 /// }).await;
692 /// # }
693 /// ```
694 ///
695 /// [`spawn_local`]: fn@spawn_local
696 /// [awaiting the local set]: #awaiting-a-localset
697 pub async fn run_until<F>(&self, future: F) -> F::Output
698 where
699 F: Future,
700 {
701 let run_until = RunUntil {
702 future,
703 local_set: self,
704 };
705 run_until.await
706 }
707
708 #[track_caller]
709 pub(in crate::task) fn spawn_named<F>(
710 &self,
711 future: F,
712 meta: SpawnMeta<'_>,
713 ) -> JoinHandle<F::Output>
714 where
715 F: Future + 'static,
716 F::Output: 'static,
717 {
718 self.spawn_named_inner(future, meta)
719 }
720
721 #[track_caller]
722 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
723 where
724 F: Future + 'static,
725 F::Output: 'static,
726 {
727 let handle = self.context.spawn(future, meta);
728
729 // Because a task was spawned from *outside* the `LocalSet`, wake the
730 // `LocalSet` future to execute the new task, if it hasn't been woken.
731 //
732 // Spawning via the free fn `spawn` does not require this, as it can
733 // only be called from *within* a future executing on the `LocalSet` —
734 // in that case, the `LocalSet` must already be awake.
735 self.context.shared.waker.wake();
736 handle
737 }
738
739 /// Ticks the scheduler, returning whether the local future needs to be
740 /// notified again.
741 fn tick(&self) -> bool {
742 for _ in 0..MAX_TASKS_PER_TICK {
743 // Make sure we didn't hit an unhandled panic
744 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
745
746 match self.next_task() {
747 // Run the task
748 //
749 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
750 // used. We are responsible for maintaining the invariant that
751 // `run_unchecked` is only called on threads that spawned the
752 // task initially. Because `LocalSet` itself is `!Send`, and
753 // `spawn_local` spawns into the `LocalSet` on the current
754 // thread, the invariant is maintained.
755 Some(task) => crate::task::coop::budget(|| task.run()),
756 // We have fully drained the queue of notified tasks, so the
757 // local future doesn't need to be notified again — it can wait
758 // until something else wakes a task in the local set.
759 None => return false,
760 }
761 }
762
763 true
764 }
765
766 fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
767 let tick = self.tick.get();
768 self.tick.set(tick.wrapping_add(1));
769
770 let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
771 self.context
772 .shared
773 .queue
774 .lock()
775 .as_mut()
776 .and_then(|queue| queue.pop_front())
777 .or_else(|| self.pop_local())
778 } else {
779 self.pop_local().or_else(|| {
780 self.context
781 .shared
782 .queue
783 .lock()
784 .as_mut()
785 .and_then(VecDeque::pop_front)
786 })
787 };
788
789 task.map(|task| unsafe {
790 // Safety: because the `LocalSet` itself is `!Send`, we know we are
791 // on the same thread if we have access to the `LocalSet`, and can
792 // therefore access the local run queue.
793 self.context.shared.local_state.assert_owner(task)
794 })
795 }
796
797 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
798 unsafe {
799 // Safety: because the `LocalSet` itself is `!Send`, we know we are
800 // on the same thread if we have access to the `LocalSet`, and can
801 // therefore access the local run queue.
802 self.context.shared.local_state.task_pop_front()
803 }
804 }
805
806 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
807 CURRENT.with(|local_data| {
808 let _guard = local_data.enter(self.context.clone());
809 f()
810 })
811 }
812
813 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
814 /// fails.
815 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
816 let mut f = Some(f);
817
818 let res = CURRENT.try_with(|local_data| {
819 let _guard = local_data.enter(self.context.clone());
820 (f.take().unwrap())()
821 });
822
823 match res {
824 Ok(res) => res,
825 Err(_access_error) => (f.take().unwrap())(),
826 }
827 }
828}
829
830cfg_unstable! {
831 impl LocalSet {
832 /// Configure how the `LocalSet` responds to an unhandled panic on a
833 /// spawned task.
834 ///
835 /// By default, an unhandled panic (i.e. a panic not caught by
836 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
837 /// execution. The panic is error value is forwarded to the task's
838 /// [`JoinHandle`] and all other spawned tasks continue running.
839 ///
840 /// The `unhandled_panic` option enables configuring this behavior.
841 ///
842 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
843 /// spawned tasks have no impact on the `LocalSet`'s execution.
844 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
845 /// shutdown immediately when a spawned task panics even if that
846 /// task's `JoinHandle` has not been dropped. All other spawned tasks
847 /// will immediately terminate and further calls to
848 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
849 ///
850 /// # Panics
851 ///
852 /// This method panics if called after the `LocalSet` has started
853 /// running.
854 ///
855 /// # Unstable
856 ///
857 /// This option is currently unstable and its implementation is
858 /// incomplete. The API may change or be removed in the future. See
859 /// tokio-rs/tokio#4516 for more details.
860 ///
861 /// # Examples
862 ///
863 /// The following demonstrates a `LocalSet` configured to shutdown on
864 /// panic. The first spawned task panics and results in the `LocalSet`
865 /// shutting down. The second spawned task never has a chance to
866 /// execute. The call to `run_until` will panic due to the runtime being
867 /// forcibly shutdown.
868 ///
869 /// ```should_panic
870 /// use tokio::runtime::UnhandledPanic;
871 ///
872 /// # #[tokio::main(flavor = "current_thread")]
873 /// # async fn main() {
874 /// tokio::task::LocalSet::new()
875 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
876 /// .run_until(async {
877 /// tokio::task::spawn_local(async { panic!("boom"); });
878 /// tokio::task::spawn_local(async {
879 /// // This task never completes
880 /// });
881 ///
882 /// // Do some work, but `run_until` will panic before it completes
883 /// # loop { tokio::task::yield_now().await; }
884 /// })
885 /// .await;
886 /// # }
887 /// ```
888 ///
889 /// [`JoinHandle`]: struct@crate::task::JoinHandle
890 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
891 // TODO: This should be set as a builder
892 Rc::get_mut(&mut self.context)
893 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
894 .expect("Unhandled Panic behavior modified after starting LocalSet")
895 .unhandled_panic = behavior;
896 self
897 }
898
899 /// Returns the [`Id`] of the current `LocalSet` runtime.
900 ///
901 /// # Examples
902 ///
903 /// ```rust
904 /// use tokio::task;
905 ///
906 /// # #[tokio::main(flavor = "current_thread")]
907 /// # async fn main() {
908 /// let local_set = task::LocalSet::new();
909 /// println!("Local set id: {}", local_set.id());
910 /// # }
911 /// ```
912 ///
913 /// **Note**: This is an [unstable API][unstable]. The public API of this type
914 /// may break in 1.x releases. See [the documentation on unstable
915 /// features][unstable] for details.
916 ///
917 /// [unstable]: crate#unstable-features
918 /// [`Id`]: struct@crate::runtime::Id
919 pub fn id(&self) -> runtime::Id {
920 self.context.shared.local_state.owned.id.into()
921 }
922 }
923}
924
925impl fmt::Debug for LocalSet {
926 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
927 fmt.debug_struct("LocalSet").finish()
928 }
929}
930
931impl Future for LocalSet {
932 type Output = ();
933
934 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
935 let _no_blocking = crate::runtime::context::disallow_block_in_place();
936
937 // Register the waker before starting to work
938 self.context.shared.waker.register_by_ref(cx.waker());
939
940 if self.with(|| self.tick()) {
941 // If `tick` returns true, we need to notify the local future again:
942 // there are still tasks remaining in the run queue.
943 cx.waker().wake_by_ref();
944 Poll::Pending
945
946 // Safety: called from the thread that owns `LocalSet`. Because
947 // `LocalSet` is `!Send`, this is safe.
948 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
949 // If the scheduler has no remaining futures, we're done!
950 Poll::Ready(())
951 } else {
952 // There are still futures in the local set, but we've polled all the
953 // futures in the run queue. Therefore, we can just return Pending
954 // since the remaining futures will be woken from somewhere else.
955 Poll::Pending
956 }
957 }
958}
959
960impl Default for LocalSet {
961 fn default() -> LocalSet {
962 LocalSet::new()
963 }
964}
965
966impl Drop for LocalSet {
967 fn drop(&mut self) {
968 self.with_if_possible(|| {
969 let _no_blocking = crate::runtime::context::disallow_block_in_place();
970
971 // Shut down all tasks in the LocalOwnedTasks and close it to
972 // prevent new tasks from ever being added.
973 unsafe {
974 // Safety: called from the thread that owns `LocalSet`
975 self.context.shared.local_state.close_and_shutdown_all();
976 }
977
978 // We already called shutdown on all tasks above, so there is no
979 // need to call shutdown.
980
981 // Safety: note that this *intentionally* bypasses the unsafe
982 // `Shared::local_queue()` method. This is in order to avoid the
983 // debug assertion that we are on the thread that owns the
984 // `LocalSet`, because on some systems (e.g. at least some macOS
985 // versions), attempting to get the current thread ID can panic due
986 // to the thread's local data that stores the thread ID being
987 // dropped *before* the `LocalSet`.
988 //
989 // Despite avoiding the assertion here, it is safe for us to access
990 // the local queue in `Drop`, because the `LocalSet` itself is
991 // `!Send`, so we can reasonably guarantee that it will not be
992 // `Drop`ped from another thread.
993 let local_queue = unsafe {
994 // Safety: called from the thread that owns `LocalSet`
995 self.context.shared.local_state.take_local_queue()
996 };
997 for task in local_queue {
998 drop(task);
999 }
1000
1001 // Take the queue from the Shared object to prevent pushing
1002 // notifications to it in the future.
1003 let queue = self.context.shared.queue.lock().take().unwrap();
1004 for task in queue {
1005 drop(task);
1006 }
1007
1008 // Safety: called from the thread that owns `LocalSet`
1009 assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
1010 });
1011 }
1012}
1013
1014// === impl Context ===
1015
1016impl Context {
1017 #[track_caller]
1018 fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
1019 where
1020 F: Future + 'static,
1021 F::Output: 'static,
1022 {
1023 let id = crate::runtime::task::Id::next();
1024 let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1025
1026 // Safety: called from the thread that owns the `LocalSet`
1027 let (handle, notified) = {
1028 self.shared.local_state.assert_called_from_owner_thread();
1029 self.shared.local_state.owned.bind(
1030 future,
1031 self.shared.clone(),
1032 id,
1033 SpawnLocation::capture(),
1034 )
1035 };
1036
1037 if let Some(notified) = notified {
1038 self.shared.schedule(notified);
1039 }
1040
1041 handle
1042 }
1043}
1044
1045// === impl LocalFuture ===
1046
1047impl<T: Future> Future for RunUntil<'_, T> {
1048 type Output = T::Output;
1049
1050 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1051 let me = self.project();
1052
1053 me.local_set.with(|| {
1054 me.local_set
1055 .context
1056 .shared
1057 .waker
1058 .register_by_ref(cx.waker());
1059
1060 let _no_blocking = crate::runtime::context::disallow_block_in_place();
1061 let f = me.future;
1062
1063 if let Poll::Ready(output) = f.poll(cx) {
1064 return Poll::Ready(output);
1065 }
1066
1067 if me.local_set.tick() {
1068 // If `tick` returns `true`, we need to notify the local future again:
1069 // there are still tasks remaining in the run queue.
1070 cx.waker().wake_by_ref();
1071 }
1072
1073 Poll::Pending
1074 })
1075 }
1076}
1077
1078impl Shared {
1079 /// Schedule the provided task on the scheduler.
1080 fn schedule(&self, task: task::Notified<Arc<Self>>) {
1081 CURRENT.with(|localdata| {
1082 match localdata.ctx.get() {
1083 // If the current `LocalSet` is being polled, we don't need to wake it.
1084 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1085 // In this case it is not being polled, so we need to wake it.
1086 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1087 // Safety: if the current `LocalSet` context points to this
1088 // `LocalSet`, then we are on the thread that owns it.
1089 cx.shared.local_state.task_push_back(task);
1090 },
1091
1092 // We are on the thread that owns the `LocalSet`, so we can
1093 // wake to the local queue.
1094 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1095 unsafe {
1096 // Safety: we just checked that the thread ID matches
1097 // the localset's owner, so this is safe.
1098 self.local_state.task_push_back(task);
1099 }
1100 // We still have to wake the `LocalSet`, because it isn't
1101 // currently being polled.
1102 self.waker.wake();
1103 }
1104
1105 // We are *not* on the thread that owns the `LocalSet`, so we
1106 // have to wake to the remote queue.
1107 _ => {
1108 // First, check whether the queue is still there (if not, the
1109 // LocalSet is dropped). Then push to it if so, and if not,
1110 // do nothing.
1111 let mut lock = self.queue.lock();
1112
1113 if let Some(queue) = lock.as_mut() {
1114 queue.push_back(task);
1115 drop(lock);
1116 self.waker.wake();
1117 }
1118 }
1119 }
1120 });
1121 }
1122
1123 fn ptr_eq(&self, other: &Shared) -> bool {
1124 std::ptr::eq(self, other)
1125 }
1126}
1127
1128// This is safe because (and only because) we *pinky pwomise* to never touch the
1129// local run queue except from the thread that owns the `LocalSet`.
1130unsafe impl Sync for Shared {}
1131
1132impl task::Schedule for Arc<Shared> {
1133 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1134 // Safety, this is always called from the thread that owns `LocalSet`
1135 unsafe { self.local_state.task_remove(task) }
1136 }
1137
1138 fn schedule(&self, task: task::Notified<Self>) {
1139 Shared::schedule(self, task);
1140 }
1141
1142 // localset does not currently support task hooks
1143 fn hooks(&self) -> TaskHarnessScheduleHooks {
1144 TaskHarnessScheduleHooks {
1145 task_terminate_callback: None,
1146 }
1147 }
1148
1149 cfg_unstable! {
1150 fn unhandled_panic(&self) {
1151 use crate::runtime::UnhandledPanic;
1152
1153 match self.unhandled_panic {
1154 UnhandledPanic::Ignore => {
1155 // Do nothing
1156 }
1157 UnhandledPanic::ShutdownRuntime => {
1158 // This hook is only called from within the runtime, so
1159 // `CURRENT` should match with `&self`, i.e. there is no
1160 // opportunity for a nested scheduler to be called.
1161 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1162 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1163 cx.unhandled_panic.set(true);
1164 // Safety: this is always called from the thread that owns `LocalSet`
1165 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1166 }
1167 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1168 })
1169 }
1170 }
1171 }
1172 }
1173}
1174
1175impl LocalState {
1176 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1177 // The caller ensures it is called from the same thread that owns
1178 // the LocalSet.
1179 self.assert_called_from_owner_thread();
1180
1181 self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1182 }
1183
1184 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1185 // The caller ensures it is called from the same thread that owns
1186 // the LocalSet.
1187 self.assert_called_from_owner_thread();
1188
1189 self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1190 }
1191
1192 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1193 // The caller ensures it is called from the same thread that owns
1194 // the LocalSet.
1195 self.assert_called_from_owner_thread();
1196
1197 self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1198 }
1199
1200 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1201 // The caller ensures it is called from the same thread that owns
1202 // the LocalSet.
1203 self.assert_called_from_owner_thread();
1204
1205 self.owned.remove(task)
1206 }
1207
1208 /// Returns true if the `LocalSet` does not have any spawned tasks
1209 unsafe fn owned_is_empty(&self) -> bool {
1210 // The caller ensures it is called from the same thread that owns
1211 // the LocalSet.
1212 self.assert_called_from_owner_thread();
1213
1214 self.owned.is_empty()
1215 }
1216
1217 unsafe fn assert_owner(
1218 &self,
1219 task: task::Notified<Arc<Shared>>,
1220 ) -> task::LocalNotified<Arc<Shared>> {
1221 // The caller ensures it is called from the same thread that owns
1222 // the LocalSet.
1223 self.assert_called_from_owner_thread();
1224
1225 self.owned.assert_owner(task)
1226 }
1227
1228 unsafe fn close_and_shutdown_all(&self) {
1229 // The caller ensures it is called from the same thread that owns
1230 // the LocalSet.
1231 self.assert_called_from_owner_thread();
1232
1233 self.owned.close_and_shutdown_all();
1234 }
1235
1236 #[track_caller]
1237 fn assert_called_from_owner_thread(&self) {
1238 // FreeBSD has some weirdness around thread-local destruction.
1239 // TODO: remove this hack when thread id is cleaned up
1240 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1241 debug_assert!(
1242 // if we couldn't get the thread ID because we're dropping the local
1243 // data, skip the assertion --- the `Drop` impl is not going to be
1244 // called from another thread, because `LocalSet` is `!Send`
1245 context::thread_id()
1246 .map(|id| id == self.owner)
1247 .unwrap_or(true),
1248 "`LocalSet`'s local run queue must not be accessed by another thread!"
1249 );
1250 }
1251}
1252
1253// This is `Send` because it is stored in `Shared`. It is up to the caller to
1254// ensure they are on the same thread that owns the `LocalSet`.
1255unsafe impl Send for LocalState {}
1256
1257#[cfg(all(test, not(loom)))]
1258mod tests {
1259 use super::*;
1260
1261 // Does a `LocalSet` running on a current-thread runtime...basically work?
1262 //
1263 // This duplicates a test in `tests/task_local_set.rs`, but because this is
1264 // a lib test, it will run under Miri, so this is necessary to catch stacked
1265 // borrows violations in the `LocalSet` implementation.
1266 #[test]
1267 fn local_current_thread_scheduler() {
1268 let f = async {
1269 LocalSet::new()
1270 .run_until(async {
1271 spawn_local(async {}).await.unwrap();
1272 })
1273 .await;
1274 };
1275 crate::runtime::Builder::new_current_thread()
1276 .build()
1277 .expect("rt")
1278 .block_on(f)
1279 }
1280
1281 // Tests that when a task on a `LocalSet` is woken by an io driver on the
1282 // same thread, the task is woken to the localset's local queue rather than
1283 // its remote queue.
1284 //
1285 // This test has to be defined in the `local.rs` file as a lib test, rather
1286 // than in `tests/`, because it makes assertions about the local set's
1287 // internal state.
1288 #[test]
1289 fn wakes_to_local_queue() {
1290 use super::*;
1291 use crate::sync::Notify;
1292 let rt = crate::runtime::Builder::new_current_thread()
1293 .build()
1294 .expect("rt");
1295 rt.block_on(async {
1296 let local = LocalSet::new();
1297 let notify = Arc::new(Notify::new());
1298 let task = local.spawn_local({
1299 let notify = notify.clone();
1300 async move {
1301 notify.notified().await;
1302 }
1303 });
1304 let mut run_until = Box::pin(local.run_until(async move {
1305 task.await.unwrap();
1306 }));
1307
1308 // poll the run until future once
1309 std::future::poll_fn(|cx| {
1310 let _ = run_until.as_mut().poll(cx);
1311 Poll::Ready(())
1312 })
1313 .await;
1314
1315 notify.notify_one();
1316 let task = unsafe { local.context.shared.local_state.task_pop_front() };
1317 // TODO(eliza): it would be nice to be able to assert that this is
1318 // the local task.
1319 assert!(
1320 task.is_some(),
1321 "task should have been notified to the LocalSet's local queue"
1322 );
1323 })
1324 }
1325}