tokio/sync/
notify.rs

1// Allow `unreachable_pub` warnings when sync is not enabled
2// due to the usage of `Notify` within the `rt` feature set.
3// When this module is compiled with `sync` enabled we will warn on
4// this lint. When `rt` is enabled we use `pub(crate)` which
5// triggers this warning but it is safe to ignore in this case.
6#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
7
8use crate::loom::cell::UnsafeCell;
9use crate::loom::sync::atomic::AtomicUsize;
10use crate::loom::sync::Mutex;
11use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
12use crate::util::WakeList;
13
14use std::future::Future;
15use std::marker::PhantomPinned;
16use std::panic::{RefUnwindSafe, UnwindSafe};
17use std::pin::Pin;
18use std::ptr::NonNull;
19use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
20use std::sync::Arc;
21use std::task::{Context, Poll, Waker};
22
// Intrusive list of `Waiter` nodes; this is the list type stored behind the
// `waiters` mutex in `Notify`.
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
// Guarded counterpart of `WaitList`; this is the list type wrapped by
// `NotifyWaitersList`.
type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
25
26/// Notifies a single task to wake up.
27///
28/// `Notify` provides a basic mechanism to notify a single task of an event.
29/// `Notify` itself does not carry any data. Instead, it is to be used to signal
30/// another task to perform an operation.
31///
32/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
33/// [`notified().await`] method waits for a permit to become available, and
34/// [`notify_one()`] sets a permit **if there currently are no available
35/// permits**.
36///
37/// The synchronization details of `Notify` are similar to
38/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
39/// value contains a single permit. [`notified().await`] waits for the permit to
40/// be made available, consumes the permit, and resumes.  [`notify_one()`] sets
41/// the permit, waking a pending task if there is one.
42///
43/// If `notify_one()` is called **before** `notified().await`, then the next
44/// call to `notified().await` will complete immediately, consuming the permit.
45/// Any subsequent calls to `notified().await` will wait for a new permit.
46///
47/// If `notify_one()` is called **multiple** times before `notified().await`,
48/// only a **single** permit is stored. The next call to `notified().await` will
49/// complete immediately, but the one after will wait for a new permit.
50///
51/// # Examples
52///
53/// Basic usage.
54///
55/// ```
56/// use tokio::sync::Notify;
57/// use std::sync::Arc;
58///
59/// # #[tokio::main(flavor = "current_thread")]
60/// # async fn main() {
61/// let notify = Arc::new(Notify::new());
62/// let notify2 = notify.clone();
63///
64/// let handle = tokio::spawn(async move {
65///     notify2.notified().await;
66///     println!("received notification");
67/// });
68///
69/// println!("sending notification");
70/// notify.notify_one();
71///
72/// // Wait for task to receive notification.
73/// handle.await.unwrap();
74/// # }
75/// ```
76///
/// Unbounded multi-producer single-consumer (mpsc) channel.
78///
79/// No wakeups can be lost when using this channel because the call to
80/// `notify_one()` will store a permit in the `Notify`, which the following call
81/// to `notified()` will consume.
82///
83/// ```
84/// use tokio::sync::Notify;
85///
86/// use std::collections::VecDeque;
87/// use std::sync::Mutex;
88///
89/// struct Channel<T> {
90///     values: Mutex<VecDeque<T>>,
91///     notify: Notify,
92/// }
93///
94/// impl<T> Channel<T> {
95///     pub fn send(&self, value: T) {
96///         self.values.lock().unwrap()
97///             .push_back(value);
98///
99///         // Notify the consumer a value is available
100///         self.notify.notify_one();
101///     }
102///
103///     // This is a single-consumer channel, so several concurrent calls to
104///     // `recv` are not allowed.
105///     pub async fn recv(&self) -> T {
106///         loop {
107///             // Drain values
108///             if let Some(value) = self.values.lock().unwrap().pop_front() {
109///                 return value;
110///             }
111///
112///             // Wait for values to be available
113///             self.notify.notified().await;
114///         }
115///     }
116/// }
117/// ```
118///
/// Unbounded multi-producer multi-consumer (mpmc) channel.
120///
121/// The call to [`enable`] is important because otherwise if you have two
122/// calls to `recv` and two calls to `send` in parallel, the following could
123/// happen:
124///
125///  1. Both calls to `try_recv` return `None`.
126///  2. Both new elements are added to the vector.
127///  3. The `notify_one` method is called twice, adding only a single
128///     permit to the `Notify`.
129///  4. Both calls to `recv` reach the `Notified` future. One of them
130///     consumes the permit, and the other sleeps forever.
131///
132/// By adding the `Notified` futures to the list by calling `enable` before
133/// `try_recv`, the `notify_one` calls in step three would remove the
134/// futures from the list and mark them notified instead of adding a permit
135/// to the `Notify`. This ensures that both futures are woken.
136///
137/// Notice that this failure can only happen if there are two concurrent calls
138/// to `recv`. This is why the mpsc example above does not require a call to
139/// `enable`.
140///
141/// ```
142/// use tokio::sync::Notify;
143///
144/// use std::collections::VecDeque;
145/// use std::sync::Mutex;
146///
147/// struct Channel<T> {
148///     messages: Mutex<VecDeque<T>>,
149///     notify_on_sent: Notify,
150/// }
151///
152/// impl<T> Channel<T> {
153///     pub fn send(&self, msg: T) {
154///         let mut locked_queue = self.messages.lock().unwrap();
155///         locked_queue.push_back(msg);
156///         drop(locked_queue);
157///
158///         // Send a notification to one of the calls currently
159///         // waiting in a call to `recv`.
160///         self.notify_on_sent.notify_one();
161///     }
162///
163///     pub fn try_recv(&self) -> Option<T> {
164///         let mut locked_queue = self.messages.lock().unwrap();
165///         locked_queue.pop_front()
166///     }
167///
168///     pub async fn recv(&self) -> T {
169///         let future = self.notify_on_sent.notified();
170///         tokio::pin!(future);
171///
172///         loop {
173///             // Make sure that no wakeup is lost if we get
174///             // `None` from `try_recv`.
175///             future.as_mut().enable();
176///
177///             if let Some(msg) = self.try_recv() {
178///                 return msg;
179///             }
180///
181///             // Wait for a call to `notify_one`.
182///             //
183///             // This uses `.as_mut()` to avoid consuming the future,
184///             // which lets us call `Pin::set` below.
185///             future.as_mut().await;
186///
187///             // Reset the future in case another call to
188///             // `try_recv` got the message before us.
189///             future.set(self.notify_on_sent.notified());
190///         }
191///     }
192/// }
193/// ```
194///
195/// [park]: std::thread::park
196/// [unpark]: std::thread::Thread::unpark
197/// [`notified().await`]: Notify::notified()
198/// [`notify_one()`]: Notify::notify_one()
199/// [`enable`]: Notified::enable()
200/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
    // `state` uses 2 bits to store one of `EMPTY`,
    // `WAITING` or `NOTIFIED`. The rest of the bits
    // are used to store the number of times `notify_waiters`
    // was called.
    //
    // Throughout the code there are two assumptions:
    // - state can be transitioned *from* `WAITING` only if
    //   `waiters` lock is held
    // - number of times `notify_waiters` was called can
    //   be modified only if `waiters` lock is held
    state: AtomicUsize,
    // Intrusive list of registered waiters. `notify_one` / `notify_last`
    // pop a single entry from it (see `notify_locked`), while
    // `notify_waiters` moves the whole list out and drains it.
    waiters: Mutex<WaitList>,
}
216
/// Entry in a `Notify` wait list. One `Waiter` is embedded in every
/// `Notified` / `OwnedNotified` future.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waiting task's waker. Depending on the value of `notification`,
    /// this field is either protected by the `waiters` lock in
    /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
    waker: UnsafeCell<Option<Waker>>,

    /// Notification for this waiter. Uses 2 bits to store if and how the
    /// waiter was notified, 1 bit to store whether it was woken up using
    /// FIFO or LIFO, and the remaining bits are unused.
    /// * if it's `None`, then `waker` is protected by the `waiters` lock.
    /// * if it's `Some`, then `waker` is exclusively owned by the
    ///   enclosing `Waiter` and can be accessed without locking.
    notification: AtomicNotification,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
238
239impl Waiter {
240    fn new() -> Waiter {
241        Waiter {
242            pointers: linked_list::Pointers::new(),
243            waker: UnsafeCell::new(None),
244            notification: AtomicNotification::none(),
245            _p: PhantomPinned,
246        }
247    }
248}
249
// Generates `Waiter::addr_of_pointers`, which maps a `NonNull<Waiter>` to a
// `NonNull` pointing at its `pointers` field.
// NOTE(review): presumably the macro computes the field address without
// creating an intermediate reference — confirm against the macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
257
// Raw encodings stored inside `AtomicNotification`. These values must stay
// in sync with the match arms of `AtomicNotification::store_release` and
// `AtomicNotification::load`.

// No notification.
const NOTIFICATION_NONE: usize = 0b000;

// Notification type used by `notify_one`.
const NOTIFICATION_ONE: usize = 0b001;

// Notification type used by `notify_last`.
const NOTIFICATION_LAST: usize = 0b101;

// Notification type used by `notify_waiters`.
const NOTIFICATION_ALL: usize = 0b010;
269
/// Notification for a `Waiter`.
/// This struct is equivalent to `Option<Notification>`, but uses
/// `AtomicUsize` inside for atomic operations.
///
/// The wrapped value is always one of the `NOTIFICATION_*` constants;
/// `load` treats any other value as unreachable.
#[derive(Debug)]
struct AtomicNotification(AtomicUsize);
275
276impl AtomicNotification {
277    fn none() -> Self {
278        AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
279    }
280
281    /// Store-release a notification.
282    /// This method should be called exactly once.
283    fn store_release(&self, notification: Notification) {
284        let data: usize = match notification {
285            Notification::All => NOTIFICATION_ALL,
286            Notification::One(NotifyOneStrategy::Fifo) => NOTIFICATION_ONE,
287            Notification::One(NotifyOneStrategy::Lifo) => NOTIFICATION_LAST,
288        };
289        self.0.store(data, Release);
290    }
291
292    fn load(&self, ordering: Ordering) -> Option<Notification> {
293        let data = self.0.load(ordering);
294        match data {
295            NOTIFICATION_NONE => None,
296            NOTIFICATION_ONE => Some(Notification::One(NotifyOneStrategy::Fifo)),
297            NOTIFICATION_LAST => Some(Notification::One(NotifyOneStrategy::Lifo)),
298            NOTIFICATION_ALL => Some(Notification::All),
299            _ => unreachable!(),
300        }
301    }
302
303    /// Clears the notification.
304    /// This method is used by a `Notified` future to consume the
305    /// notification. It uses relaxed ordering and should be only
306    /// used once the atomic notification is no longer shared.
307    fn clear(&self) {
308        self.0.store(NOTIFICATION_NONE, Relaxed);
309    }
310}
311
/// Selects which waiter a single-waiter notification dequeues.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum NotifyOneStrategy {
    // Used by `notify_one`; `notify_locked` pops from the back of the list.
    Fifo,
    // Used by `notify_last`; `notify_locked` pops from the front of the list.
    Lifo,
}
318
/// Decoded form of the value held by `AtomicNotification`: how a waiter was
/// notified.
#[derive(Debug, PartialEq, Eq)]
#[repr(usize)]
enum Notification {
    // Delivered by `notify_one` / `notify_last`; carries the dequeue strategy.
    One(NotifyOneStrategy),
    // Delivered by `notify_waiters`.
    All,
}
325
/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
/// and gates the access to it on `notify.waiters` mutex. It also empties
/// the list on drop.
struct NotifyWaitersList<'a> {
    // The guarded list holding the waiters taken out of `Notify::waiters`.
    list: GuardedWaitList,
    // Set once `pop_back_locked` has observed the list to be empty; lets the
    // destructor skip re-acquiring the `waiters` lock.
    is_empty: bool,
    // The `Notify` whose `waiters` mutex protects `list`.
    notify: &'a Notify,
}
334
335impl<'a> NotifyWaitersList<'a> {
336    fn new(
337        unguarded_list: WaitList,
338        guard: Pin<&'a Waiter>,
339        notify: &'a Notify,
340    ) -> NotifyWaitersList<'a> {
341        let guard_ptr = NonNull::from(guard.get_ref());
342        let list = unguarded_list.into_guarded(guard_ptr);
343        NotifyWaitersList {
344            list,
345            is_empty: false,
346            notify,
347        }
348    }
349
350    /// Removes the last element from the guarded list. Modifying this list
351    /// requires an exclusive access to the main list in `Notify`.
352    fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
353        let result = self.list.pop_back();
354        if result.is_none() {
355            // Save information about emptiness to avoid waiting for lock
356            // in the destructor.
357            self.is_empty = true;
358        }
359        result
360    }
361}
362
363impl Drop for NotifyWaitersList<'_> {
364    fn drop(&mut self) {
365        // If the list is not empty, we unlink all waiters from it.
366        // We do not wake the waiters to avoid double panics.
367        if !self.is_empty {
368            let _lock_guard = self.notify.waiters.lock();
369            while let Some(waiter) = self.list.pop_back() {
370                // Safety: we never make mutable references to waiters.
371                let waiter = unsafe { waiter.as_ref() };
372                waiter.notification.store_release(Notification::All);
373            }
374        }
375    }
376}
377
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Notified<'a> {
    /// The `Notify` being received on.
    notify: &'a Notify,

    /// The current state of the receiving process.
    state: State,

    /// Number of calls to `notify_waiters` at the time of creation.
    /// Captured by `Notify::notified` from the counter bits of
    /// `Notify::state`.
    notify_waiters_calls: usize,

    /// Entry in the waiter `LinkedList`.
    waiter: Waiter,
}
397
// `Waiter` contains an `UnsafeCell`, so these markers are asserted manually.
// NOTE(review): presumably sound because the waker cell is only accessed
// under the `waiters` lock or once the waiter is exclusively owned (see the
// `Waiter::waker` field docs) — confirm.
unsafe impl<'a> Send for Notified<'a> {}
unsafe impl<'a> Sync for Notified<'a> {}
400
/// Future returned from [`Notify::notified_owned()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct OwnedNotified {
    /// The `Notify` being received on.
    notify: Arc<Notify>,

    /// The current state of the receiving process.
    state: State,

    /// Number of calls to `notify_waiters` at the time of creation.
    /// Captured by `Notify::notified_owned` from the counter bits of
    /// `Notify::state`.
    notify_waiters_calls: usize,

    /// Entry in the waiter `LinkedList`.
    waiter: Waiter,
}
420
// `Waiter` contains an `UnsafeCell`, so `Sync` is asserted manually.
// NOTE(review): presumably sound for the same reason as for `Notified`
// (waker access is guarded by the `waiters` lock or exclusive ownership) —
// confirm.
unsafe impl Sync for OwnedNotified {}
422
/// A custom `project` implementation is used in place of `pin-project-lite`
/// as a custom drop for [`Notified`] and [`OwnedNotified`] implementation
/// is needed.
///
/// Each field mirrors the identically named field of [`Notified`] /
/// [`OwnedNotified`]; only `state` is handed out mutably.
struct NotifiedProject<'a> {
    notify: &'a Notify,
    state: &'a mut State,
    notify_waiters_calls: &'a usize,
    waiter: &'a Waiter,
}
432
/// Progress of a `Notified` / `OwnedNotified` future.
#[derive(Debug)]
enum State {
    // Initial state, assigned when the future is created by
    // `Notify::notified` / `Notify::notified_owned`.
    Init,
    // NOTE(review): presumably the future has been added to the waiter
    // list and is waiting for a notification — confirm in the polling code.
    Waiting,
    // NOTE(review): presumably the future has completed — confirm in the
    // polling code.
    Done,
}
439
// Number of low bits of `Notify::state` reserved for the state value; the
// `notify_waiters` call counter starts at this bit offset.
const NOTIFY_WAITERS_SHIFT: usize = 2;
// Selects the state bits (the low `NOTIFY_WAITERS_SHIFT` bits).
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
// Selects the counter bits (everything above the state bits).
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;

/// Initial "idle" state.
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;

/// Pending notification: a permit is stored and will be consumed by the
/// next call to `notified().await`.
const NOTIFIED: usize = 2;
452
453fn set_state(data: usize, state: usize) -> usize {
454    (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
455}
456
457fn get_state(data: usize) -> usize {
458    data & STATE_MASK
459}
460
461fn get_num_notify_waiters_calls(data: usize) -> usize {
462    (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
463}
464
465fn inc_num_notify_waiters_calls(data: usize) -> usize {
466    data + (1 << NOTIFY_WAITERS_SHIFT)
467}
468
469fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) {
470    data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst);
471}
472
473impl Notify {
474    /// Create a new `Notify`, initialized without a permit.
475    ///
476    /// # Examples
477    ///
478    /// ```
479    /// use tokio::sync::Notify;
480    ///
481    /// let notify = Notify::new();
482    /// ```
483    pub fn new() -> Notify {
484        Notify {
485            state: AtomicUsize::new(0),
486            waiters: Mutex::new(LinkedList::new()),
487        }
488    }
489
490    /// Create a new `Notify`, initialized without a permit.
491    ///
492    /// When using the `tracing` [unstable feature], a `Notify` created with
493    /// `const_new` will not be instrumented. As such, it will not be visible
494    /// in [`tokio-console`]. Instead, [`Notify::new`] should be used to create
495    /// an instrumented object if that is needed.
496    ///
497    /// # Examples
498    ///
499    /// ```
500    /// use tokio::sync::Notify;
501    ///
502    /// static NOTIFY: Notify = Notify::const_new();
503    /// ```
504    ///
505    /// [`tokio-console`]: https://github.com/tokio-rs/console
506    /// [unstable feature]: crate#unstable-features
507    #[cfg(not(all(loom, test)))]
508    pub const fn const_new() -> Notify {
509        Notify {
510            state: AtomicUsize::new(0),
511            waiters: Mutex::const_new(LinkedList::new()),
512        }
513    }
514
515    /// Wait for a notification.
516    ///
517    /// Equivalent to:
518    ///
519    /// ```ignore
520    /// async fn notified(&self);
521    /// ```
522    ///
523    /// Each `Notify` value holds a single permit. If a permit is available from
524    /// an earlier call to [`notify_one()`], then `notified().await` will complete
525    /// immediately, consuming that permit. Otherwise, `notified().await` waits
526    /// for a permit to be made available by the next call to `notify_one()`.
527    ///
528    /// The `Notified` future is not guaranteed to receive wakeups from calls to
529    /// `notify_one()` if it has not yet been polled. See the documentation for
530    /// [`Notified::enable()`] for more details.
531    ///
532    /// The `Notified` future is guaranteed to receive wakeups from
533    /// `notify_waiters()` as soon as it has been created, even if it has not
534    /// yet been polled.
535    ///
536    /// [`notify_one()`]: Notify::notify_one
537    /// [`Notified::enable()`]: Notified::enable
538    ///
539    /// # Cancel safety
540    ///
541    /// This method uses a queue to fairly distribute notifications in the order
542    /// they were requested. Cancelling a call to `notified` makes you lose your
543    /// place in the queue.
544    ///
545    /// # Examples
546    ///
547    /// ```
548    /// use tokio::sync::Notify;
549    /// use std::sync::Arc;
550    ///
551    /// # #[tokio::main(flavor = "current_thread")]
552    /// # async fn main() {
553    /// let notify = Arc::new(Notify::new());
554    /// let notify2 = notify.clone();
555    ///
556    /// tokio::spawn(async move {
557    ///     notify2.notified().await;
558    ///     println!("received notification");
559    /// });
560    ///
561    /// println!("sending notification");
562    /// notify.notify_one();
563    /// # }
564    /// ```
565    pub fn notified(&self) -> Notified<'_> {
566        // we load the number of times notify_waiters
567        // was called and store that in the future.
568        let state = self.state.load(SeqCst);
569        Notified {
570            notify: self,
571            state: State::Init,
572            notify_waiters_calls: get_num_notify_waiters_calls(state),
573            waiter: Waiter::new(),
574        }
575    }
576
577    /// Wait for a notification with an owned `Future`.
578    ///
579    /// Unlike [`Self::notified`] which returns a future tied to the `Notify`'s
580    /// lifetime, `notified_owned` creates a self-contained future that owns its
581    /// notification state, making it safe to move between threads.
582    ///
583    /// See [`Self::notified`] for more details.
584    ///
585    /// # Cancel safety
586    ///
587    /// This method uses a queue to fairly distribute notifications in the order
588    /// they were requested. Cancelling a call to `notified_owned` makes you lose your
589    /// place in the queue.
590    ///
591    /// # Examples
592    ///
593    /// ```
594    /// use std::sync::Arc;
595    /// use tokio::sync::Notify;
596    ///
597    /// # #[tokio::main(flavor = "current_thread")]
598    /// # async fn main() {
599    /// let notify = Arc::new(Notify::new());
600    ///
601    /// for _ in 0..10 {
602    ///     let notified = notify.clone().notified_owned();
603    ///     tokio::spawn(async move {
604    ///         notified.await;
605    ///         println!("received notification");
606    ///     });
607    /// }
608    ///
609    /// println!("sending notification");
610    /// notify.notify_waiters();
611    /// # }
612    /// ```
613    pub fn notified_owned(self: Arc<Self>) -> OwnedNotified {
614        // we load the number of times notify_waiters
615        // was called and store that in the future.
616        let state = self.state.load(SeqCst);
617        OwnedNotified {
618            notify: self,
619            state: State::Init,
620            notify_waiters_calls: get_num_notify_waiters_calls(state),
621            waiter: Waiter::new(),
622        }
623    }
624    /// Notifies the first waiting task.
625    ///
626    /// If a task is currently waiting, that task is notified. Otherwise, a
627    /// permit is stored in this `Notify` value and the **next** call to
628    /// [`notified().await`] will complete immediately consuming the permit made
629    /// available by this call to `notify_one()`.
630    ///
631    /// At most one permit may be stored by `Notify`. Many sequential calls to
632    /// `notify_one` will result in a single permit being stored. The next call to
633    /// `notified().await` will complete immediately, but the one after that
634    /// will wait.
635    ///
636    /// [`notified().await`]: Notify::notified()
637    ///
638    /// # Examples
639    ///
640    /// ```
641    /// use tokio::sync::Notify;
642    /// use std::sync::Arc;
643    ///
644    /// # #[tokio::main(flavor = "current_thread")]
645    /// # async fn main() {
646    /// let notify = Arc::new(Notify::new());
647    /// let notify2 = notify.clone();
648    ///
649    /// tokio::spawn(async move {
650    ///     notify2.notified().await;
651    ///     println!("received notification");
652    /// });
653    ///
654    /// println!("sending notification");
655    /// notify.notify_one();
656    /// # }
657    /// ```
    // Alias for old name in 0.x
    #[cfg_attr(docsrs, doc(alias = "notify"))]
    pub fn notify_one(&self) {
        // FIFO strategy: `notify_locked` pops the waiter from the back of
        // the wait list.
        self.notify_with_strategy(NotifyOneStrategy::Fifo);
    }
663
664    /// Notifies the last waiting task.
665    ///
666    /// This function behaves similar to `notify_one`. The only difference is that it wakes
667    /// the most recently added waiter instead of the oldest waiter.
668    ///
669    /// Check the [`notify_one()`] documentation for more info and
670    /// examples.
671    ///
672    /// [`notify_one()`]: Notify::notify_one
    pub fn notify_last(&self) {
        // LIFO strategy: `notify_locked` pops the waiter from the front of
        // the wait list.
        self.notify_with_strategy(NotifyOneStrategy::Lifo);
    }
676
677    fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
678        // Load the current state
679        let mut curr = self.state.load(SeqCst);
680
681        // If the state is `EMPTY`, transition to `NOTIFIED` and return.
682        while let EMPTY | NOTIFIED = get_state(curr) {
683            // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
684            // happens-before synchronization must happen between this atomic
685            // operation and a task calling `notified().await`.
686            let new = set_state(curr, NOTIFIED);
687            let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
688
689            match res {
690                // No waiters, no further work to do
691                Ok(_) => return,
692                Err(actual) => {
693                    curr = actual;
694                }
695            }
696        }
697
698        // There are waiters, the lock must be acquired to notify.
699        let mut waiters = self.waiters.lock();
700
701        // The state must be reloaded while the lock is held. The state may only
702        // transition out of WAITING while the lock is held.
703        curr = self.state.load(SeqCst);
704
705        if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
706            drop(waiters);
707            waker.wake();
708        }
709    }
710
711    /// Notifies all waiting tasks.
712    ///
713    /// If a task is currently waiting, that task is notified. Unlike with
714    /// `notify_one()`, no permit is stored to be used by the next call to
715    /// `notified().await`. The purpose of this method is to notify all
716    /// already registered waiters. Registering for notification is done by
717    /// acquiring an instance of the `Notified` future via calling `notified()`.
718    ///
719    /// # Examples
720    ///
721    /// ```
722    /// use tokio::sync::Notify;
723    /// use std::sync::Arc;
724    ///
725    /// # #[tokio::main(flavor = "current_thread")]
726    /// # async fn main() {
727    /// let notify = Arc::new(Notify::new());
728    /// let notify2 = notify.clone();
729    ///
730    /// let notified1 = notify.notified();
731    /// let notified2 = notify.notified();
732    ///
733    /// let handle = tokio::spawn(async move {
734    ///     println!("sending notifications");
735    ///     notify2.notify_waiters();
736    /// });
737    ///
738    /// notified1.await;
739    /// notified2.await;
740    /// println!("received notifications");
741    /// # }
742    /// ```
    pub fn notify_waiters(&self) {
        // `lock_waiter_list` takes the `waiters` lock and snapshots the state
        // under it; the returned guard then performs the notification.
        self.lock_waiter_list().notify_waiters();
    }
746
    // Wakes every waiter currently in the list and bumps the
    // `notify_waiters` call counter.
    //
    // * `curr` is the state value to act on. NOTE(review): it is expected to
    //   have been loaded while `waiters` was already held (as
    //   `lock_waiter_list` does) — confirm against the caller.
    // * `waiters` is the held lock on the wait list. It is released before
    //   any waker is invoked and re-acquired between batches.
    fn inner_notify_waiters<'a>(
        &'a self,
        curr: usize,
        mut waiters: crate::loom::sync::MutexGuard<'a, LinkedList<Waiter, Waiter>>,
    ) {
        if matches!(get_state(curr), EMPTY | NOTIFIED) {
            // There are no waiting tasks. All we need to do is increment the
            // number of times this method was called.
            atomic_inc_num_notify_waiters_calls(&self.state);
            return;
        }

        // Increment the number of times this method was called
        // and transition to empty.
        let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
        self.state.store(new_state, SeqCst);

        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `waiters` lock.
        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);

        // Wakers are collected in batches: the inner loop fills `wakers` until
        // it cannot accept more, then the batch is woken with the lock released.
        let mut wakers = WakeList::new();
        'outer: loop {
            while wakers.can_push() {
                match list.pop_back_locked(&mut waiters) {
                    Some(waiter) => {
                        // Safety: we never make mutable references to waiters.
                        let waiter = unsafe { waiter.as_ref() };

                        // Safety: we hold the lock, so we can access the waker.
                        if let Some(waker) =
                            unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
                        {
                            wakers.push(waker);
                        }

                        // This waiter is unlinked and will not be shared ever again, release it.
                        waiter.notification.store_release(Notification::All);
                    }
                    None => {
                        // The list is drained; the final batch is woken after the loop.
                        break 'outer;
                    }
                }
            }

            // Release the lock before notifying.
            drop(waiters);

            // One of the wakers may panic, but the remaining waiters will still
            // be unlinked from the list in `NotifyWaitersList` destructor.
            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }
819
820    pub(crate) fn lock_waiter_list(&self) -> NotifyGuard<'_> {
821        let guarded_waiters = self.waiters.lock();
822
823        // The state must be loaded while the lock is held. The state may only
824        // transition out of WAITING while the lock is held.
825        let current_state = self.state.load(SeqCst);
826
827        NotifyGuard {
828            guarded_notify: self,
829            guarded_waiters,
830            current_state,
831        }
832    }
833}
834
835impl Default for Notify {
836    fn default() -> Notify {
837        Notify::new()
838    }
839}
840
// Explicitly mark `Notify` as unwind safe.
// NOTE(review): presumably needed because the internal fields do not get
// these auto traits on their own — confirm.
impl UnwindSafe for Notify {}
impl RefUnwindSafe for Notify {}
843
844fn notify_locked(
845    waiters: &mut WaitList,
846    state: &AtomicUsize,
847    curr: usize,
848    strategy: NotifyOneStrategy,
849) -> Option<Waker> {
850    match get_state(curr) {
851        EMPTY | NOTIFIED => {
852            let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);
853
854            match res {
855                Ok(_) => None,
856                Err(actual) => {
857                    let actual_state = get_state(actual);
858                    assert!(actual_state == EMPTY || actual_state == NOTIFIED);
859                    state.store(set_state(actual, NOTIFIED), SeqCst);
860                    None
861                }
862            }
863        }
864        WAITING => {
865            // At this point, it is guaranteed that the state will not
866            // concurrently change as holding the lock is required to
867            // transition **out** of `WAITING`.
868            //
869            // Get a pending waiter using one of the available dequeue strategies.
870            let waiter = match strategy {
871                NotifyOneStrategy::Fifo => waiters.pop_back().unwrap(),
872                NotifyOneStrategy::Lifo => waiters.pop_front().unwrap(),
873            };
874
875            // Safety: we never make mutable references to waiters.
876            let waiter = unsafe { waiter.as_ref() };
877
878            // Safety: we hold the lock, so we can access the waker.
879            let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
880
881            // This waiter is unlinked and will not be shared ever again, release it.
882            waiter
883                .notification
884                .store_release(Notification::One(strategy));
885
886            if waiters.is_empty() {
887                // As this the **final** waiter in the list, the state
888                // must be transitioned to `EMPTY`. As transitioning
889                // **from** `WAITING` requires the lock to be held, a
890                // `store` is sufficient.
891                state.store(set_state(curr, EMPTY), SeqCst);
892            }
893            waker
894        }
895        _ => unreachable!(),
896    }
897}
898
899// ===== impl Notified =====
900
901impl Notified<'_> {
902    /// Adds this future to the list of futures that are ready to receive
903    /// wakeups from calls to [`notify_one`].
904    ///
905    /// Polling the future also adds it to the list, so this method should only
906    /// be used if you want to add the future to the list before the first call
907    /// to `poll`. (In fact, this method is equivalent to calling `poll` except
908    /// that no `Waker` is registered.)
909    ///
910    /// This has no effect on notifications sent using [`notify_waiters`], which
911    /// are received as long as they happen after the creation of the `Notified`
912    /// regardless of whether `enable` or `poll` has been called.
913    ///
914    /// This method returns true if the `Notified` is ready. This happens in the
915    /// following situations:
916    ///
917    ///  1. The `notify_waiters` method was called between the creation of the
918    ///     `Notified` and the call to this method.
919    ///  2. This is the first call to `enable` or `poll` on this future, and the
920    ///     `Notify` was holding a permit from a previous call to `notify_one`.
921    ///     The call consumes the permit in that case.
922    ///  3. The future has previously been enabled or polled, and it has since
923    ///     then been marked ready by either consuming a permit from the
924    ///     `Notify`, or by a call to `notify_one` or `notify_waiters` that
925    ///     removed it from the list of futures ready to receive wakeups.
926    ///
927    /// If this method returns true, any future calls to poll on the same future
928    /// will immediately return `Poll::Ready`.
929    ///
930    /// # Examples
931    ///
932    /// Unbound multi-producer multi-consumer (mpmc) channel.
933    ///
934    /// The call to `enable` is important because otherwise if you have two
935    /// calls to `recv` and two calls to `send` in parallel, the following could
936    /// happen:
937    ///
938    ///  1. Both calls to `try_recv` return `None`.
939    ///  2. Both new elements are added to the vector.
940    ///  3. The `notify_one` method is called twice, adding only a single
941    ///     permit to the `Notify`.
942    ///  4. Both calls to `recv` reach the `Notified` future. One of them
943    ///     consumes the permit, and the other sleeps forever.
944    ///
945    /// By adding the `Notified` futures to the list by calling `enable` before
946    /// `try_recv`, the `notify_one` calls in step three would remove the
947    /// futures from the list and mark them notified instead of adding a permit
948    /// to the `Notify`. This ensures that both futures are woken.
949    ///
950    /// ```
951    /// use tokio::sync::Notify;
952    ///
953    /// use std::collections::VecDeque;
954    /// use std::sync::Mutex;
955    ///
956    /// struct Channel<T> {
957    ///     messages: Mutex<VecDeque<T>>,
958    ///     notify_on_sent: Notify,
959    /// }
960    ///
961    /// impl<T> Channel<T> {
962    ///     pub fn send(&self, msg: T) {
963    ///         let mut locked_queue = self.messages.lock().unwrap();
964    ///         locked_queue.push_back(msg);
965    ///         drop(locked_queue);
966    ///
967    ///         // Send a notification to one of the calls currently
968    ///         // waiting in a call to `recv`.
969    ///         self.notify_on_sent.notify_one();
970    ///     }
971    ///
972    ///     pub fn try_recv(&self) -> Option<T> {
973    ///         let mut locked_queue = self.messages.lock().unwrap();
974    ///         locked_queue.pop_front()
975    ///     }
976    ///
977    ///     pub async fn recv(&self) -> T {
978    ///         let future = self.notify_on_sent.notified();
979    ///         tokio::pin!(future);
980    ///
981    ///         loop {
982    ///             // Make sure that no wakeup is lost if we get
983    ///             // `None` from `try_recv`.
984    ///             future.as_mut().enable();
985    ///
986    ///             if let Some(msg) = self.try_recv() {
987    ///                 return msg;
988    ///             }
989    ///
990    ///             // Wait for a call to `notify_one`.
991    ///             //
992    ///             // This uses `.as_mut()` to avoid consuming the future,
993    ///             // which lets us call `Pin::set` below.
994    ///             future.as_mut().await;
995    ///
996    ///             // Reset the future in case another call to
997    ///             // `try_recv` got the message before us.
998    ///             future.set(self.notify_on_sent.notified());
999    ///         }
1000    ///     }
1001    /// }
1002    /// ```
1003    ///
1004    /// [`notify_one`]: Notify::notify_one()
1005    /// [`notify_waiters`]: Notify::notify_waiters()
1006    pub fn enable(self: Pin<&mut Self>) -> bool {
1007        self.poll_notified(None).is_ready()
1008    }
1009
1010    fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
1011        unsafe {
1012            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
1013
1014            is_unpin::<&Notify>();
1015            is_unpin::<State>();
1016            is_unpin::<usize>();
1017
1018            let me = self.get_unchecked_mut();
1019            NotifiedProject {
1020                notify: me.notify,
1021                state: &mut me.state,
1022                notify_waiters_calls: &me.notify_waiters_calls,
1023                waiter: &me.waiter,
1024            }
1025        }
1026    }
1027
1028    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1029        self.project().poll_notified(waker)
1030    }
1031}
1032
1033impl Future for Notified<'_> {
1034    type Output = ();
1035
1036    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1037        self.poll_notified(Some(cx.waker()))
1038    }
1039}
1040
1041impl Drop for Notified<'_> {
1042    fn drop(&mut self) {
1043        // Safety: The type only transitions to a "Waiting" state when pinned.
1044        unsafe { Pin::new_unchecked(self) }
1045            .project()
1046            .drop_notified();
1047    }
1048}
1049
1050// ===== impl OwnedNotified =====
1051
1052impl OwnedNotified {
1053    /// Adds this future to the list of futures that are ready to receive
1054    /// wakeups from calls to [`notify_one`].
1055    ///
1056    /// See [`Notified::enable`] for more details.
1057    ///
1058    /// [`notify_one`]: Notify::notify_one()
1059    pub fn enable(self: Pin<&mut Self>) -> bool {
1060        self.poll_notified(None).is_ready()
1061    }
1062
1063    /// A custom `project` implementation is used in place of `pin-project-lite`
1064    /// as a custom drop implementation is needed.
1065    fn project(self: Pin<&mut Self>) -> NotifiedProject<'_> {
1066        unsafe {
1067            // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
1068
1069            is_unpin::<&Notify>();
1070            is_unpin::<State>();
1071            is_unpin::<usize>();
1072
1073            let me = self.get_unchecked_mut();
1074            NotifiedProject {
1075                notify: &me.notify,
1076                state: &mut me.state,
1077                notify_waiters_calls: &me.notify_waiters_calls,
1078                waiter: &me.waiter,
1079            }
1080        }
1081    }
1082
1083    fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
1084        self.project().poll_notified(waker)
1085    }
1086}
1087
1088impl Future for OwnedNotified {
1089    type Output = ();
1090
1091    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
1092        self.poll_notified(Some(cx.waker()))
1093    }
1094}
1095
1096impl Drop for OwnedNotified {
1097    fn drop(&mut self) {
1098        // Safety: The type only transitions to a "Waiting" state when pinned.
1099        unsafe { Pin::new_unchecked(self) }
1100            .project()
1101            .drop_notified();
1102    }
1103}
1104
1105// ===== impl NotifiedProject =====
1106
// Shared implementation behind `Notified` and `OwnedNotified`: both types
// project into a `NotifiedProject` and delegate polling and drop handling to
// the methods below.
1107impl NotifiedProject<'_> {
    /// Drives the per-future state machine (`Init` -> `Waiting` -> `Done`).
    ///
    /// `waker` is `None` when called through `enable`, in which case no waker
    /// is stored or replaced; `Future::poll` passes `Some(cx.waker())`.
    ///
    /// Returns `Poll::Ready(())` once a pending permit was consumed, the
    /// waiter was notified, or a `notify_waiters` call was observed since the
    /// future was created. Otherwise the waiter is (or stays) queued and
    /// `Poll::Pending` is returned.
1108    fn poll_notified(self, waker: Option<&Waker>) -> Poll<()> {
1109        let NotifiedProject {
1110            notify,
1111            state,
1112            notify_waiters_calls,
1113            waiter,
1114        } = self;
1115
        // Arms that set `*state = State::Done` without returning loop around
        // once more so that the `State::Done` arm produces the result.
1116        'outer_loop: loop {
1117            match *state {
                // The waiter has not been queued yet.
1118                State::Init => {
1119                    let curr = notify.state.load(SeqCst);
1120
1121                    // Optimistically try acquiring a pending notification
1122                    let res = notify.state.compare_exchange(
1123                        set_state(curr, NOTIFIED),
1124                        set_state(curr, EMPTY),
1125                        SeqCst,
1126                        SeqCst,
1127                    );
1128
1129                    if res.is_ok() {
1130                        // Acquired the notification
1131                        *state = State::Done;
1132                        continue 'outer_loop;
1133                    }
1134
1135                    // Clone the waker before locking: cloning a waker can
1136                    // run arbitrary code.
1137                    let waker = waker.cloned();
1138
1139                    // Acquire the lock and attempt to transition to the waiting
1140                    // state.
1141                    let mut waiters = notify.waiters.lock();
1142
1143                    // Reload the state with the lock held
1144                    let mut curr = notify.state.load(SeqCst);
1145
1146                    // If `notify_waiters` has been called after the future
1147                    // was created, then we are done.
1148                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1149                        *state = State::Done;
                        // The lock guard is released when this arm's scope is
                        // left by the `continue`.
1150                        continue 'outer_loop;
1151                    }
1152
1153                    // Transition the state to WAITING.
1154                    loop {
1155                        match get_state(curr) {
1156                            EMPTY => {
1157                                // Transition to WAITING
1158                                let res = notify.state.compare_exchange(
1159                                    set_state(curr, EMPTY),
1160                                    set_state(curr, WAITING),
1161                                    SeqCst,
1162                                    SeqCst,
1163                                );
1164
1165                                if let Err(actual) = res {
                                    // Retry with the freshly observed value;
                                    // the `NOTIFIED` arm handles it next.
1166                                    assert_eq!(get_state(actual), NOTIFIED);
1167                                    curr = actual;
1168                                } else {
1169                                    break;
1170                                }
1171                            }
                            // Already `WAITING`: no state change is needed
                            // before queueing this waiter.
1172                            WAITING => break,
1173                            NOTIFIED => {
1174                                // Try consuming the notification
1175                                let res = notify.state.compare_exchange(
1176                                    set_state(curr, NOTIFIED),
1177                                    set_state(curr, EMPTY),
1178                                    SeqCst,
1179                                    SeqCst,
1180                                );
1181
1182                                match res {
1183                                    Ok(_) => {
1184                                        // Acquired the notification
1185                                        *state = State::Done;
1186                                        continue 'outer_loop;
1187                                    }
1188                                    Err(actual) => {
1189                                        assert_eq!(get_state(actual), EMPTY);
1190                                        curr = actual;
1191                                    }
1192                                }
1193                            }
1194                            _ => unreachable!(),
1195                        }
1196                    }
1197
                    // Store the waker only if one was supplied; `enable`
                    // passes `None` and leaves the slot untouched.
1198                    let mut old_waker = None;
1199                    if waker.is_some() {
1200                        // Safety: called while locked.
1201                        //
1202                        // The use of `old_waker` here is not necessary, as the field is always
1203                        // None when we reach this line.
1204                        unsafe {
1205                            old_waker =
1206                                waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
1207                        }
1208                    }
1209
1210                    // Insert the waiter into the linked list
1211                    waiters.push_front(NonNull::from(waiter));
1212
1213                    *state = State::Waiting;
1214
                    // Release the lock first, then drop the old waker.
1215                    drop(waiters);
1216                    drop(old_waker);
1217
1218                    return Poll::Pending;
1219                }
                // The waiter was queued by an earlier call.
1220                State::Waiting => {
1221                    #[cfg(feature = "taskdump")]
1222                    if let Some(waker) = waker {
1223                        let mut ctx = Context::from_waker(waker);
1224                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1225                    }
1226
                    // Fast path: check for a delivered notification without
                    // taking the lock.
1227                    if waiter.notification.load(Acquire).is_some() {
1228                        // Safety: waiter is already unlinked and will not be shared again,
1229                        // so we have an exclusive access to `waker`.
1230                        drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
1231
1232                        waiter.notification.clear();
1233                        *state = State::Done;
1234                        return Poll::Ready(());
1235                    }
1236
1237                    // Our waiter was not notified, implying it is still stored in a waiter
1238                    // list (guarded by `notify.waiters`). In order to access the waker
1239                    // fields, we must acquire the lock.
1240
1241                    let mut old_waker = None;
1242                    let mut waiters = notify.waiters.lock();
1243
1244                    // We hold the lock and notifications are set only with the lock held,
1245                    // so this can be relaxed, because the happens-before relationship is
1246                    // established through the mutex.
1247                    if waiter.notification.load(Relaxed).is_some() {
1248                        // Safety: waiter is already unlinked and will not be shared again,
1249                        // so we have an exclusive access to `waker`.
1250                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1251
1252                        waiter.notification.clear();
1253
1254                        // Drop the old waker after releasing the lock.
1255                        drop(waiters);
1256                        drop(old_waker);
1257
1258                        *state = State::Done;
1259                        return Poll::Ready(());
1260                    }
1261
1262                    // Load the state with the lock held.
1263                    let curr = notify.state.load(SeqCst);
1264
1265                    if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
1266                        // Before we add a waiter to the list we check if these numbers are
1267                        // different while holding the lock. If these numbers are different now,
1268                        // it means that there is a call to `notify_waiters` in progress and this
1269                        // waiter must be contained by a guarded list used in `notify_waiters`.
1270                        // We can treat the waiter as notified and remove it from the list, as
1271                        // it would have been notified in the `notify_waiters` call anyways.
1272
1273                        // Safety: we hold the lock, so we can modify the waker.
1274                        old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
1275
1276                        // Safety: we hold the lock, so we have an exclusive access to the list.
1277                        // The list is used in `notify_waiters`, so it must be guarded.
1278                        unsafe { waiters.remove(NonNull::from(waiter)) };
1279
                        // No `return` here: control falls through to the drops
                        // below and the next loop iteration takes the
                        // `State::Done` arm.
1280                        *state = State::Done;
1281                    } else {
1282                        // Safety: we hold the lock, so we can modify the waker.
1283                        unsafe {
1284                            waiter.waker.with_mut(|v| {
                                // Replace the stored waker only when the
                                // caller's waker would wake a different task
                                // (or none is stored yet).
1285                                if let Some(waker) = waker {
1286                                    let should_update = match &*v {
1287                                        Some(current_waker) => !current_waker.will_wake(waker),
1288                                        None => true,
1289                                    };
1290                                    if should_update {
1291                                        old_waker = (*v).replace(waker.clone());
1292                                    }
1293                                }
1294                            });
1295                        }
1296
1297                        // Drop the old waker after releasing the lock.
1298                        drop(waiters);
1299                        drop(old_waker);
1300
1301                        return Poll::Pending;
1302                    }
1303
1304                    // Explicit drop of the lock to indicate the scope that the
1305                    // lock is held. Because holding the lock is required to
1306                    // ensure safe access to fields not held within the lock, it
1307                    // is helpful to visualize the scope of the critical
1308                    // section.
1309                    drop(waiters);
1310
1311                    // Drop the old waker after releasing the lock.
1312                    drop(old_waker);
1313                }
                // Terminal state: every further poll reports readiness.
1314                State::Done => {
1315                    #[cfg(feature = "taskdump")]
1316                    if let Some(waker) = waker {
1317                        let mut ctx = Context::from_waker(waker);
1318                        std::task::ready!(crate::trace::trace_leaf(&mut ctx));
1319                    }
1320                    return Poll::Ready(());
1321                }
1322            }
1323        }
1324    }
1325
    /// Cleanup run when the owning future is dropped.
    ///
    /// Does nothing unless the future is in `State::Waiting`. In that state it
    /// takes the waiter-list lock, unlinks the waiter, resets the `Notify`
    /// state to `EMPTY` if the list is now empty while the state is `WAITING`,
    /// and, if a `notify_one` notification had been delivered to this waiter
    /// but never consumed, forwards it via `notify_locked`.
1326    fn drop_notified(self) {
1327        let NotifiedProject {
1328            notify,
1329            state,
1330            waiter,
1331            ..
1332        } = self;
1333
1334        // This is where we ensure safety. The `Notified` value is being
1335        // dropped, which means we must ensure that the waiter entry is no
1336        // longer stored in the linked list.
1337        if matches!(*state, State::Waiting) {
1338            let mut waiters = notify.waiters.lock();
1339            let mut notify_state = notify.state.load(SeqCst);
1340
1341            // We hold the lock, so this field is not concurrently accessed by
1342            // `notify_*` functions and we can use the relaxed ordering.
1343            let notification = waiter.notification.load(Relaxed);
1344
1345            // remove the entry from the list (if not already removed)
1346            //
1347            // Safety: we hold the lock, so we have an exclusive access to every list the
1348            // waiter may be contained in. If the node is not contained in the `waiters`
1349            // list, then it is contained by a guarded list used by `notify_waiters`.
1350            unsafe { waiters.remove(NonNull::from(waiter)) };
1351
            // Keep the local snapshot in sync with the stored value, because
            // it is handed to `notify_locked` below.
1352            if waiters.is_empty() && get_state(notify_state) == WAITING {
1353                notify_state = set_state(notify_state, EMPTY);
1354                notify.state.store(notify_state, SeqCst);
1355            }
1356
1357            // See if the node was notified but not received. In this case, if
1358            // the notification was triggered via `notify_one`, it must be sent
1359            // to the next waiter.
1360            if let Some(Notification::One(strategy)) = notification {
1361                if let Some(waker) =
1362                    notify_locked(&mut waiters, &notify.state, notify_state, strategy)
1363                {
                    // Release the lock before waking the next waiter.
1364                    drop(waiters);
1365                    waker.wake();
1366                }
1367            }
1368        }
1369    }
1370}
1371
1372/// # Safety
1373///
1374/// `Waiter` is forced to be !Unpin.
1375unsafe impl linked_list::Link for Waiter {
1376    type Handle = NonNull<Waiter>;
1377    type Target = Waiter;
1378
1379    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1380        *handle
1381    }
1382
1383    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1384        ptr
1385    }
1386
1387    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1388        Waiter::addr_of_pointers(target)
1389    }
1390}
1391
/// Compile-time assertion that `T` is `Unpin`; calling it does nothing at
/// runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1393
1394/// A guard that provides exclusive access to a `Notify`'s internal
1395/// waiters list.
1396///
1397/// While this guard is held, the `Notify` instance's waiter list is locked.
1398pub(crate) struct NotifyGuard<'a> {
    /// The `Notify` this guard belongs to.
1399    guarded_notify: &'a Notify,
    /// The held mutex guard over the waiter list.
1400    guarded_waiters: crate::loom::sync::MutexGuard<'a, WaitList>,
    /// State value handed to `inner_notify_waiters`.
    // NOTE(review): presumably the value of the `Notify` state word observed
    // when the guard was created — confirm at the construction site.
1401    current_state: usize,
1402}
1403
1404impl NotifyGuard<'_> {
1405    pub(crate) fn notify_waiters(self) {
1406        self.guarded_notify
1407            .inner_notify_waiters(self.current_state, self.guarded_waiters);
1408    }
1409}