tokio/sync/watch.rs
1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A multi-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value.
14//!
15//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16//!
17//! To access the **current** value stored in the channel and mark it as *seen*
18//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19//!
20//! To access the current value **without** marking it as *seen*, use
21//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23//!
24//! For more information on when to use these methods, see
25//! [here](#borrow_and_update-versus-borrow).
26//!
27//! ## Change notifications
28//!
29//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31//!
32//! * [`Receiver::changed()`] returns:
33//! * `Ok(())` on receiving a new value.
34//! * `Err(`[`RecvError`](error::RecvError)`)` if the
35//! channel has been closed __AND__ the current value is *seen*.
36//! * If the current value is *unseen* when calling [`changed`], then
37//! [`changed`] will return immediately. If the current value is *seen*, then
38//! it will sleep until either a new message is sent via the [`Sender`] half,
39//! or the [`Sender`] is dropped.
40//! * On completion, the [`changed`] method marks the new value as *seen*.
41//! * At creation, the initial value is considered *seen*. In other words,
42//! [`Receiver::changed()`] will not return until a subsequent value is sent.
43//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
44//! The current value at the time the [`Receiver`] is created is considered
45//! *seen*.
46//!
47//! ## [`changed`] versus [`has_changed`]
48//!
49//! The [`Receiver`] half provides two methods for checking for changes
50//! in the channel, [`has_changed`] and [`changed`].
51//!
52//! * [`has_changed`] is a *synchronous* method that checks whether the current
53//! value is seen or not and returns a boolean. This method does __not__ mark the
54//! value as seen.
55//!
56//! * [`changed`] is an *asynchronous* method that will return once an unseen
57//! value is in the channel. This method does mark the value as seen.
58//!
59//! Note there are two behavioral differences on when these two methods return
60//! an error.
61//!
62//! - [`has_changed`] errors if and only if the channel is closed.
63//! - [`changed`] errors if the channel has been closed __AND__
64//! the current value is seen.
65//!
66//! See the example below that shows how these methods have different fallibility.
67//!
68//! ## [`borrow_and_update`] versus [`borrow`]
69//!
70//! If the receiver intends to await notifications from [`changed`] in a loop,
71//! [`Receiver::borrow_and_update()`] should be preferred over
72//! [`Receiver::borrow()`]. This avoids a potential race where a new value is
73//! sent between [`changed`] being ready and the value being read. (If
74//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
75//!
76//! If the receiver is only interested in the current value, and does not intend
77//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
78//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
79//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
80//! self`.
81//!
82//! # Examples
83//!
84//! The following example prints `hello! world! `.
85//!
86//! ```
87//! use tokio::sync::watch;
88//! use tokio::time::{Duration, sleep};
89//!
90//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
91//! let (tx, mut rx) = watch::channel("hello");
92//!
93//! tokio::spawn(async move {
94//! // Use the equivalent of a "do-while" loop so the initial value is
95//! // processed before awaiting the `changed()` future.
96//! loop {
97//! println!("{}! ", *rx.borrow_and_update());
98//! if rx.changed().await.is_err() {
99//! break;
100//! }
101//! }
102//! });
103//!
104//! sleep(Duration::from_millis(100)).await;
105//! tx.send("world")?;
106//! # Ok(())
107//! # }
108//! ```
109//!
110//! Difference on fallibility of [`changed`] versus [`has_changed`].
111//! ```
112//! use tokio::sync::watch;
113//!
114//! #[tokio::main(flavor = "current_thread")]
115//! # async fn main() {
116//! let (tx, mut rx) = watch::channel("hello");
117//! tx.send("goodbye").unwrap();
118//! drop(tx);
119//!
120//! // `has_changed` does not mark the value as seen and errors
121//! // since the channel is closed.
122//! assert!(rx.has_changed().is_err());
123//!
124//! // `changed` returns Ok since the value is not already marked as seen
125//! // even if the channel is closed.
126//! assert!(rx.changed().await.is_ok());
127//!
128//! // The `changed` call above marks the value as seen.
129//! // The next `changed` call now returns an error as the channel is closed
130//! // AND the current value is seen.
131//! assert!(rx.changed().await.is_err());
132//! # }
133//! ```
134//!
135//! # Closing
136//!
137//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
138//! when all [`Receiver`] handles have been dropped. This indicates that there
139//! is no further interest in the values being produced and work can be stopped.
140//!
141//! The value in the channel will not be dropped until all senders and all
142//! receivers have been dropped.
143//!
144//! # Thread safety
145//!
146//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
147//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
148//! handles may be moved to separate threads and also used concurrently.
149//!
150//! [`Sender`]: crate::sync::watch::Sender
151//! [`Receiver`]: crate::sync::watch::Receiver
152//! [`changed`]: crate::sync::watch::Receiver::changed
153//! [`has_changed`]: crate::sync::watch::Receiver::has_changed
154//! [`borrow`]: crate::sync::watch::Receiver::borrow
155//! [`borrow_and_update`]: crate::sync::watch::Receiver::borrow_and_update
156//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
157//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
158//! [`Receiver::borrow_and_update()`]:
159//! crate::sync::watch::Receiver::borrow_and_update
160//! [`channel`]: crate::sync::watch::channel
161//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
162//! [`Sender::closed`]: crate::sync::watch::Sender::closed
163//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
164
165use crate::sync::notify::Notify;
166use crate::task::coop::cooperative;
167
168use crate::loom::sync::atomic::AtomicUsize;
169use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
170use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
171use std::fmt;
172use std::mem;
173use std::ops;
174use std::panic;
175
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// Each receiver stores the version it last observed; that stored version is
/// compared against the shared channel state to decide whether the current
/// value counts as *seen*.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
#[derive(Debug)]
pub struct Receiver<T> {
    /// Pointer to the state shared with the other handles of this channel.
    shared: Arc<Shared<T>>,

    /// Last version observed by this receiver. A mismatch with the version in
    /// the shared state means the current value is unseen.
    version: Version,
}
192
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function.
#[derive(Debug)]
pub struct Sender<T> {
    /// Pointer to the state shared with the other handles of this channel.
    shared: Arc<Shared<T>>,
}
200
201impl<T> Clone for Sender<T> {
202 fn clone(&self) -> Self {
203 self.shared.ref_count_tx.fetch_add(1, Relaxed);
204
205 Self {
206 shared: self.shared.clone(),
207 }
208 }
209}
210
211impl<T: Default> Default for Sender<T> {
212 fn default() -> Self {
213 Self::new(T::default())
214 }
215}
216
217/// Returns a reference to the inner value.
218///
219/// Outstanding borrows hold a read lock on the inner value. This means that
220/// long-lived borrows could cause the producer half to block. It is recommended
221/// to keep the borrow as short-lived as possible. Additionally, if you are
222/// running in an environment that allows `!Send` futures, you must ensure that
223/// the returned `Ref` type is never held alive across an `.await` point,
224/// otherwise, it can lead to a deadlock.
225///
226/// The priority policy of the lock is dependent on the underlying lock
227/// implementation, and this type does not guarantee that any particular policy
228/// will be used. In particular, a producer which is waiting to acquire the lock
229/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
230///
231/// <details><summary>Potential deadlock example</summary>
232///
233/// ```text
234/// // Task 1 (on thread A) | // Task 2 (on thread B)
235/// let _ref1 = rx.borrow(); |
236/// | // will block
237/// | let _ = tx.send(());
238/// // may deadlock |
239/// let _ref2 = rx.borrow(); |
240/// ```
241/// </details>
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard over the channel's value; the read lock stays held for as
    /// long as this `Ref` is alive.
    inner: RwLockReadGuard<'a, T>,
    /// Whether the borrowed value was considered changed (unseen) by the
    /// borrowing handle, captured when the borrow was taken.
    has_changed: bool,
}
247
248impl<'a, T> Ref<'a, T> {
249 /// Indicates if the borrowed value is considered as _changed_ since the last
250 /// time it has been marked as seen.
251 ///
252 /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
253 ///
254 /// When borrowed from the [`Sender`] this function will always return `false`.
255 ///
256 /// # Examples
257 ///
258 /// ```
259 /// use tokio::sync::watch;
260 ///
261 /// # #[tokio::main(flavor = "current_thread")]
262 /// # async fn main() {
263 /// let (tx, mut rx) = watch::channel("hello");
264 ///
265 /// tx.send("goodbye").unwrap();
    /// // The sender never considers the value as changed.
267 /// assert!(!tx.borrow().has_changed());
268 ///
269 /// // Drop the sender immediately, just for testing purposes.
270 /// drop(tx);
271 ///
272 /// // Even if the sender has already been dropped...
273 /// assert!(rx.has_changed().is_err());
274 /// // ...the modified value is still readable and detected as changed.
275 /// assert_eq!(*rx.borrow(), "goodbye");
276 /// assert!(rx.borrow().has_changed());
277 ///
278 /// // Read the changed value and mark it as seen.
279 /// {
280 /// let received = rx.borrow_and_update();
281 /// assert_eq!(*received, "goodbye");
282 /// assert!(received.has_changed());
283 /// // Release the read lock when leaving this scope.
284 /// }
285 ///
286 /// // Now the value has already been marked as seen and could
287 /// // never be modified again (after the sender has been dropped).
288 /// assert!(!rx.borrow().has_changed());
289 /// # }
290 /// ```
291 pub fn has_changed(&self) -> bool {
292 self.has_changed
293 }
294}
295
/// Channel state referenced by every `Sender` and `Receiver` handle of one
/// watch channel.
struct Shared<T> {
    /// The most recent value.
    value: RwLock<T>,

    /// The current version.
    ///
    /// The lowest bit represents a "closed" state. The rest of the bits
    /// represent the current version.
    state: AtomicState,

    /// Tracks the number of `Receiver` instances.
    ///
    /// Starts at 1 in `channel` and is incremented whenever a receiver is
    /// built through `Receiver::from_shared`.
    ref_count_rx: AtomicUsize,

    /// Tracks the number of `Sender` instances.
    ///
    /// Starts at 1 in `channel` and is incremented when a `Sender` is cloned.
    ref_count_tx: AtomicUsize,

    /// Notifies waiting receivers that the value changed.
    notify_rx: big_notify::BigNotify,

    /// Notifies any task listening for `Receiver` dropped events.
    notify_tx: Notify,
}
318
319impl<T: fmt::Debug> fmt::Debug for Shared<T> {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 let state = self.state.load();
322 f.debug_struct("Shared")
323 .field("value", &self.value)
324 .field("version", &state.version())
325 .field("is_closed", &state.is_closed())
326 .field("ref_count_rx", &self.ref_count_rx)
327 .finish()
328 }
329}
330
pub mod error {
    //! Watch error types.

    use std::error::Error;
    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// Wraps a value of type `T`. The `Debug` output never prints that value,
    /// so `T` does not have to implement `Debug`.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct SendError<T>(pub T);

    /// Error produced when receiving a change notification.
    #[derive(Debug, Clone)]
    pub struct RecvError(pub(super) ());

    // ===== impl SendError =====

    impl<T> fmt::Debug for SendError<T> {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The payload is intentionally omitted from the output.
            let mut builder = out.debug_struct("SendError");
            builder.finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.write_str("channel closed")
        }
    }

    impl<T> Error for SendError<T> {}

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            out.write_str("channel closed")
        }
    }

    impl Error for RecvError {}
}
371
372mod big_notify {
373 use super::Notify;
374 use crate::sync::notify::Notified;
375
376 // To avoid contention on the lock inside the `Notify`, we store multiple
377 // copies of it. Then, we use either circular access or randomness to spread
378 // out threads over different `Notify` objects.
379 //
380 // Some simple benchmarks show that randomness performs slightly better than
381 // circular access (probably due to contention on `next`), so we prefer to
382 // use randomness when Tokio is compiled with a random number generator.
383 //
384 // When the random number generator is not available, we fall back to
385 // circular access.
386
387 pub(super) struct BigNotify {
388 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
389 next: std::sync::atomic::AtomicUsize,
390 inner: [Notify; 8],
391 }
392
393 impl BigNotify {
394 pub(super) fn new() -> Self {
395 Self {
396 #[cfg(not(all(
397 not(loom),
398 feature = "sync",
399 any(feature = "rt", feature = "macros")
400 )))]
401 next: std::sync::atomic::AtomicUsize::new(0),
402 inner: Default::default(),
403 }
404 }
405
406 pub(super) fn notify_waiters(&self) {
407 for notify in &self.inner {
408 notify.notify_waiters();
409 }
410 }
411
412 /// This function implements the case where randomness is not available.
413 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
414 pub(super) fn notified(&self) -> Notified<'_> {
415 let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
416 self.inner[i].notified()
417 }
418
419 /// This function implements the case where randomness is available.
420 #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
421 pub(super) fn notified(&self) -> Notified<'_> {
422 let i = crate::runtime::context::thread_rng_n(8) as usize;
423 self.inner[i].notified()
424 }
425 }
426}
427
428use self::state::{AtomicState, Version};
429mod state {
430 use crate::loom::sync::atomic::AtomicUsize;
431 use crate::loom::sync::atomic::Ordering;
432
433 const CLOSED_BIT: usize = 1;
434
435 // Using 2 as the step size preserves the `CLOSED_BIT`.
436 const STEP_SIZE: usize = 2;
437
438 /// The version part of the state. The lowest bit is always zero.
439 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
440 pub(super) struct Version(usize);
441
442 /// Snapshot of the state. The first bit is used as the CLOSED bit.
443 /// The remaining bits are used as the version.
444 ///
445 /// The CLOSED bit tracks whether all senders have been dropped. Dropping all
446 /// receivers does not set it.
447 #[derive(Copy, Clone, Debug)]
448 pub(super) struct StateSnapshot(usize);
449
450 /// The state stored in an atomic integer.
451 ///
452 /// The `Sender` uses `Release` ordering for storing a new state
453 /// and the `Receiver`s use `Acquire` ordering for loading the
454 /// current state. This ensures that written values are seen by
455 /// the `Receiver`s for a proper handover.
456 #[derive(Debug)]
457 pub(super) struct AtomicState(AtomicUsize);
458
459 impl Version {
460 /// Decrements the version.
461 pub(super) fn decrement(&mut self) {
462 // Using a wrapping decrement here is required to ensure that the
463 // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
464 // which wraps on overflow.
465 self.0 = self.0.wrapping_sub(STEP_SIZE);
466 }
467
468 pub(super) const INITIAL: Self = Version(0);
469 }
470
471 impl StateSnapshot {
472 /// Extract the version from the state.
473 pub(super) fn version(self) -> Version {
474 Version(self.0 & !CLOSED_BIT)
475 }
476
477 /// Is the closed bit set?
478 pub(super) fn is_closed(self) -> bool {
479 (self.0 & CLOSED_BIT) == CLOSED_BIT
480 }
481 }
482
483 impl AtomicState {
484 /// Create a new `AtomicState` that is not closed and which has the
485 /// version set to `Version::INITIAL`.
486 pub(super) fn new() -> Self {
487 AtomicState(AtomicUsize::new(Version::INITIAL.0))
488 }
489
490 /// Load the current value of the state.
491 ///
492 /// Only used by the receiver and for debugging purposes.
493 ///
494 /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
495 /// of the shared value with the sender side (single writer). The state is always
496 /// updated after modifying and before releasing the (exclusive) lock on the
497 /// shared value.
498 pub(super) fn load(&self) -> StateSnapshot {
499 StateSnapshot(self.0.load(Ordering::Acquire))
500 }
501
502 /// Increment the version counter.
503 pub(super) fn increment_version_while_locked(&self) {
504 // Use `Release` ordering to ensure that the shared value
505 // has been written before updating the version. The shared
506 // value is still protected by an exclusive lock during this
507 // method.
508 self.0.fetch_add(STEP_SIZE, Ordering::Release);
509 }
510
511 /// Set the closed bit in the state.
512 pub(super) fn set_closed(&self) {
513 self.0.fetch_or(CLOSED_BIT, Ordering::Release);
514 }
515 }
516}
517
518/// Creates a new watch channel, returning the "send" and "receive" handles.
519///
520/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
521/// Only the last value sent is made available to the [`Receiver`] half. All
522/// intermediate values are dropped.
523///
524/// # Examples
525///
526/// The following example prints `hello! world! `.
527///
528/// ```
529/// use tokio::sync::watch;
530/// use tokio::time::{Duration, sleep};
531///
532/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
533/// let (tx, mut rx) = watch::channel("hello");
534///
535/// tokio::spawn(async move {
536/// // Use the equivalent of a "do-while" loop so the initial value is
537/// // processed before awaiting the `changed()` future.
538/// loop {
539/// println!("{}! ", *rx.borrow_and_update());
540/// if rx.changed().await.is_err() {
541/// break;
542/// }
543/// }
544/// });
545///
546/// sleep(Duration::from_millis(100)).await;
547/// tx.send("world")?;
548/// # Ok(())
549/// # }
550/// ```
551///
552/// [`Sender`]: struct@Sender
553/// [`Receiver`]: struct@Receiver
554pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
555 let shared = Arc::new(Shared {
556 value: RwLock::new(init),
557 state: AtomicState::new(),
558 ref_count_rx: AtomicUsize::new(1),
559 ref_count_tx: AtomicUsize::new(1),
560 notify_rx: big_notify::BigNotify::new(),
561 notify_tx: Notify::new(),
562 });
563
564 let tx = Sender {
565 shared: shared.clone(),
566 };
567
568 let rx = Receiver {
569 shared,
570 version: Version::INITIAL,
571 };
572
573 (tx, rx)
574}
575
576impl<T> Receiver<T> {
577 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
578 // No synchronization necessary as this is only used as a counter and
579 // not memory access.
580 shared.ref_count_rx.fetch_add(1, Relaxed);
581
582 Self { shared, version }
583 }
584
585 /// Returns a reference to the most recently sent value.
586 ///
587 /// This method does not mark the returned value as seen, so future calls to
588 /// [`changed`] may return immediately even if you have already seen the
589 /// value with a call to `borrow`.
590 ///
591 /// Outstanding borrows hold a read lock on the inner value. This means that
592 /// long-lived borrows could cause the producer half to block. It is recommended
593 /// to keep the borrow as short-lived as possible. Additionally, if you are
594 /// running in an environment that allows `!Send` futures, you must ensure that
595 /// the returned `Ref` type is never held alive across an `.await` point,
596 /// otherwise, it can lead to a deadlock.
597 ///
598 /// The priority policy of the lock is dependent on the underlying lock
599 /// implementation, and this type does not guarantee that any particular policy
600 /// will be used. In particular, a producer which is waiting to acquire the lock
601 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
602 ///
603 /// <details><summary>Potential deadlock example</summary>
604 ///
605 /// ```text
606 /// // Task 1 (on thread A) | // Task 2 (on thread B)
607 /// let _ref1 = rx.borrow(); |
608 /// | // will block
609 /// | let _ = tx.send(());
610 /// // may deadlock |
611 /// let _ref2 = rx.borrow(); |
612 /// ```
613 /// </details>
614 ///
615 /// For more information on when to use this method versus
616 /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
617 ///
618 /// [`changed`]: Receiver::changed
619 /// [`borrow_and_update`]: Receiver::borrow_and_update
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// use tokio::sync::watch;
625 ///
626 /// let (_, rx) = watch::channel("hello");
627 /// assert_eq!(*rx.borrow(), "hello");
628 /// ```
629 pub fn borrow(&self) -> Ref<'_, T> {
630 let inner = self.shared.value.read();
631
632 // After obtaining a read-lock no concurrent writes could occur
633 // and the loaded version matches that of the borrowed reference.
634 let new_version = self.shared.state.load().version();
635 let has_changed = self.version != new_version;
636
637 Ref { inner, has_changed }
638 }
639
640 /// Returns a reference to the most recently sent value and marks that value
641 /// as seen.
642 ///
    /// This method marks the current value as seen. Subsequent calls to [`changed`]
    /// will therefore wait, instead of returning immediately, until the
    /// [`Sender`] has modified the shared value again (or the channel is closed).
646 ///
647 /// Outstanding borrows hold a read lock on the inner value. This means that
648 /// long-lived borrows could cause the producer half to block. It is recommended
649 /// to keep the borrow as short-lived as possible. Additionally, if you are
650 /// running in an environment that allows `!Send` futures, you must ensure that
651 /// the returned `Ref` type is never held alive across an `.await` point,
652 /// otherwise, it can lead to a deadlock.
653 ///
654 /// The priority policy of the lock is dependent on the underlying lock
655 /// implementation, and this type does not guarantee that any particular policy
656 /// will be used. In particular, a producer which is waiting to acquire the lock
657 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
658 ///
659 /// <details><summary>Potential deadlock example</summary>
660 ///
661 /// ```text
662 /// // Task 1 (on thread A) | // Task 2 (on thread B)
663 /// let _ref1 = rx1.borrow_and_update(); |
664 /// | // will block
665 /// | let _ = tx.send(());
666 /// // may deadlock |
667 /// let _ref2 = rx2.borrow_and_update(); |
668 /// ```
669 /// </details>
670 ///
671 /// For more information on when to use this method versus [`borrow`], see
672 /// [here](self#borrow_and_update-versus-borrow).
673 ///
674 /// [`changed`]: Receiver::changed
675 /// [`borrow`]: Receiver::borrow
676 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
677 let inner = self.shared.value.read();
678
679 // After obtaining a read-lock no concurrent writes could occur
680 // and the loaded version matches that of the borrowed reference.
681 let new_version = self.shared.state.load().version();
682 let has_changed = self.version != new_version;
683
684 // Mark the shared value as seen by updating the version
685 self.version = new_version;
686
687 Ref { inner, has_changed }
688 }
689
690 /// Checks if this channel contains a message that this receiver has not yet
691 /// seen. The current value will not be marked as seen.
692 ///
693 /// Although this method is called `has_changed`, it does not check
694 /// messages for equality, so this call will return true even if the current
695 /// message is equal to the previous message.
696 ///
697 /// # Errors
698 ///
699 /// Returns a [`RecvError`](error::RecvError) if and only if the channel has been closed.
700 ///
701 /// # Examples
702 ///
703 /// ## Basic usage
704 ///
705 /// ```
706 /// use tokio::sync::watch;
707 ///
708 /// # #[tokio::main(flavor = "current_thread")]
709 /// # async fn main() {
710 /// let (tx, mut rx) = watch::channel("hello");
711 ///
712 /// tx.send("goodbye").unwrap();
713 ///
714 /// assert!(rx.has_changed().unwrap());
715 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
716 ///
717 /// // The value has been marked as seen
718 /// assert!(!rx.has_changed().unwrap());
719 /// # }
720 /// ```
721 ///
722 /// ## Closed channel example
723 ///
724 /// ```
725 /// use tokio::sync::watch;
726 ///
727 /// # #[tokio::main(flavor = "current_thread")]
728 /// # async fn main() {
729 /// let (tx, rx) = watch::channel("hello");
730 /// tx.send("goodbye").unwrap();
731 ///
732 /// drop(tx);
733 ///
734 /// // The channel is closed
735 /// assert!(rx.has_changed().is_err());
736 /// # }
737 /// ```
738 pub fn has_changed(&self) -> Result<bool, error::RecvError> {
739 // Load the version from the state
740 let state = self.shared.state.load();
741 if state.is_closed() {
742 // All senders have dropped.
743 return Err(error::RecvError(()));
744 }
745 let new_version = state.version();
746
747 Ok(self.version != new_version)
748 }
749
750 /// Marks the state as changed.
751 ///
752 /// After invoking this method [`has_changed()`](Self::has_changed)
753 /// returns `true` and [`changed()`](Self::changed) returns
754 /// immediately, regardless of whether a new value has been sent.
755 ///
756 /// This is useful for triggering an initial change notification after
757 /// subscribing to synchronize new receivers.
758 pub fn mark_changed(&mut self) {
759 self.version.decrement();
760 }
761
762 /// Marks the state as unchanged.
763 ///
764 /// The current value will be considered seen by the receiver.
765 ///
766 /// This is useful if you are not interested in the current value
767 /// visible in the receiver.
768 pub fn mark_unchanged(&mut self) {
769 let current_version = self.shared.state.load().version();
770 self.version = current_version;
771 }
772
773 /// Waits for a change notification, then marks the current value as seen.
774 ///
775 /// If the current value in the channel has not yet been marked seen when
776 /// this method is called, the method marks that value seen and returns
777 /// immediately. If the newest value has already been marked seen, then the
778 /// method sleeps until a new message is sent by a [`Sender`] connected to
779 /// this `Receiver`, or until all [`Sender`]s are dropped.
780 ///
781 /// For more information, see
782 /// [*Change notifications*](self#change-notifications) in the module-level documentation.
783 ///
784 /// # Errors
785 ///
786 /// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
787 /// the current value is seen.
788 ///
789 /// # Cancel safety
790 ///
791 /// This method is cancel safe. If you use it as the event in a
792 /// [`tokio::select!`](crate::select) statement and some other branch
793 /// completes first, then it is guaranteed that no values have been marked
794 /// seen by this call to `changed`.
795 ///
796 /// [`Sender`]: struct@Sender
797 ///
798 /// # Examples
799 ///
800 /// ```
801 /// use tokio::sync::watch;
802 ///
803 /// # #[tokio::main(flavor = "current_thread")]
804 /// # async fn main() {
805 /// let (tx, mut rx) = watch::channel("hello");
806 ///
807 /// tokio::spawn(async move {
808 /// tx.send("goodbye").unwrap();
809 /// });
810 ///
811 /// assert!(rx.changed().await.is_ok());
812 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
813 ///
814 /// // The `tx` handle has been dropped
815 /// assert!(rx.changed().await.is_err());
816 /// # }
817 /// ```
818 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
819 cooperative(changed_impl(&self.shared, &mut self.version)).await
820 }
821
822 /// Waits for a value that satisfies the provided condition.
823 ///
824 /// This method will call the provided closure whenever something is sent on
825 /// the channel. Once the closure returns `true`, this method will return a
826 /// reference to the value that was passed to the closure.
827 ///
828 /// Before `wait_for` starts waiting for changes, it will call the closure
829 /// on the current value. If the closure returns `true` when given the
830 /// current value, then `wait_for` will immediately return a reference to
831 /// the current value. This is the case even if the current value is already
832 /// considered seen.
833 ///
834 /// The watch channel only keeps track of the most recent value, so if
835 /// several messages are sent faster than `wait_for` is able to call the
836 /// closure, then it may skip some updates. Whenever the closure is called,
837 /// it will be called with the most recent value.
838 ///
839 /// When this function returns, the value that was passed to the closure
840 /// when it returned `true` will be considered seen.
841 ///
842 /// If the channel is closed, then `wait_for` will return a [`RecvError`].
843 /// Once this happens, no more messages can ever be sent on the channel.
844 /// When an error is returned, it is guaranteed that the closure has been
845 /// called on the last value, and that it returned `false` for that value.
846 /// (If the closure returned `true`, then the last value would have been
847 /// returned instead of the error.)
848 ///
849 /// Like the [`borrow`] method, the returned borrow holds a read lock on the
850 /// inner value. This means that long-lived borrows could cause the producer
851 /// half to block. It is recommended to keep the borrow as short-lived as
852 /// possible. See the documentation of `borrow` for more information on
853 /// this.
854 ///
855 /// [`borrow`]: Receiver::borrow
856 /// [`RecvError`]: error::RecvError
857 ///
858 /// # Cancel safety
859 ///
860 /// This method is cancel safe. If you use it as the event in a
861 /// [`tokio::select!`](crate::select) statement and some other branch
862 /// completes first, then it is guaranteed that the last seen value `val`
863 /// (if any) satisfies `f(val) == false`.
864 ///
865 /// # Panics
866 ///
867 /// If and only if the closure `f` panics. In that case, no resource owned
868 /// or shared by this [`Receiver`] will be poisoned.
869 ///
870 /// # Examples
871 ///
872 /// ```
873 /// use tokio::sync::watch;
874 /// use tokio::time::{sleep, Duration};
875 ///
876 /// #[tokio::main(flavor = "current_thread", start_paused = true)]
877 /// async fn main() {
878 /// let (tx, mut rx) = watch::channel("hello");
879 ///
880 /// tokio::spawn(async move {
881 /// sleep(Duration::from_secs(1)).await;
882 /// tx.send("goodbye").unwrap();
883 /// });
884 ///
885 /// assert!(rx.wait_for(|val| *val == "goodbye").await.is_ok());
886 /// assert_eq!(*rx.borrow(), "goodbye");
887 /// }
888 /// ```
889 pub async fn wait_for(
890 &mut self,
891 f: impl FnMut(&T) -> bool,
892 ) -> Result<Ref<'_, T>, error::RecvError> {
893 cooperative(self.wait_for_inner(f)).await
894 }
895
896 async fn wait_for_inner(
897 &mut self,
898 mut f: impl FnMut(&T) -> bool,
899 ) -> Result<Ref<'_, T>, error::RecvError> {
900 let mut closed = false;
901 loop {
902 {
903 let inner = self.shared.value.read();
904
905 let new_version = self.shared.state.load().version();
906 let has_changed = self.version != new_version;
907 self.version = new_version;
908
909 if !closed || has_changed {
910 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
911 match result {
912 Ok(true) => {
913 return Ok(Ref { inner, has_changed });
914 }
915 Ok(false) => {
916 // Skip the value.
917 }
918 Err(panicked) => {
919 // Drop the read-lock to avoid poisoning it.
920 drop(inner);
921 // Forward the panic to the caller.
922 panic::resume_unwind(panicked);
923 // Unreachable
924 }
925 };
926 }
927 }
928
929 if closed {
930 return Err(error::RecvError(()));
931 }
932
933 // Wait for the value to change.
934 closed = changed_impl(&self.shared, &mut self.version).await.is_err();
935 }
936 }
937
938 /// Returns `true` if receivers belong to the same channel.
939 ///
940 /// # Examples
941 ///
942 /// ```
943 /// let (tx, rx) = tokio::sync::watch::channel(true);
944 /// let rx2 = rx.clone();
945 /// assert!(rx.same_channel(&rx2));
946 ///
947 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
948 /// assert!(!rx3.same_channel(&rx2));
949 /// ```
950 pub fn same_channel(&self, other: &Self) -> bool {
951 Arc::ptr_eq(&self.shared, &other.shared)
952 }
953
954 cfg_process_driver! {
955 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
956 maybe_changed(&self.shared, &mut self.version)
957 }
958 }
959}
960
961fn maybe_changed<T>(
962 shared: &Shared<T>,
963 version: &mut Version,
964) -> Option<Result<(), error::RecvError>> {
965 // Load the version from the state
966 let state = shared.state.load();
967 let new_version = state.version();
968
969 if *version != new_version {
970 // Observe the new version and return
971 *version = new_version;
972 return Some(Ok(()));
973 }
974
975 if state.is_closed() {
976 // All senders have been dropped.
977 return Some(Err(error::RecvError(())));
978 }
979
980 None
981}
982
983async fn changed_impl<T>(
984 shared: &Shared<T>,
985 version: &mut Version,
986) -> Result<(), error::RecvError> {
987 crate::trace::async_trace_leaf().await;
988
989 loop {
990 // In order to avoid a race condition, we first request a notification,
991 // **then** check the current value's version. If a new version exists,
992 // the notification request is dropped.
993 let notified = shared.notify_rx.notified();
994
995 if let Some(ret) = maybe_changed(shared, version) {
996 return ret;
997 }
998
999 notified.await;
1000 // loop around again in case the wake-up was spurious
1001 }
1002}
1003
1004impl<T> Clone for Receiver<T> {
1005 fn clone(&self) -> Self {
1006 let version = self.version;
1007 let shared = self.shared.clone();
1008
1009 Self::from_shared(version, shared)
1010 }
1011}
1012
1013impl<T> Drop for Receiver<T> {
1014 fn drop(&mut self) {
1015 // No synchronization necessary as this is only used as a counter and
1016 // not memory access.
1017 if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
1018 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
1019 self.shared.notify_tx.notify_waiters();
1020 }
1021 }
1022}
1023
1024impl<T> Sender<T> {
1025 /// Creates the sending-half of the [`watch`] channel.
1026 ///
1027 /// See documentation of [`watch::channel`] for errors when calling this function.
1028 /// Beware that attempting to send a value when there are no receivers will
1029 /// return an error.
1030 ///
1031 /// [`watch`]: crate::sync::watch
/// [`watch::channel`]: crate::sync::watch::channel
1033 ///
1034 /// # Examples
1035 /// ```
1036 /// let sender = tokio::sync::watch::Sender::new(0u8);
1037 /// assert!(sender.send(3).is_err());
1038 /// let _rec = sender.subscribe();
1039 /// assert!(sender.send(4).is_ok());
1040 /// ```
1041 pub fn new(init: T) -> Self {
1042 let (tx, _) = channel(init);
1043 tx
1044 }
1045
1046 /// Sends a new value via the channel, notifying all receivers.
1047 ///
1048 /// This method fails if the channel is closed, which is the case when
1049 /// every receiver has been dropped. It is possible to reopen the channel
1050 /// using the [`subscribe`] method. However, when `send` fails, the value
1051 /// isn't made available for future receivers (but returned with the
1052 /// [`SendError`]).
1053 ///
1054 /// To always make a new value available for future receivers, even if no
1055 /// receiver currently exists, one of the other send methods
1056 /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
1057 /// used instead.
1058 ///
1059 /// [`subscribe`]: Sender::subscribe
1060 /// [`SendError`]: error::SendError
1061 /// [`send_if_modified`]: Sender::send_if_modified
1062 /// [`send_modify`]: Sender::send_modify
1063 /// [`send_replace`]: Sender::send_replace
1064 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
1065 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
1066 if 0 == self.receiver_count() {
1067 return Err(error::SendError(value));
1068 }
1069
1070 self.send_replace(value);
1071 Ok(())
1072 }
1073
1074 /// Modifies the watched value **unconditionally** in-place,
1075 /// notifying all receivers.
1076 ///
1077 /// This can be useful for modifying the watched value, without
1078 /// having to allocate a new instance. Additionally, this
1079 /// method permits sending values even when there are no receivers.
1080 ///
1081 /// Prefer to use the more versatile function [`Self::send_if_modified()`]
1082 /// if the value is only modified conditionally during the mutable borrow
1083 /// to prevent unneeded change notifications for unmodified values.
1084 ///
1085 /// # Panics
1086 ///
1087 /// This function panics when the invocation of the `modify` closure panics.
1088 /// No receivers are notified when panicking. All changes of the watched
1089 /// value applied by the closure before panicking will be visible in
1090 /// subsequent calls to `borrow`.
1091 ///
1092 /// # Examples
1093 ///
1094 /// ```
1095 /// use tokio::sync::watch;
1096 ///
1097 /// struct State {
1098 /// counter: usize,
1099 /// }
1100 /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
1101 /// state_tx.send_modify(|state| state.counter += 1);
1102 /// assert_eq!(state_rx.borrow().counter, 1);
1103 /// ```
1104 pub fn send_modify<F>(&self, modify: F)
1105 where
1106 F: FnOnce(&mut T),
1107 {
1108 self.send_if_modified(|value| {
1109 modify(value);
1110 true
1111 });
1112 }
1113
1114 /// Modifies the watched value **conditionally** in-place,
1115 /// notifying all receivers only if modified.
1116 ///
1117 /// This can be useful for modifying the watched value, without
1118 /// having to allocate a new instance. Additionally, this
1119 /// method permits sending values even when there are no receivers.
1120 ///
1121 /// The `modify` closure must return `true` if the value has actually
1122 /// been modified during the mutable borrow. It should only return `false`
1123 /// if the value is guaranteed to be unmodified despite the mutable
1124 /// borrow.
1125 ///
1126 /// Receivers are only notified if the closure returned `true`. If the
1127 /// closure has modified the value but returned `false` this results
1128 /// in a *silent modification*, i.e. the modified value will be visible
1129 /// in subsequent calls to `borrow`, but receivers will not receive
1130 /// a change notification.
1131 ///
1132 /// Returns the result of the closure, i.e. `true` if the value has
1133 /// been modified and `false` otherwise.
1134 ///
1135 /// # Panics
1136 ///
1137 /// This function panics when the invocation of the `modify` closure panics.
1138 /// No receivers are notified when panicking. All changes of the watched
1139 /// value applied by the closure before panicking will be visible in
1140 /// subsequent calls to `borrow`.
1141 ///
1142 /// # Examples
1143 ///
1144 /// ```
1145 /// use tokio::sync::watch;
1146 ///
1147 /// struct State {
1148 /// counter: usize,
1149 /// }
1150 /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1151 /// let inc_counter_if_odd = |state: &mut State| {
1152 /// if state.counter % 2 == 1 {
1153 /// state.counter += 1;
1154 /// return true;
1155 /// }
1156 /// false
1157 /// };
1158 ///
1159 /// assert_eq!(state_rx.borrow().counter, 1);
1160 ///
1161 /// assert!(!state_rx.has_changed().unwrap());
1162 /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1163 /// assert!(state_rx.has_changed().unwrap());
1164 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1165 ///
1166 /// assert!(!state_rx.has_changed().unwrap());
1167 /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1168 /// assert!(!state_rx.has_changed().unwrap());
1169 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1170 /// ```
1171 pub fn send_if_modified<F>(&self, modify: F) -> bool
1172 where
1173 F: FnOnce(&mut T) -> bool,
1174 {
1175 {
1176 // Acquire the write lock and update the value.
1177 let mut lock = self.shared.value.write();
1178
1179 // Update the value and catch possible panic inside func.
1180 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1181 match result {
1182 Ok(modified) => {
1183 if !modified {
1184 // Abort, i.e. don't notify receivers if unmodified
1185 return false;
1186 }
1187 // Continue if modified
1188 }
1189 Err(panicked) => {
1190 // Drop the lock to avoid poisoning it.
1191 drop(lock);
1192 // Forward the panic to the caller.
1193 panic::resume_unwind(panicked);
1194 // Unreachable
1195 }
1196 };
1197
1198 self.shared.state.increment_version_while_locked();
1199
1200 // Release the write lock.
1201 //
1202 // Incrementing the version counter while holding the lock ensures
1203 // that receivers are able to figure out the version number of the
1204 // value they are currently looking at.
1205 drop(lock);
1206 }
1207
1208 self.shared.notify_rx.notify_waiters();
1209
1210 true
1211 }
1212
1213 /// Sends a new value via the channel, notifying all receivers and returning
1214 /// the previous value in the channel.
1215 ///
1216 /// This can be useful for reusing the buffers inside a watched value.
1217 /// Additionally, this method permits sending values even when there are no
1218 /// receivers.
1219 ///
1220 /// # Examples
1221 ///
1222 /// ```
1223 /// use tokio::sync::watch;
1224 ///
1225 /// let (tx, _rx) = watch::channel(1);
1226 /// assert_eq!(tx.send_replace(2), 1);
1227 /// assert_eq!(tx.send_replace(3), 2);
1228 /// ```
1229 pub fn send_replace(&self, mut value: T) -> T {
1230 // swap old watched value with the new one
1231 self.send_modify(|old| mem::swap(old, &mut value));
1232
1233 value
1234 }
1235
/// Returns a reference to the most recently sent value.
1237 ///
1238 /// Outstanding borrows hold a read lock on the inner value. This means that
1239 /// long-lived borrows could cause the producer half to block. It is recommended
1240 /// to keep the borrow as short-lived as possible. Additionally, if you are
1241 /// running in an environment that allows `!Send` futures, you must ensure that
1242 /// the returned `Ref` type is never held alive across an `.await` point,
1243 /// otherwise, it can lead to a deadlock.
1244 ///
1245 /// # Examples
1246 ///
1247 /// ```
1248 /// use tokio::sync::watch;
1249 ///
1250 /// let (tx, _) = watch::channel("hello");
1251 /// assert_eq!(*tx.borrow(), "hello");
1252 /// ```
1253 pub fn borrow(&self) -> Ref<'_, T> {
1254 let inner = self.shared.value.read();
1255
1256 // The sender/producer always sees the current version
1257 let has_changed = false;
1258
1259 Ref { inner, has_changed }
1260 }
1261
1262 /// Checks if the channel has been closed. This happens when all receivers
1263 /// have dropped.
1264 ///
1265 /// # Examples
1266 ///
1267 /// ```
1268 /// let (tx, rx) = tokio::sync::watch::channel(());
1269 /// assert!(!tx.is_closed());
1270 ///
1271 /// drop(rx);
1272 /// assert!(tx.is_closed());
1273 /// ```
1274 pub fn is_closed(&self) -> bool {
1275 self.receiver_count() == 0
1276 }
1277
1278 /// Completes when all receivers have dropped.
1279 ///
1280 /// This allows the producer to get notified when interest in the produced
1281 /// values is canceled and immediately stop doing work. Once a channel is
1282 /// closed, the only way to reopen it is to call [`Sender::subscribe`] to
1283 /// get a new receiver.
1284 ///
1285 /// If the channel becomes closed for a brief amount of time (e.g., the last
1286 /// receiver is dropped and then `subscribe` is called), then this call to
1287 /// `closed` might return, but it is also possible that it does not "notice"
1288 /// that the channel was closed for a brief amount of time.
1289 ///
1290 /// # Cancel safety
1291 ///
1292 /// This method is cancel safe.
1293 ///
1294 /// # Examples
1295 ///
1296 /// ```
1297 /// use tokio::sync::watch;
1298 ///
1299 /// # #[tokio::main(flavor = "current_thread")]
1300 /// # async fn main() {
1301 /// let (tx, rx) = watch::channel("hello");
1302 ///
1303 /// tokio::spawn(async move {
1304 /// // use `rx`
1305 /// drop(rx);
1306 /// });
1307 ///
1308 /// // Waits for `rx` to drop
1309 /// tx.closed().await;
1310 /// println!("the `rx` handles dropped")
1311 /// # }
1312 /// ```
1313 pub async fn closed(&self) {
1314 cooperative(async {
1315 crate::trace::async_trace_leaf().await;
1316
1317 while self.receiver_count() > 0 {
1318 let notified = self.shared.notify_tx.notified();
1319
1320 if self.receiver_count() == 0 {
1321 return;
1322 }
1323
1324 notified.await;
1325 // The channel could have been reopened in the meantime by calling
1326 // `subscribe`, so we loop again.
1327 }
1328 })
1329 .await;
1330 }
1331
1332 /// Creates a new [`Receiver`] connected to this `Sender`.
1333 ///
1334 /// All messages sent before this call to `subscribe` are initially marked
1335 /// as seen by the new `Receiver`.
1336 ///
1337 /// This method can be called even if there are no other receivers. In this
1338 /// case, the channel is reopened.
1339 ///
1340 /// # Examples
1341 ///
/// The new receiver will receive messages sent on this `Sender`.
1343 ///
1344 /// ```
1345 /// use tokio::sync::watch;
1346 ///
1347 /// # #[tokio::main(flavor = "current_thread")]
1348 /// # async fn main() {
1349 /// let (tx, _rx) = watch::channel(0u64);
1350 ///
1351 /// tx.send(5).unwrap();
1352 ///
1353 /// let rx = tx.subscribe();
1354 /// assert_eq!(5, *rx.borrow());
1355 ///
1356 /// tx.send(10).unwrap();
1357 /// assert_eq!(10, *rx.borrow());
1358 /// # }
1359 /// ```
1360 ///
/// The most recent message is considered seen by the new receiver, so this
/// test is guaranteed to pass.
1363 ///
1364 /// ```
1365 /// use tokio::sync::watch;
1366 /// use tokio::time::Duration;
1367 ///
1368 /// # #[tokio::main(flavor = "current_thread")]
1369 /// # async fn main() {
1370 /// let (tx, _rx) = watch::channel(0u64);
1371 /// tx.send(5).unwrap();
1372 /// let mut rx = tx.subscribe();
1373 ///
1374 /// tokio::spawn(async move {
1375 /// // by spawning and sleeping, the message is sent after `main`
1376 /// // hits the call to `changed`.
1377 /// # if false {
1378 /// tokio::time::sleep(Duration::from_millis(10)).await;
1379 /// # }
1380 /// tx.send(100).unwrap();
1381 /// });
1382 ///
1383 /// rx.changed().await.unwrap();
1384 /// assert_eq!(100, *rx.borrow());
1385 /// # }
1386 /// ```
1387 pub fn subscribe(&self) -> Receiver<T> {
1388 let shared = self.shared.clone();
1389 let version = shared.state.load().version();
1390
1391 // The CLOSED bit in the state tracks only whether the sender is
1392 // dropped, so we do not need to unset it if this reopens the channel.
1393 Receiver::from_shared(version, shared)
1394 }
1395
1396 /// Returns the number of receivers that currently exist.
1397 ///
1398 /// # Examples
1399 ///
1400 /// ```
1401 /// use tokio::sync::watch;
1402 ///
1403 /// # #[tokio::main(flavor = "current_thread")]
1404 /// # async fn main() {
1405 /// let (tx, rx1) = watch::channel("hello");
1406 ///
1407 /// assert_eq!(1, tx.receiver_count());
1408 ///
1409 /// let mut _rx2 = rx1.clone();
1410 ///
1411 /// assert_eq!(2, tx.receiver_count());
1412 /// # }
1413 /// ```
1414 pub fn receiver_count(&self) -> usize {
1415 self.shared.ref_count_rx.load(Relaxed)
1416 }
1417
1418 /// Returns the number of senders that currently exist.
1419 ///
1420 /// # Examples
1421 ///
1422 /// ```
1423 /// use tokio::sync::watch;
1424 ///
1425 /// # #[tokio::main(flavor = "current_thread")]
1426 /// # async fn main() {
1427 /// let (tx1, rx) = watch::channel("hello");
1428 ///
1429 /// assert_eq!(1, tx1.sender_count());
1430 ///
1431 /// let tx2 = tx1.clone();
1432 ///
1433 /// assert_eq!(2, tx1.sender_count());
1434 /// assert_eq!(2, tx2.sender_count());
1435 /// # }
1436 /// ```
1437 pub fn sender_count(&self) -> usize {
1438 self.shared.ref_count_tx.load(Relaxed)
1439 }
1440
1441 /// Returns `true` if senders belong to the same channel.
1442 ///
1443 /// # Examples
1444 ///
1445 /// ```
1446 /// let (tx, rx) = tokio::sync::watch::channel(true);
1447 /// let tx2 = tx.clone();
1448 /// assert!(tx.same_channel(&tx2));
1449 ///
1450 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
1451 /// assert!(!tx3.same_channel(&tx2));
1452 /// ```
1453 pub fn same_channel(&self, other: &Self) -> bool {
1454 Arc::ptr_eq(&self.shared, &other.shared)
1455 }
1456}
1457
1458impl<T> Drop for Sender<T> {
1459 fn drop(&mut self) {
1460 if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
1461 self.shared.state.set_closed();
1462 self.shared.notify_rx.notify_waiters();
1463 }
1464 }
1465}
1466
1467// ===== impl Ref =====
1468
1469impl<T> ops::Deref for Ref<'_, T> {
1470 type Target = T;
1471
1472 fn deref(&self) -> &T {
1473 self.inner.deref()
1474 }
1475}
1476
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // Regression test for https://github.com/tokio-rs/tokio/issues/3168
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            tx.send(1).unwrap();

            // Send from another thread while this thread polls `changed`.
            let sender_handle = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender_handle.join().unwrap();

            // Poll `changed` twice from another thread while this thread sends.
            let receiver_handle = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            let mut rx = receiver_handle.join().unwrap();
            let sender_handle = thread::spawn(move || {
                tx.send(2).unwrap();
            });

            rx.changed().now_or_never();

            sender_handle.join().unwrap();
        });
    }

    // Checks that `borrow` on both halves observes the latest sent value
    // while sends and `changed` polls happen on different threads.
    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            assert_eq!(*tx.borrow(), 0);
            assert_eq!(*rx.borrow(), 0);

            tx.send(1).unwrap();
            assert_eq!(*tx.borrow(), 1);

            let sender_handle = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender_handle.join().unwrap();
            let receiver_handle = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            let rx = receiver_handle.join().unwrap();
            assert_eq!(*rx.borrow(), 3);
            assert_eq!(*tx.borrow(), 3);

            tx.send(2).unwrap();

            // Deliberately not joined, as in the original test.
            thread::spawn(move || {
                assert_eq!(*rx.borrow(), 2);
            });
            assert_eq!(*tx.borrow(), 2);
        });
    }
}