tokio/sync/
semaphore.rs

1use super::batch_semaphore as ll; // low level implementation
2use super::{AcquireError, TryAcquireError};
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::sync::Arc;
6
7/// Counting semaphore performing asynchronous permit acquisition.
8///
9/// A semaphore maintains a set of permits. Permits are used to synchronize
10/// access to a shared resource. A semaphore differs from a mutex in that it
11/// can allow more than one concurrent caller to access the shared resource at a
12/// time.
13///
14/// When `acquire` is called and the semaphore has remaining permits, the
15/// function immediately returns a permit. However, if no remaining permits are
16/// available, `acquire` (asynchronously) waits until an outstanding permit is
17/// dropped. At this point, the freed permit is assigned to the caller.
18///
19/// This `Semaphore` is fair, which means that permits are given out in the order
20/// they were requested. This fairness is also applied when `acquire_many` gets
21/// involved, so if a call to `acquire_many` at the front of the queue requests
22/// more permits than currently available, this can prevent a call to `acquire`
23/// from completing, even if the semaphore has enough permits complete the call
24/// to `acquire`.
25///
26/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27/// utility.
28///
29/// # Examples
30///
31/// Basic usage:
32///
33/// ```
34/// use tokio::sync::{Semaphore, TryAcquireError};
35///
36/// # #[tokio::main(flavor = "current_thread")]
37/// # async fn main() {
38/// let semaphore = Semaphore::new(3);
39///
40/// let a_permit = semaphore.acquire().await.unwrap();
41/// let two_permits = semaphore.acquire_many(2).await.unwrap();
42///
43/// assert_eq!(semaphore.available_permits(), 0);
44///
45/// let permit_attempt = semaphore.try_acquire();
46/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47/// # }
48/// ```
49///
50/// ## Limit the number of simultaneously opened files in your program
51///
52/// Most operating systems have limits on the number of open file
53/// handles. Even in systems without explicit limits, resource constraints
54/// implicitly set an upper bound on the number of open files. If your
55/// program attempts to open a large number of files and exceeds this
56/// limit, it will result in an error.
57///
58/// This example uses a Semaphore with 100 permits. By acquiring a permit from
59/// the Semaphore before accessing a file, you ensure that your program opens
60/// no more than 100 files at a time. When trying to open the 101st
61/// file, the program will wait until a permit becomes available before
62/// proceeding to open another file.
63/// ```
64/// # #[cfg(not(target_family = "wasm"))]
65/// # {
66/// use std::io::Result;
67/// use tokio::fs::File;
68/// use tokio::sync::Semaphore;
69/// use tokio::io::AsyncWriteExt;
70///
71/// static PERMITS: Semaphore = Semaphore::const_new(100);
72///
73/// async fn write_to_file(message: &[u8]) -> Result<()> {
74///     let _permit = PERMITS.acquire().await.unwrap();
75///     let mut buffer = File::create("example.txt").await?;
76///     buffer.write_all(message).await?;
77///     Ok(()) // Permit goes out of scope here, and is available again for acquisition
78/// }
79/// # }
80/// ```
81///
82/// ## Limit the number of outgoing requests being sent at the same time
83///
84/// In some scenarios, it might be required to limit the number of outgoing
85/// requests being sent in parallel. This could be due to limits of a consumed
86/// API or the network resources of the system the application is running on.
87///
88/// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
89/// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
90/// a task sends a request, it must acquire a permit from the semaphore by
91/// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
92/// sent in parallel at any given time. After a task has sent a request, it
93/// drops the permit to allow other tasks to send requests.
94///
95/// ```
96/// use std::sync::Arc;
97/// use tokio::sync::Semaphore;
98///
99/// # #[tokio::main(flavor = "current_thread")]
100/// # async fn main() {
101/// // Define maximum number of parallel requests.
102/// let semaphore = Arc::new(Semaphore::new(5));
103/// // Spawn many tasks that will send requests.
104/// let mut jhs = Vec::new();
105/// for task_id in 0..50 {
106///     let semaphore = semaphore.clone();
107///     let jh = tokio::spawn(async move {
108///         // Acquire permit before sending request.
109///         let _permit = semaphore.acquire().await.unwrap();
110///         // Send the request.
111///         let response = send_request(task_id).await;
112///         // Drop the permit after the request has been sent.
113///         drop(_permit);
114///         // Handle response.
115///         // ...
116///
117///         response
118///     });
119///     jhs.push(jh);
120/// }
121/// // Collect responses from tasks.
122/// let mut responses = Vec::new();
123/// for jh in jhs {
124///     let response = jh.await.unwrap();
125///     responses.push(response);
126/// }
127/// // Process responses.
128/// // ...
129/// # }
130/// # async fn send_request(task_id: usize) {
131/// #     // Send request.
132/// # }
133/// ```
134///
135/// ## Limit the number of incoming requests being handled at the same time
136///
137/// Similar to limiting the number of simultaneously opened files, network handles
138/// are a limited resource. Allowing an unbounded amount of requests to be processed
139/// could result in a denial-of-service, among many other issues.
140///
141/// This example uses an `Arc<Semaphore>` instead of a global variable.
142/// To limit the number of requests that can be processed at the time,
143/// we acquire a permit for each task before spawning it. Once acquired,
144/// a new task is spawned; and once finished, the permit is dropped inside
145/// of the task to allow others to spawn. Permits must be acquired via
146/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
147/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
148///
149/// ```no_run
150/// # #[cfg(not(target_family = "wasm"))]
151/// # {
152/// use std::sync::Arc;
153/// use tokio::sync::Semaphore;
154/// use tokio::net::TcpListener;
155///
156/// #[tokio::main]
157/// async fn main() -> std::io::Result<()> {
158///     let semaphore = Arc::new(Semaphore::new(3));
159///     let listener = TcpListener::bind("127.0.0.1:8080").await?;
160///
161///     loop {
162///         // Acquire permit before accepting the next socket.
163///         //
164///         // We use `acquire_owned` so that we can move `permit` into
165///         // other tasks.
166///         let permit = semaphore.clone().acquire_owned().await.unwrap();
167///         let (mut socket, _) = listener.accept().await?;
168///
169///         tokio::spawn(async move {
170///             // Do work using the socket.
171///             handle_connection(&mut socket).await;
172///             // Drop socket while the permit is still live.
173///             drop(socket);
174///             // Drop the permit, so more tasks can be created.
175///             drop(permit);
176///         });
177///     }
178/// }
179/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
180/// #   // Do work
181/// # }
182/// # }
183/// ```
184///
185/// ## Prevent tests from running in parallel
186///
187/// By default, Rust runs tests in the same file in parallel. However, in some
188/// cases, running two tests in parallel may lead to problems. For example, this
189/// can happen when tests use the same database.
190///
191/// Consider the following scenario:
192/// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
193///    the value using the same key to verify the insertion.
194/// 2. `test_update`: Inserts a key, then updates the key to a new value and
195///    verifies that the value has been accurately updated.
196/// 3. `test_others`: A third test that doesn't modify the database state. It
197///    can run in parallel with the other tests.
198///
199/// In this example, `test_insert` and `test_update` need to run in sequence to
200/// work, but it doesn't matter which test runs first. We can leverage a
201/// semaphore with a single permit to address this challenge.
202///
203/// ```
204/// # use tokio::sync::Mutex;
205/// # use std::collections::BTreeMap;
206/// # struct Database {
207/// #   map: Mutex<BTreeMap<String, i32>>,
208/// # }
209/// # impl Database {
210/// #    pub const fn setup() -> Database {
211/// #        Database {
212/// #            map: Mutex::const_new(BTreeMap::new()),
213/// #        }
214/// #    }
215/// #    pub async fn insert(&self, key: &str, value: i32) {
216/// #        self.map.lock().await.insert(key.to_string(), value);
217/// #    }
218/// #    pub async fn update(&self, key: &str, value: i32) {
219/// #        self.map.lock().await
220/// #            .entry(key.to_string())
221/// #            .and_modify(|origin| *origin = value);
222/// #    }
223/// #    pub async fn delete(&self, key: &str) {
224/// #        self.map.lock().await.remove(key);
225/// #    }
226/// #    pub async fn get(&self, key: &str) -> i32 {
227/// #        *self.map.lock().await.get(key).unwrap()
228/// #    }
229/// # }
230/// use tokio::sync::Semaphore;
231///
232/// // Initialize a static semaphore with only one permit, which is used to
233/// // prevent test_insert and test_update from running in parallel.
234/// static PERMIT: Semaphore = Semaphore::const_new(1);
235///
236/// // Initialize the database that will be used by the subsequent tests.
237/// static DB: Database = Database::setup();
238///
239/// #[tokio::test]
240/// # async fn fake_test_insert() {}
241/// async fn test_insert() {
242///     // Acquire permit before proceeding. Since the semaphore has only one permit,
243///     // the test will wait if the permit is already acquired by other tests.
244///     let permit = PERMIT.acquire().await.unwrap();
245///
246///     // Do the actual test stuff with database
247///
248///     // Insert a key-value pair to database
249///     let (key, value) = ("name", 0);
250///     DB.insert(key, value).await;
251///
252///     // Verify that the value has been inserted correctly.
253///     assert_eq!(DB.get(key).await, value);
254///
255///     // Undo the insertion, so the database is empty at the end of the test.
256///     DB.delete(key).await;
257///
258///     // Drop permit. This allows the other test to start running.
259///     drop(permit);
260/// }
261///
262/// #[tokio::test]
263/// # async fn fake_test_update() {}
264/// async fn test_update() {
265///     // Acquire permit before proceeding. Since the semaphore has only one permit,
266///     // the test will wait if the permit is already acquired by other tests.
267///     let permit = PERMIT.acquire().await.unwrap();
268///
269///     // Do the same insert.
270///     let (key, value) = ("name", 0);
271///     DB.insert(key, value).await;
272///
273///     // Update the existing value with a new one.
274///     let new_value = 1;
275///     DB.update(key, new_value).await;
276///
277///     // Verify that the value has been updated correctly.
278///     assert_eq!(DB.get(key).await, new_value);
279///
280///     // Undo any modificattion.
281///     DB.delete(key).await;
282///
283///     // Drop permit. This allows the other test to start running.
284///     drop(permit);
285/// }
286///
287/// #[tokio::test]
288/// # async fn fake_test_others() {}
289/// async fn test_others() {
290///     // This test can run in parallel with test_insert and test_update,
291///     // so it does not use PERMIT.
292/// }
293/// # #[tokio::main(flavor = "current_thread")]
294/// # async fn main() {
295/// #   test_insert().await;
296/// #   test_update().await;
297/// #   test_others().await;
298/// # }
299/// ```
300///
301/// ## Rate limiting using a token bucket
302///
303/// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
304///
305/// Many applications and systems have constraints on the rate at which certain
306/// operations should occur. Exceeding this rate can result in suboptimal
307/// performance or even errors.
308///
309/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
310/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
311/// arrive at the same time.
312///
313/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
314/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
315/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
316/// wait for new tokens to be added.
317///
318/// Unlike the example that limits how many requests can be handled at the same time, we do not add
319/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
320///
321/// Note that this implementation is suboptimal when the duration is small, because it consumes a
322/// lot of cpu constantly looping and sleeping.
323///
324/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
325/// [`add_permits`]: crate::sync::Semaphore::add_permits
326/// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
327/// ```
328/// use std::sync::Arc;
329/// use tokio::sync::Semaphore;
330/// use tokio::time::{interval, Duration};
331///
332/// struct TokenBucket {
333///     sem: Arc<Semaphore>,
334///     jh: tokio::task::JoinHandle<()>,
335/// }
336///
337/// impl TokenBucket {
338///     fn new(duration: Duration, capacity: usize) -> Self {
339///         let sem = Arc::new(Semaphore::new(capacity));
340///
341///         // refills the tokens at the end of each interval
342///         let jh = tokio::spawn({
343///             let sem = sem.clone();
344///             let mut interval = interval(duration);
345///             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
346///
347///             async move {
348///                 loop {
349///                     interval.tick().await;
350///
351///                     if sem.available_permits() < capacity {
352///                         sem.add_permits(1);
353///                     }
354///                 }
355///             }
356///         });
357///
358///         Self { jh, sem }
359///     }
360///
361///     async fn acquire(&self) {
362///         // This can return an error if the semaphore is closed, but we
363///         // never close it, so this error can never happen.
364///         let permit = self.sem.acquire().await.unwrap();
365///         // To avoid releasing the permit back to the semaphore, we use
366///         // the `SemaphorePermit::forget` method.
367///         permit.forget();
368///     }
369/// }
370///
371/// impl Drop for TokenBucket {
372///     fn drop(&mut self) {
373///         // Kill the background task so it stops taking up resources when we
374///         // don't need it anymore.
375///         self.jh.abort();
376///     }
377/// }
378///
379/// # #[tokio::main(flavor = "current_thread")]
380/// # async fn _hidden() {}
381/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
382/// # async fn main() {
383/// let capacity = 5;
384/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
385/// let bucket = TokenBucket::new(update_interval, capacity);
386///
387/// for _ in 0..5 {
388///     bucket.acquire().await;
389///
390///     // do the operation
391/// }
392/// # }
393/// ```
394///
395/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
396/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
397#[derive(Debug)]
398pub struct Semaphore {
399    /// The low level semaphore
400    ll_sem: ll::Semaphore,
401    #[cfg(all(tokio_unstable, feature = "tracing"))]
402    resource_span: tracing::Span,
403}
404
405/// A permit from the semaphore.
406///
407/// This type is created by the [`acquire`] method.
408///
409/// [`acquire`]: crate::sync::Semaphore::acquire()
410#[must_use]
411#[clippy::has_significant_drop]
412#[derive(Debug)]
413pub struct SemaphorePermit<'a> {
414    sem: &'a Semaphore,
415    permits: u32,
416}
417
418/// An owned permit from the semaphore.
419///
420/// This type is created by the [`acquire_owned`] method.
421///
422/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
423#[must_use]
424#[clippy::has_significant_drop]
425#[derive(Debug)]
426pub struct OwnedSemaphorePermit {
427    sem: Arc<Semaphore>,
428    permits: u32,
429}
430
431#[test]
432#[cfg(not(loom))]
433fn bounds() {
434    fn check_unpin<T: Unpin>() {}
435    // This has to take a value, since the async fn's return type is unnameable.
436    fn check_send_sync_val<T: Send + Sync>(_t: T) {}
437    fn check_send_sync<T: Send + Sync>() {}
438    check_unpin::<Semaphore>();
439    check_unpin::<SemaphorePermit<'_>>();
440    check_send_sync::<Semaphore>();
441
442    let semaphore = Semaphore::new(0);
443    check_send_sync_val(semaphore.acquire());
444}
445
446impl Semaphore {
447    /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
448    ///
449    /// Exceeding this limit typically results in a panic.
450    pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
451
452    /// Creates a new semaphore with the initial number of permits.
453    ///
454    /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
455    #[track_caller]
456    pub fn new(permits: usize) -> Self {
457        #[cfg(all(tokio_unstable, feature = "tracing"))]
458        let resource_span = {
459            let location = std::panic::Location::caller();
460
461            tracing::trace_span!(
462                parent: None,
463                "runtime.resource",
464                concrete_type = "Semaphore",
465                kind = "Sync",
466                loc.file = location.file(),
467                loc.line = location.line(),
468                loc.col = location.column(),
469                inherits_child_attrs = true,
470            )
471        };
472
473        #[cfg(all(tokio_unstable, feature = "tracing"))]
474        let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
475
476        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
477        let ll_sem = ll::Semaphore::new(permits);
478
479        Self {
480            ll_sem,
481            #[cfg(all(tokio_unstable, feature = "tracing"))]
482            resource_span,
483        }
484    }
485
486    /// Creates a new semaphore with the initial number of permits.
487    ///
488    /// When using the `tracing` [unstable feature], a `Semaphore` created with
489    /// `const_new` will not be instrumented. As such, it will not be visible
490    /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
491    /// create an instrumented object if that is needed.
492    ///
493    /// # Examples
494    ///
495    /// ```
496    /// use tokio::sync::Semaphore;
497    ///
498    /// static SEM: Semaphore = Semaphore::const_new(10);
499    /// ```
500    ///
501    /// [`tokio-console`]: https://github.com/tokio-rs/console
502    /// [unstable feature]: crate#unstable-features
503    #[cfg(not(all(loom, test)))]
504    pub const fn const_new(permits: usize) -> Self {
505        Self {
506            ll_sem: ll::Semaphore::const_new(permits),
507            #[cfg(all(tokio_unstable, feature = "tracing"))]
508            resource_span: tracing::Span::none(),
509        }
510    }
511
512    /// Creates a new closed semaphore with 0 permits.
513    pub(crate) fn new_closed() -> Self {
514        Self {
515            ll_sem: ll::Semaphore::new_closed(),
516            #[cfg(all(tokio_unstable, feature = "tracing"))]
517            resource_span: tracing::Span::none(),
518        }
519    }
520
521    /// Creates a new closed semaphore with 0 permits.
522    #[cfg(not(all(loom, test)))]
523    pub(crate) const fn const_new_closed() -> Self {
524        Self {
525            ll_sem: ll::Semaphore::const_new_closed(),
526            #[cfg(all(tokio_unstable, feature = "tracing"))]
527            resource_span: tracing::Span::none(),
528        }
529    }
530
531    /// Returns the current number of available permits.
532    pub fn available_permits(&self) -> usize {
533        self.ll_sem.available_permits()
534    }
535
536    /// Adds `n` new permits to the semaphore.
537    ///
538    /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
539    pub fn add_permits(&self, n: usize) {
540        self.ll_sem.release(n);
541    }
542
543    /// Decrease a semaphore's permits by a maximum of `n`.
544    ///
545    /// If there are insufficient permits and it's not possible to reduce by `n`,
546    /// return the number of permits that were actually reduced.
547    pub fn forget_permits(&self, n: usize) -> usize {
548        self.ll_sem.forget_permits(n)
549    }
550
551    /// Acquires a permit from the semaphore.
552    ///
553    /// If the semaphore has been closed, this returns an [`AcquireError`].
554    /// Otherwise, this returns a [`SemaphorePermit`] representing the
555    /// acquired permit.
556    ///
557    /// # Cancel safety
558    ///
559    /// This method uses a queue to fairly distribute permits in the order they
560    /// were requested. Cancelling a call to `acquire` makes you lose your place
561    /// in the queue.
562    ///
563    /// # Examples
564    ///
565    /// ```
566    /// use tokio::sync::Semaphore;
567    ///
568    /// # #[tokio::main(flavor = "current_thread")]
569    /// # async fn main() {
570    /// let semaphore = Semaphore::new(2);
571    ///
572    /// let permit_1 = semaphore.acquire().await.unwrap();
573    /// assert_eq!(semaphore.available_permits(), 1);
574    ///
575    /// let permit_2 = semaphore.acquire().await.unwrap();
576    /// assert_eq!(semaphore.available_permits(), 0);
577    ///
578    /// drop(permit_1);
579    /// assert_eq!(semaphore.available_permits(), 1);
580    /// # }
581    /// ```
582    ///
583    /// [`AcquireError`]: crate::sync::AcquireError
584    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
585    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
586        #[cfg(all(tokio_unstable, feature = "tracing"))]
587        let inner = trace::async_op(
588            || self.ll_sem.acquire(1),
589            self.resource_span.clone(),
590            "Semaphore::acquire",
591            "poll",
592            true,
593        );
594        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
595        let inner = self.ll_sem.acquire(1);
596
597        inner.await?;
598        Ok(SemaphorePermit {
599            sem: self,
600            permits: 1,
601        })
602    }
603
604    /// Acquires `n` permits from the semaphore.
605    ///
606    /// If the semaphore has been closed, this returns an [`AcquireError`].
607    /// Otherwise, this returns a [`SemaphorePermit`] representing the
608    /// acquired permits.
609    ///
610    /// # Cancel safety
611    ///
612    /// This method uses a queue to fairly distribute permits in the order they
613    /// were requested. Cancelling a call to `acquire_many` makes you lose your
614    /// place in the queue.
615    ///
616    /// # Examples
617    ///
618    /// ```
619    /// use tokio::sync::Semaphore;
620    ///
621    /// # #[tokio::main(flavor = "current_thread")]
622    /// # async fn main() {
623    /// let semaphore = Semaphore::new(5);
624    ///
625    /// let permit = semaphore.acquire_many(3).await.unwrap();
626    /// assert_eq!(semaphore.available_permits(), 2);
627    /// # }
628    /// ```
629    ///
630    /// [`AcquireError`]: crate::sync::AcquireError
631    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
632    pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
633        #[cfg(all(tokio_unstable, feature = "tracing"))]
634        trace::async_op(
635            || self.ll_sem.acquire(n as usize),
636            self.resource_span.clone(),
637            "Semaphore::acquire_many",
638            "poll",
639            true,
640        )
641        .await?;
642
643        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
644        self.ll_sem.acquire(n as usize).await?;
645
646        Ok(SemaphorePermit {
647            sem: self,
648            permits: n,
649        })
650    }
651
652    /// Tries to acquire a permit from the semaphore.
653    ///
654    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
655    /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
656    /// this returns a [`SemaphorePermit`] representing the acquired permits.
657    ///
658    /// # Examples
659    ///
660    /// ```
661    /// use tokio::sync::{Semaphore, TryAcquireError};
662    ///
663    /// # fn main() {
664    /// let semaphore = Semaphore::new(2);
665    ///
666    /// let permit_1 = semaphore.try_acquire().unwrap();
667    /// assert_eq!(semaphore.available_permits(), 1);
668    ///
669    /// let permit_2 = semaphore.try_acquire().unwrap();
670    /// assert_eq!(semaphore.available_permits(), 0);
671    ///
672    /// let permit_3 = semaphore.try_acquire();
673    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
674    /// # }
675    /// ```
676    ///
677    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
678    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
679    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
680    pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
681        match self.ll_sem.try_acquire(1) {
682            Ok(()) => Ok(SemaphorePermit {
683                sem: self,
684                permits: 1,
685            }),
686            Err(e) => Err(e),
687        }
688    }
689
690    /// Tries to acquire `n` permits from the semaphore.
691    ///
692    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
693    /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
694    /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
695    ///
696    /// # Examples
697    ///
698    /// ```
699    /// use tokio::sync::{Semaphore, TryAcquireError};
700    ///
701    /// # fn main() {
702    /// let semaphore = Semaphore::new(4);
703    ///
704    /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
705    /// assert_eq!(semaphore.available_permits(), 1);
706    ///
707    /// let permit_2 = semaphore.try_acquire_many(2);
708    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
709    /// # }
710    /// ```
711    ///
712    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
713    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
714    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
715    pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
716        match self.ll_sem.try_acquire(n as usize) {
717            Ok(()) => Ok(SemaphorePermit {
718                sem: self,
719                permits: n,
720            }),
721            Err(e) => Err(e),
722        }
723    }
724
725    /// Acquires a permit from the semaphore.
726    ///
727    /// The semaphore must be wrapped in an [`Arc`] to call this method.
728    /// If the semaphore has been closed, this returns an [`AcquireError`].
729    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
730    /// acquired permit.
731    ///
732    /// # Cancel safety
733    ///
734    /// This method uses a queue to fairly distribute permits in the order they
735    /// were requested. Cancelling a call to `acquire_owned` makes you lose your
736    /// place in the queue.
737    ///
738    /// # Examples
739    ///
740    /// ```
741    /// use std::sync::Arc;
742    /// use tokio::sync::Semaphore;
743    ///
744    /// # #[tokio::main(flavor = "current_thread")]
745    /// # async fn main() {
746    /// let semaphore = Arc::new(Semaphore::new(3));
747    /// let mut join_handles = Vec::new();
748    ///
749    /// for _ in 0..5 {
750    ///     let permit = semaphore.clone().acquire_owned().await.unwrap();
751    ///     join_handles.push(tokio::spawn(async move {
752    ///         // perform task...
753    ///         // explicitly own `permit` in the task
754    ///         drop(permit);
755    ///     }));
756    /// }
757    ///
758    /// for handle in join_handles {
759    ///     handle.await.unwrap();
760    /// }
761    /// # }
762    /// ```
763    ///
764    /// [`Arc`]: std::sync::Arc
765    /// [`AcquireError`]: crate::sync::AcquireError
766    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
767    pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
768        #[cfg(all(tokio_unstable, feature = "tracing"))]
769        let inner = trace::async_op(
770            || self.ll_sem.acquire(1),
771            self.resource_span.clone(),
772            "Semaphore::acquire_owned",
773            "poll",
774            true,
775        );
776        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
777        let inner = self.ll_sem.acquire(1);
778
779        inner.await?;
780        Ok(OwnedSemaphorePermit {
781            sem: self,
782            permits: 1,
783        })
784    }
785
786    /// Acquires `n` permits from the semaphore.
787    ///
788    /// The semaphore must be wrapped in an [`Arc`] to call this method.
789    /// If the semaphore has been closed, this returns an [`AcquireError`].
790    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
791    /// acquired permit.
792    ///
793    /// # Cancel safety
794    ///
795    /// This method uses a queue to fairly distribute permits in the order they
796    /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
797    /// your place in the queue.
798    ///
799    /// # Examples
800    ///
801    /// ```
802    /// use std::sync::Arc;
803    /// use tokio::sync::Semaphore;
804    ///
805    /// # #[tokio::main(flavor = "current_thread")]
806    /// # async fn main() {
807    /// let semaphore = Arc::new(Semaphore::new(10));
808    /// let mut join_handles = Vec::new();
809    ///
810    /// for _ in 0..5 {
811    ///     let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
812    ///     join_handles.push(tokio::spawn(async move {
813    ///         // perform task...
814    ///         // explicitly own `permit` in the task
815    ///         drop(permit);
816    ///     }));
817    /// }
818    ///
819    /// for handle in join_handles {
820    ///     handle.await.unwrap();
821    /// }
822    /// # }
823    /// ```
824    ///
825    /// [`Arc`]: std::sync::Arc
826    /// [`AcquireError`]: crate::sync::AcquireError
827    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
828    pub async fn acquire_many_owned(
829        self: Arc<Self>,
830        n: u32,
831    ) -> Result<OwnedSemaphorePermit, AcquireError> {
832        #[cfg(all(tokio_unstable, feature = "tracing"))]
833        let inner = trace::async_op(
834            || self.ll_sem.acquire(n as usize),
835            self.resource_span.clone(),
836            "Semaphore::acquire_many_owned",
837            "poll",
838            true,
839        );
840        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
841        let inner = self.ll_sem.acquire(n as usize);
842
843        inner.await?;
844        Ok(OwnedSemaphorePermit {
845            sem: self,
846            permits: n,
847        })
848    }
849
850    /// Tries to acquire a permit from the semaphore.
851    ///
852    /// The semaphore must be wrapped in an [`Arc`] to call this method. If
853    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
854    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
855    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
856    /// acquired permit.
857    ///
858    /// # Examples
859    ///
860    /// ```
861    /// use std::sync::Arc;
862    /// use tokio::sync::{Semaphore, TryAcquireError};
863    ///
864    /// # fn main() {
865    /// let semaphore = Arc::new(Semaphore::new(2));
866    ///
867    /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
868    /// assert_eq!(semaphore.available_permits(), 1);
869    ///
870    /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
871    /// assert_eq!(semaphore.available_permits(), 0);
872    ///
873    /// let permit_3 = semaphore.try_acquire_owned();
874    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
875    /// # }
876    /// ```
877    ///
878    /// [`Arc`]: std::sync::Arc
879    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
880    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
881    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
882    pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
883        match self.ll_sem.try_acquire(1) {
884            Ok(()) => Ok(OwnedSemaphorePermit {
885                sem: self,
886                permits: 1,
887            }),
888            Err(e) => Err(e),
889        }
890    }
891
892    /// Tries to acquire `n` permits from the semaphore.
893    ///
894    /// The semaphore must be wrapped in an [`Arc`] to call this method. If
895    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
896    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
897    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
898    /// acquired permit.
899    ///
900    /// # Examples
901    ///
902    /// ```
903    /// use std::sync::Arc;
904    /// use tokio::sync::{Semaphore, TryAcquireError};
905    ///
906    /// # fn main() {
907    /// let semaphore = Arc::new(Semaphore::new(4));
908    ///
909    /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
910    /// assert_eq!(semaphore.available_permits(), 1);
911    ///
912    /// let permit_2 = semaphore.try_acquire_many_owned(2);
913    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
914    /// # }
915    /// ```
916    ///
917    /// [`Arc`]: std::sync::Arc
918    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
919    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
920    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
921    pub fn try_acquire_many_owned(
922        self: Arc<Self>,
923        n: u32,
924    ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
925        match self.ll_sem.try_acquire(n as usize) {
926            Ok(()) => Ok(OwnedSemaphorePermit {
927                sem: self,
928                permits: n,
929            }),
930            Err(e) => Err(e),
931        }
932    }
933
934    /// Closes the semaphore.
935    ///
936    /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
937    ///
938    /// # Examples
939    ///
940    /// ```
941    /// use tokio::sync::Semaphore;
942    /// use std::sync::Arc;
943    /// use tokio::sync::TryAcquireError;
944    ///
945    /// # #[tokio::main(flavor = "current_thread")]
946    /// # async fn main() {
947    /// let semaphore = Arc::new(Semaphore::new(1));
948    /// let semaphore2 = semaphore.clone();
949    ///
950    /// tokio::spawn(async move {
951    ///     let permit = semaphore.acquire_many(2).await;
952    ///     assert!(permit.is_err());
953    ///     println!("waiter received error");
954    /// });
955    ///
956    /// println!("closing semaphore");
957    /// semaphore2.close();
958    ///
959    /// // Cannot obtain more permits
960    /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
961    /// # }
962    /// ```
963    pub fn close(&self) {
964        self.ll_sem.close();
965    }
966
967    /// Returns true if the semaphore is closed
968    pub fn is_closed(&self) -> bool {
969        self.ll_sem.is_closed()
970    }
971}
972
973impl<'a> SemaphorePermit<'a> {
974    /// Forgets the permit **without** releasing it back to the semaphore.
975    /// This can be used to reduce the amount of permits available from a
976    /// semaphore.
977    ///
978    /// # Examples
979    ///
980    /// ```
981    /// use std::sync::Arc;
982    /// use tokio::sync::Semaphore;
983    ///
984    /// let sem = Arc::new(Semaphore::new(10));
985    /// {
986    ///     let permit = sem.try_acquire_many(5).unwrap();
987    ///     assert_eq!(sem.available_permits(), 5);
988    ///     permit.forget();
989    /// }
990    ///
991    /// // Since we forgot the permit, available permits won't go back to its initial value
992    /// // even after the permit is dropped.
993    /// assert_eq!(sem.available_permits(), 5);
994    /// ```
995    pub fn forget(mut self) {
996        self.permits = 0;
997    }
998
999    /// Merge two [`SemaphorePermit`] instances together, consuming `other`
1000    /// without releasing the permits it holds.
1001    ///
1002    /// Permits held by both `self` and `other` are released when `self` drops.
1003    ///
1004    /// # Panics
1005    ///
1006    /// This function panics if permits from different [`Semaphore`] instances
1007    /// are merged.
1008    ///
1009    /// # Examples
1010    ///
1011    /// ```
1012    /// use std::sync::Arc;
1013    /// use tokio::sync::Semaphore;
1014    ///
1015    /// let sem = Arc::new(Semaphore::new(10));
1016    /// let mut permit = sem.try_acquire().unwrap();
1017    ///
1018    /// for _ in 0..9 {
1019    ///     let _permit = sem.try_acquire().unwrap();
1020    ///     // Merge individual permits into a single one.
1021    ///     permit.merge(_permit)
1022    /// }
1023    ///
1024    /// assert_eq!(sem.available_permits(), 0);
1025    ///
1026    /// // Release all permits in a single batch.
1027    /// drop(permit);
1028    ///
1029    /// assert_eq!(sem.available_permits(), 10);
1030    /// ```
1031    #[track_caller]
1032    pub fn merge(&mut self, mut other: Self) {
1033        assert!(
1034            std::ptr::eq(self.sem, other.sem),
1035            "merging permits from different semaphore instances"
1036        );
1037        self.permits += other.permits;
1038        other.permits = 0;
1039    }
1040
1041    /// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
1042    ///
1043    /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1044    ///
1045    /// # Examples
1046    ///
1047    /// ```
1048    /// use std::sync::Arc;
1049    /// use tokio::sync::Semaphore;
1050    ///
1051    /// let sem = Arc::new(Semaphore::new(3));
1052    ///
1053    /// let mut p1 = sem.try_acquire_many(3).unwrap();
1054    /// let p2 = p1.split(1).unwrap();
1055    ///
1056    /// assert_eq!(p1.num_permits(), 2);
1057    /// assert_eq!(p2.num_permits(), 1);
1058    /// ```
1059    pub fn split(&mut self, n: usize) -> Option<Self> {
1060        let n = u32::try_from(n).ok()?;
1061
1062        if n > self.permits {
1063            return None;
1064        }
1065
1066        self.permits -= n;
1067
1068        Some(Self {
1069            sem: self.sem,
1070            permits: n,
1071        })
1072    }
1073
1074    /// Returns the number of permits held by `self`.
1075    pub fn num_permits(&self) -> usize {
1076        self.permits as usize
1077    }
1078}
1079
1080impl OwnedSemaphorePermit {
1081    /// Forgets the permit **without** releasing it back to the semaphore.
1082    /// This can be used to reduce the amount of permits available from a
1083    /// semaphore.
1084    ///
1085    /// # Examples
1086    ///
1087    /// ```
1088    /// use std::sync::Arc;
1089    /// use tokio::sync::Semaphore;
1090    ///
1091    /// let sem = Arc::new(Semaphore::new(10));
1092    /// {
1093    ///     let permit = sem.clone().try_acquire_many_owned(5).unwrap();
1094    ///     assert_eq!(sem.available_permits(), 5);
1095    ///     permit.forget();
1096    /// }
1097    ///
1098    /// // Since we forgot the permit, available permits won't go back to its initial value
1099    /// // even after the permit is dropped.
1100    /// assert_eq!(sem.available_permits(), 5);
1101    /// ```
1102    pub fn forget(mut self) {
1103        self.permits = 0;
1104    }
1105
1106    /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
1107    /// without releasing the permits it holds.
1108    ///
1109    /// Permits held by both `self` and `other` are released when `self` drops.
1110    ///
1111    /// # Panics
1112    ///
1113    /// This function panics if permits from different [`Semaphore`] instances
1114    /// are merged.
1115    ///
1116    /// # Examples
1117    ///
1118    /// ```
1119    /// use std::sync::Arc;
1120    /// use tokio::sync::Semaphore;
1121    ///
1122    /// let sem = Arc::new(Semaphore::new(10));
1123    /// let mut permit = sem.clone().try_acquire_owned().unwrap();
1124    ///
1125    /// for _ in 0..9 {
1126    ///     let _permit = sem.clone().try_acquire_owned().unwrap();
1127    ///     // Merge individual permits into a single one.
1128    ///     permit.merge(_permit)
1129    /// }
1130    ///
1131    /// assert_eq!(sem.available_permits(), 0);
1132    ///
1133    /// // Release all permits in a single batch.
1134    /// drop(permit);
1135    ///
1136    /// assert_eq!(sem.available_permits(), 10);
1137    /// ```
1138    #[track_caller]
1139    pub fn merge(&mut self, mut other: Self) {
1140        assert!(
1141            Arc::ptr_eq(&self.sem, &other.sem),
1142            "merging permits from different semaphore instances"
1143        );
1144        self.permits += other.permits;
1145        other.permits = 0;
1146    }
1147
1148    /// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
1149    ///
1150    /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1151    ///
1152    /// # Note
1153    ///
1154    /// It will clone the owned `Arc<Semaphore>` to construct the new instance.
1155    ///
1156    /// # Examples
1157    ///
1158    /// ```
1159    /// use std::sync::Arc;
1160    /// use tokio::sync::Semaphore;
1161    ///
1162    /// let sem = Arc::new(Semaphore::new(3));
1163    ///
1164    /// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
1165    /// let p2 = p1.split(1).unwrap();
1166    ///
1167    /// assert_eq!(p1.num_permits(), 2);
1168    /// assert_eq!(p2.num_permits(), 1);
1169    /// ```
1170    pub fn split(&mut self, n: usize) -> Option<Self> {
1171        let n = u32::try_from(n).ok()?;
1172
1173        if n > self.permits {
1174            return None;
1175        }
1176
1177        self.permits -= n;
1178
1179        Some(Self {
1180            sem: self.sem.clone(),
1181            permits: n,
1182        })
1183    }
1184
1185    /// Returns the [`Semaphore`] from which this permit was acquired.
1186    pub fn semaphore(&self) -> &Arc<Semaphore> {
1187        &self.sem
1188    }
1189
1190    /// Returns the number of permits held by `self`.
1191    pub fn num_permits(&self) -> usize {
1192        self.permits as usize
1193    }
1194}
1195
1196impl Drop for SemaphorePermit<'_> {
1197    fn drop(&mut self) {
1198        self.sem.add_permits(self.permits as usize);
1199    }
1200}
1201
1202impl Drop for OwnedSemaphorePermit {
1203    fn drop(&mut self) {
1204        self.sem.add_permits(self.permits as usize);
1205    }
1206}