tokio/sync/semaphore.rs
1use super::batch_semaphore as ll; // low level implementation
2use super::{AcquireError, TryAcquireError};
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::sync::Arc;
6
7/// Counting semaphore performing asynchronous permit acquisition.
8///
9/// A semaphore maintains a set of permits. Permits are used to synchronize
10/// access to a shared resource. A semaphore differs from a mutex in that it
11/// can allow more than one concurrent caller to access the shared resource at a
12/// time.
13///
14/// When `acquire` is called and the semaphore has remaining permits, the
15/// function immediately returns a permit. However, if no remaining permits are
16/// available, `acquire` (asynchronously) waits until an outstanding permit is
17/// dropped. At this point, the freed permit is assigned to the caller.
18///
19/// This `Semaphore` is fair, which means that permits are given out in the order
20/// they were requested. This fairness is also applied when `acquire_many` gets
21/// involved, so if a call to `acquire_many` at the front of the queue requests
22/// more permits than currently available, this can prevent a call to `acquire`
23/// from completing, even if the semaphore has enough permits complete the call
24/// to `acquire`.
25///
26/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27/// utility.
28///
29/// # Examples
30///
31/// Basic usage:
32///
33/// ```
34/// use tokio::sync::{Semaphore, TryAcquireError};
35///
36/// # #[tokio::main(flavor = "current_thread")]
37/// # async fn main() {
38/// let semaphore = Semaphore::new(3);
39///
40/// let a_permit = semaphore.acquire().await.unwrap();
41/// let two_permits = semaphore.acquire_many(2).await.unwrap();
42///
43/// assert_eq!(semaphore.available_permits(), 0);
44///
45/// let permit_attempt = semaphore.try_acquire();
46/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47/// # }
48/// ```
49///
50/// ## Limit the number of simultaneously opened files in your program
51///
52/// Most operating systems have limits on the number of open file
53/// handles. Even in systems without explicit limits, resource constraints
54/// implicitly set an upper bound on the number of open files. If your
55/// program attempts to open a large number of files and exceeds this
56/// limit, it will result in an error.
57///
58/// This example uses a Semaphore with 100 permits. By acquiring a permit from
59/// the Semaphore before accessing a file, you ensure that your program opens
60/// no more than 100 files at a time. When trying to open the 101st
61/// file, the program will wait until a permit becomes available before
62/// proceeding to open another file.
63/// ```
64/// # #[cfg(not(target_family = "wasm"))]
65/// # {
66/// use std::io::Result;
67/// use tokio::fs::File;
68/// use tokio::sync::Semaphore;
69/// use tokio::io::AsyncWriteExt;
70///
71/// static PERMITS: Semaphore = Semaphore::const_new(100);
72///
73/// async fn write_to_file(message: &[u8]) -> Result<()> {
74/// let _permit = PERMITS.acquire().await.unwrap();
75/// let mut buffer = File::create("example.txt").await?;
76/// buffer.write_all(message).await?;
77/// Ok(()) // Permit goes out of scope here, and is available again for acquisition
78/// }
79/// # }
80/// ```
81///
82/// ## Limit the number of outgoing requests being sent at the same time
83///
84/// In some scenarios, it might be required to limit the number of outgoing
85/// requests being sent in parallel. This could be due to limits of a consumed
86/// API or the network resources of the system the application is running on.
87///
88/// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
89/// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
90/// a task sends a request, it must acquire a permit from the semaphore by
91/// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
92/// sent in parallel at any given time. After a task has sent a request, it
93/// drops the permit to allow other tasks to send requests.
94///
95/// ```
96/// use std::sync::Arc;
97/// use tokio::sync::Semaphore;
98///
99/// # #[tokio::main(flavor = "current_thread")]
100/// # async fn main() {
101/// // Define maximum number of parallel requests.
102/// let semaphore = Arc::new(Semaphore::new(5));
103/// // Spawn many tasks that will send requests.
104/// let mut jhs = Vec::new();
105/// for task_id in 0..50 {
106/// let semaphore = semaphore.clone();
107/// let jh = tokio::spawn(async move {
108/// // Acquire permit before sending request.
109/// let _permit = semaphore.acquire().await.unwrap();
110/// // Send the request.
111/// let response = send_request(task_id).await;
112/// // Drop the permit after the request has been sent.
113/// drop(_permit);
114/// // Handle response.
115/// // ...
116///
117/// response
118/// });
119/// jhs.push(jh);
120/// }
121/// // Collect responses from tasks.
122/// let mut responses = Vec::new();
123/// for jh in jhs {
124/// let response = jh.await.unwrap();
125/// responses.push(response);
126/// }
127/// // Process responses.
128/// // ...
129/// # }
130/// # async fn send_request(task_id: usize) {
131/// # // Send request.
132/// # }
133/// ```
134///
135/// ## Limit the number of incoming requests being handled at the same time
136///
137/// Similar to limiting the number of simultaneously opened files, network handles
138/// are a limited resource. Allowing an unbounded amount of requests to be processed
139/// could result in a denial-of-service, among many other issues.
140///
141/// This example uses an `Arc<Semaphore>` instead of a global variable.
142/// To limit the number of requests that can be processed at the time,
143/// we acquire a permit for each task before spawning it. Once acquired,
144/// a new task is spawned; and once finished, the permit is dropped inside
145/// of the task to allow others to spawn. Permits must be acquired via
146/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
147/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
148///
149/// ```no_run
150/// # #[cfg(not(target_family = "wasm"))]
151/// # {
152/// use std::sync::Arc;
153/// use tokio::sync::Semaphore;
154/// use tokio::net::TcpListener;
155///
156/// #[tokio::main]
157/// async fn main() -> std::io::Result<()> {
158/// let semaphore = Arc::new(Semaphore::new(3));
159/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
160///
161/// loop {
162/// // Acquire permit before accepting the next socket.
163/// //
164/// // We use `acquire_owned` so that we can move `permit` into
165/// // other tasks.
166/// let permit = semaphore.clone().acquire_owned().await.unwrap();
167/// let (mut socket, _) = listener.accept().await?;
168///
169/// tokio::spawn(async move {
170/// // Do work using the socket.
171/// handle_connection(&mut socket).await;
172/// // Drop socket while the permit is still live.
173/// drop(socket);
174/// // Drop the permit, so more tasks can be created.
175/// drop(permit);
176/// });
177/// }
178/// }
179/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
180/// # // Do work
181/// # }
182/// # }
183/// ```
184///
185/// ## Prevent tests from running in parallel
186///
187/// By default, Rust runs tests in the same file in parallel. However, in some
188/// cases, running two tests in parallel may lead to problems. For example, this
189/// can happen when tests use the same database.
190///
191/// Consider the following scenario:
192/// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
193/// the value using the same key to verify the insertion.
194/// 2. `test_update`: Inserts a key, then updates the key to a new value and
195/// verifies that the value has been accurately updated.
196/// 3. `test_others`: A third test that doesn't modify the database state. It
197/// can run in parallel with the other tests.
198///
199/// In this example, `test_insert` and `test_update` need to run in sequence to
200/// work, but it doesn't matter which test runs first. We can leverage a
201/// semaphore with a single permit to address this challenge.
202///
203/// ```
204/// # use tokio::sync::Mutex;
205/// # use std::collections::BTreeMap;
206/// # struct Database {
207/// # map: Mutex<BTreeMap<String, i32>>,
208/// # }
209/// # impl Database {
210/// # pub const fn setup() -> Database {
211/// # Database {
212/// # map: Mutex::const_new(BTreeMap::new()),
213/// # }
214/// # }
215/// # pub async fn insert(&self, key: &str, value: i32) {
216/// # self.map.lock().await.insert(key.to_string(), value);
217/// # }
218/// # pub async fn update(&self, key: &str, value: i32) {
219/// # self.map.lock().await
220/// # .entry(key.to_string())
221/// # .and_modify(|origin| *origin = value);
222/// # }
223/// # pub async fn delete(&self, key: &str) {
224/// # self.map.lock().await.remove(key);
225/// # }
226/// # pub async fn get(&self, key: &str) -> i32 {
227/// # *self.map.lock().await.get(key).unwrap()
228/// # }
229/// # }
230/// use tokio::sync::Semaphore;
231///
232/// // Initialize a static semaphore with only one permit, which is used to
233/// // prevent test_insert and test_update from running in parallel.
234/// static PERMIT: Semaphore = Semaphore::const_new(1);
235///
236/// // Initialize the database that will be used by the subsequent tests.
237/// static DB: Database = Database::setup();
238///
239/// #[tokio::test]
240/// # async fn fake_test_insert() {}
241/// async fn test_insert() {
242/// // Acquire permit before proceeding. Since the semaphore has only one permit,
243/// // the test will wait if the permit is already acquired by other tests.
244/// let permit = PERMIT.acquire().await.unwrap();
245///
246/// // Do the actual test stuff with database
247///
248/// // Insert a key-value pair to database
249/// let (key, value) = ("name", 0);
250/// DB.insert(key, value).await;
251///
252/// // Verify that the value has been inserted correctly.
253/// assert_eq!(DB.get(key).await, value);
254///
255/// // Undo the insertion, so the database is empty at the end of the test.
256/// DB.delete(key).await;
257///
258/// // Drop permit. This allows the other test to start running.
259/// drop(permit);
260/// }
261///
262/// #[tokio::test]
263/// # async fn fake_test_update() {}
264/// async fn test_update() {
265/// // Acquire permit before proceeding. Since the semaphore has only one permit,
266/// // the test will wait if the permit is already acquired by other tests.
267/// let permit = PERMIT.acquire().await.unwrap();
268///
269/// // Do the same insert.
270/// let (key, value) = ("name", 0);
271/// DB.insert(key, value).await;
272///
273/// // Update the existing value with a new one.
274/// let new_value = 1;
275/// DB.update(key, new_value).await;
276///
277/// // Verify that the value has been updated correctly.
278/// assert_eq!(DB.get(key).await, new_value);
279///
280/// // Undo any modificattion.
281/// DB.delete(key).await;
282///
283/// // Drop permit. This allows the other test to start running.
284/// drop(permit);
285/// }
286///
287/// #[tokio::test]
288/// # async fn fake_test_others() {}
289/// async fn test_others() {
290/// // This test can run in parallel with test_insert and test_update,
291/// // so it does not use PERMIT.
292/// }
293/// # #[tokio::main(flavor = "current_thread")]
294/// # async fn main() {
295/// # test_insert().await;
296/// # test_update().await;
297/// # test_others().await;
298/// # }
299/// ```
300///
301/// ## Rate limiting using a token bucket
302///
303/// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
304///
305/// Many applications and systems have constraints on the rate at which certain
306/// operations should occur. Exceeding this rate can result in suboptimal
307/// performance or even errors.
308///
309/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
310/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
311/// arrive at the same time.
312///
313/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
314/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
315/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
316/// wait for new tokens to be added.
317///
318/// Unlike the example that limits how many requests can be handled at the same time, we do not add
319/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
320///
321/// Note that this implementation is suboptimal when the duration is small, because it consumes a
322/// lot of cpu constantly looping and sleeping.
323///
324/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
325/// [`add_permits`]: crate::sync::Semaphore::add_permits
326/// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
327/// ```
328/// use std::sync::Arc;
329/// use tokio::sync::Semaphore;
330/// use tokio::time::{interval, Duration};
331///
332/// struct TokenBucket {
333/// sem: Arc<Semaphore>,
334/// jh: tokio::task::JoinHandle<()>,
335/// }
336///
337/// impl TokenBucket {
338/// fn new(duration: Duration, capacity: usize) -> Self {
339/// let sem = Arc::new(Semaphore::new(capacity));
340///
341/// // refills the tokens at the end of each interval
342/// let jh = tokio::spawn({
343/// let sem = sem.clone();
344/// let mut interval = interval(duration);
345/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
346///
347/// async move {
348/// loop {
349/// interval.tick().await;
350///
351/// if sem.available_permits() < capacity {
352/// sem.add_permits(1);
353/// }
354/// }
355/// }
356/// });
357///
358/// Self { jh, sem }
359/// }
360///
361/// async fn acquire(&self) {
362/// // This can return an error if the semaphore is closed, but we
363/// // never close it, so this error can never happen.
364/// let permit = self.sem.acquire().await.unwrap();
365/// // To avoid releasing the permit back to the semaphore, we use
366/// // the `SemaphorePermit::forget` method.
367/// permit.forget();
368/// }
369/// }
370///
371/// impl Drop for TokenBucket {
372/// fn drop(&mut self) {
373/// // Kill the background task so it stops taking up resources when we
374/// // don't need it anymore.
375/// self.jh.abort();
376/// }
377/// }
378///
379/// # #[tokio::main(flavor = "current_thread")]
380/// # async fn _hidden() {}
381/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
382/// # async fn main() {
383/// let capacity = 5;
384/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
385/// let bucket = TokenBucket::new(update_interval, capacity);
386///
387/// for _ in 0..5 {
388/// bucket.acquire().await;
389///
390/// // do the operation
391/// }
392/// # }
393/// ```
394///
395/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
396/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
397#[derive(Debug)]
398pub struct Semaphore {
399 /// The low level semaphore
400 ll_sem: ll::Semaphore,
401 #[cfg(all(tokio_unstable, feature = "tracing"))]
402 resource_span: tracing::Span,
403}
404
405/// A permit from the semaphore.
406///
407/// This type is created by the [`acquire`] method.
408///
409/// [`acquire`]: crate::sync::Semaphore::acquire()
410#[must_use]
411#[clippy::has_significant_drop]
412#[derive(Debug)]
413pub struct SemaphorePermit<'a> {
414 sem: &'a Semaphore,
415 permits: u32,
416}
417
418/// An owned permit from the semaphore.
419///
420/// This type is created by the [`acquire_owned`] method.
421///
422/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
423#[must_use]
424#[clippy::has_significant_drop]
425#[derive(Debug)]
426pub struct OwnedSemaphorePermit {
427 sem: Arc<Semaphore>,
428 permits: u32,
429}
430
431#[test]
432#[cfg(not(loom))]
433fn bounds() {
434 fn check_unpin<T: Unpin>() {}
435 // This has to take a value, since the async fn's return type is unnameable.
436 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
437 fn check_send_sync<T: Send + Sync>() {}
438 check_unpin::<Semaphore>();
439 check_unpin::<SemaphorePermit<'_>>();
440 check_send_sync::<Semaphore>();
441
442 let semaphore = Semaphore::new(0);
443 check_send_sync_val(semaphore.acquire());
444}
445
446impl Semaphore {
447 /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
448 ///
449 /// Exceeding this limit typically results in a panic.
450 pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
451
452 /// Creates a new semaphore with the initial number of permits.
453 ///
454 /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
455 #[track_caller]
456 pub fn new(permits: usize) -> Self {
457 #[cfg(all(tokio_unstable, feature = "tracing"))]
458 let resource_span = {
459 let location = std::panic::Location::caller();
460
461 tracing::trace_span!(
462 parent: None,
463 "runtime.resource",
464 concrete_type = "Semaphore",
465 kind = "Sync",
466 loc.file = location.file(),
467 loc.line = location.line(),
468 loc.col = location.column(),
469 inherits_child_attrs = true,
470 )
471 };
472
473 #[cfg(all(tokio_unstable, feature = "tracing"))]
474 let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
475
476 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
477 let ll_sem = ll::Semaphore::new(permits);
478
479 Self {
480 ll_sem,
481 #[cfg(all(tokio_unstable, feature = "tracing"))]
482 resource_span,
483 }
484 }
485
486 /// Creates a new semaphore with the initial number of permits.
487 ///
488 /// When using the `tracing` [unstable feature], a `Semaphore` created with
489 /// `const_new` will not be instrumented. As such, it will not be visible
490 /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
491 /// create an instrumented object if that is needed.
492 ///
493 /// # Examples
494 ///
495 /// ```
496 /// use tokio::sync::Semaphore;
497 ///
498 /// static SEM: Semaphore = Semaphore::const_new(10);
499 /// ```
500 ///
501 /// [`tokio-console`]: https://github.com/tokio-rs/console
502 /// [unstable feature]: crate#unstable-features
503 #[cfg(not(all(loom, test)))]
504 pub const fn const_new(permits: usize) -> Self {
505 Self {
506 ll_sem: ll::Semaphore::const_new(permits),
507 #[cfg(all(tokio_unstable, feature = "tracing"))]
508 resource_span: tracing::Span::none(),
509 }
510 }
511
512 /// Creates a new closed semaphore with 0 permits.
513 pub(crate) fn new_closed() -> Self {
514 Self {
515 ll_sem: ll::Semaphore::new_closed(),
516 #[cfg(all(tokio_unstable, feature = "tracing"))]
517 resource_span: tracing::Span::none(),
518 }
519 }
520
521 /// Creates a new closed semaphore with 0 permits.
522 #[cfg(not(all(loom, test)))]
523 pub(crate) const fn const_new_closed() -> Self {
524 Self {
525 ll_sem: ll::Semaphore::const_new_closed(),
526 #[cfg(all(tokio_unstable, feature = "tracing"))]
527 resource_span: tracing::Span::none(),
528 }
529 }
530
531 /// Returns the current number of available permits.
532 pub fn available_permits(&self) -> usize {
533 self.ll_sem.available_permits()
534 }
535
536 /// Adds `n` new permits to the semaphore.
537 ///
538 /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
539 pub fn add_permits(&self, n: usize) {
540 self.ll_sem.release(n);
541 }
542
543 /// Decrease a semaphore's permits by a maximum of `n`.
544 ///
545 /// If there are insufficient permits and it's not possible to reduce by `n`,
546 /// return the number of permits that were actually reduced.
547 pub fn forget_permits(&self, n: usize) -> usize {
548 self.ll_sem.forget_permits(n)
549 }
550
551 /// Acquires a permit from the semaphore.
552 ///
553 /// If the semaphore has been closed, this returns an [`AcquireError`].
554 /// Otherwise, this returns a [`SemaphorePermit`] representing the
555 /// acquired permit.
556 ///
557 /// # Cancel safety
558 ///
559 /// This method uses a queue to fairly distribute permits in the order they
560 /// were requested. Cancelling a call to `acquire` makes you lose your place
561 /// in the queue.
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// use tokio::sync::Semaphore;
567 ///
568 /// # #[tokio::main(flavor = "current_thread")]
569 /// # async fn main() {
570 /// let semaphore = Semaphore::new(2);
571 ///
572 /// let permit_1 = semaphore.acquire().await.unwrap();
573 /// assert_eq!(semaphore.available_permits(), 1);
574 ///
575 /// let permit_2 = semaphore.acquire().await.unwrap();
576 /// assert_eq!(semaphore.available_permits(), 0);
577 ///
578 /// drop(permit_1);
579 /// assert_eq!(semaphore.available_permits(), 1);
580 /// # }
581 /// ```
582 ///
583 /// [`AcquireError`]: crate::sync::AcquireError
584 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
585 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
586 #[cfg(all(tokio_unstable, feature = "tracing"))]
587 let inner = trace::async_op(
588 || self.ll_sem.acquire(1),
589 self.resource_span.clone(),
590 "Semaphore::acquire",
591 "poll",
592 true,
593 );
594 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
595 let inner = self.ll_sem.acquire(1);
596
597 inner.await?;
598 Ok(SemaphorePermit {
599 sem: self,
600 permits: 1,
601 })
602 }
603
604 /// Acquires `n` permits from the semaphore.
605 ///
606 /// If the semaphore has been closed, this returns an [`AcquireError`].
607 /// Otherwise, this returns a [`SemaphorePermit`] representing the
608 /// acquired permits.
609 ///
610 /// # Cancel safety
611 ///
612 /// This method uses a queue to fairly distribute permits in the order they
613 /// were requested. Cancelling a call to `acquire_many` makes you lose your
614 /// place in the queue.
615 ///
616 /// # Examples
617 ///
618 /// ```
619 /// use tokio::sync::Semaphore;
620 ///
621 /// # #[tokio::main(flavor = "current_thread")]
622 /// # async fn main() {
623 /// let semaphore = Semaphore::new(5);
624 ///
625 /// let permit = semaphore.acquire_many(3).await.unwrap();
626 /// assert_eq!(semaphore.available_permits(), 2);
627 /// # }
628 /// ```
629 ///
630 /// [`AcquireError`]: crate::sync::AcquireError
631 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
632 pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
633 #[cfg(all(tokio_unstable, feature = "tracing"))]
634 trace::async_op(
635 || self.ll_sem.acquire(n as usize),
636 self.resource_span.clone(),
637 "Semaphore::acquire_many",
638 "poll",
639 true,
640 )
641 .await?;
642
643 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
644 self.ll_sem.acquire(n as usize).await?;
645
646 Ok(SemaphorePermit {
647 sem: self,
648 permits: n,
649 })
650 }
651
652 /// Tries to acquire a permit from the semaphore.
653 ///
654 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
655 /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
656 /// this returns a [`SemaphorePermit`] representing the acquired permits.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// use tokio::sync::{Semaphore, TryAcquireError};
662 ///
663 /// # fn main() {
664 /// let semaphore = Semaphore::new(2);
665 ///
666 /// let permit_1 = semaphore.try_acquire().unwrap();
667 /// assert_eq!(semaphore.available_permits(), 1);
668 ///
669 /// let permit_2 = semaphore.try_acquire().unwrap();
670 /// assert_eq!(semaphore.available_permits(), 0);
671 ///
672 /// let permit_3 = semaphore.try_acquire();
673 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
674 /// # }
675 /// ```
676 ///
677 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
678 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
679 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
680 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
681 match self.ll_sem.try_acquire(1) {
682 Ok(()) => Ok(SemaphorePermit {
683 sem: self,
684 permits: 1,
685 }),
686 Err(e) => Err(e),
687 }
688 }
689
690 /// Tries to acquire `n` permits from the semaphore.
691 ///
692 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
693 /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
694 /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// use tokio::sync::{Semaphore, TryAcquireError};
700 ///
701 /// # fn main() {
702 /// let semaphore = Semaphore::new(4);
703 ///
704 /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
705 /// assert_eq!(semaphore.available_permits(), 1);
706 ///
707 /// let permit_2 = semaphore.try_acquire_many(2);
708 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
709 /// # }
710 /// ```
711 ///
712 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
713 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
714 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
715 pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
716 match self.ll_sem.try_acquire(n as usize) {
717 Ok(()) => Ok(SemaphorePermit {
718 sem: self,
719 permits: n,
720 }),
721 Err(e) => Err(e),
722 }
723 }
724
725 /// Acquires a permit from the semaphore.
726 ///
727 /// The semaphore must be wrapped in an [`Arc`] to call this method.
728 /// If the semaphore has been closed, this returns an [`AcquireError`].
729 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
730 /// acquired permit.
731 ///
732 /// # Cancel safety
733 ///
734 /// This method uses a queue to fairly distribute permits in the order they
735 /// were requested. Cancelling a call to `acquire_owned` makes you lose your
736 /// place in the queue.
737 ///
738 /// # Examples
739 ///
740 /// ```
741 /// use std::sync::Arc;
742 /// use tokio::sync::Semaphore;
743 ///
744 /// # #[tokio::main(flavor = "current_thread")]
745 /// # async fn main() {
746 /// let semaphore = Arc::new(Semaphore::new(3));
747 /// let mut join_handles = Vec::new();
748 ///
749 /// for _ in 0..5 {
750 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
751 /// join_handles.push(tokio::spawn(async move {
752 /// // perform task...
753 /// // explicitly own `permit` in the task
754 /// drop(permit);
755 /// }));
756 /// }
757 ///
758 /// for handle in join_handles {
759 /// handle.await.unwrap();
760 /// }
761 /// # }
762 /// ```
763 ///
764 /// [`Arc`]: std::sync::Arc
765 /// [`AcquireError`]: crate::sync::AcquireError
766 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
767 pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
768 #[cfg(all(tokio_unstable, feature = "tracing"))]
769 let inner = trace::async_op(
770 || self.ll_sem.acquire(1),
771 self.resource_span.clone(),
772 "Semaphore::acquire_owned",
773 "poll",
774 true,
775 );
776 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
777 let inner = self.ll_sem.acquire(1);
778
779 inner.await?;
780 Ok(OwnedSemaphorePermit {
781 sem: self,
782 permits: 1,
783 })
784 }
785
786 /// Acquires `n` permits from the semaphore.
787 ///
788 /// The semaphore must be wrapped in an [`Arc`] to call this method.
789 /// If the semaphore has been closed, this returns an [`AcquireError`].
790 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
791 /// acquired permit.
792 ///
793 /// # Cancel safety
794 ///
795 /// This method uses a queue to fairly distribute permits in the order they
796 /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
797 /// your place in the queue.
798 ///
799 /// # Examples
800 ///
801 /// ```
802 /// use std::sync::Arc;
803 /// use tokio::sync::Semaphore;
804 ///
805 /// # #[tokio::main(flavor = "current_thread")]
806 /// # async fn main() {
807 /// let semaphore = Arc::new(Semaphore::new(10));
808 /// let mut join_handles = Vec::new();
809 ///
810 /// for _ in 0..5 {
811 /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
812 /// join_handles.push(tokio::spawn(async move {
813 /// // perform task...
814 /// // explicitly own `permit` in the task
815 /// drop(permit);
816 /// }));
817 /// }
818 ///
819 /// for handle in join_handles {
820 /// handle.await.unwrap();
821 /// }
822 /// # }
823 /// ```
824 ///
825 /// [`Arc`]: std::sync::Arc
826 /// [`AcquireError`]: crate::sync::AcquireError
827 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
828 pub async fn acquire_many_owned(
829 self: Arc<Self>,
830 n: u32,
831 ) -> Result<OwnedSemaphorePermit, AcquireError> {
832 #[cfg(all(tokio_unstable, feature = "tracing"))]
833 let inner = trace::async_op(
834 || self.ll_sem.acquire(n as usize),
835 self.resource_span.clone(),
836 "Semaphore::acquire_many_owned",
837 "poll",
838 true,
839 );
840 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
841 let inner = self.ll_sem.acquire(n as usize);
842
843 inner.await?;
844 Ok(OwnedSemaphorePermit {
845 sem: self,
846 permits: n,
847 })
848 }
849
850 /// Tries to acquire a permit from the semaphore.
851 ///
852 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
853 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
854 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
855 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
856 /// acquired permit.
857 ///
858 /// # Examples
859 ///
860 /// ```
861 /// use std::sync::Arc;
862 /// use tokio::sync::{Semaphore, TryAcquireError};
863 ///
864 /// # fn main() {
865 /// let semaphore = Arc::new(Semaphore::new(2));
866 ///
867 /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
868 /// assert_eq!(semaphore.available_permits(), 1);
869 ///
870 /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
871 /// assert_eq!(semaphore.available_permits(), 0);
872 ///
873 /// let permit_3 = semaphore.try_acquire_owned();
874 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
875 /// # }
876 /// ```
877 ///
878 /// [`Arc`]: std::sync::Arc
879 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
880 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
881 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
882 pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
883 match self.ll_sem.try_acquire(1) {
884 Ok(()) => Ok(OwnedSemaphorePermit {
885 sem: self,
886 permits: 1,
887 }),
888 Err(e) => Err(e),
889 }
890 }
891
892 /// Tries to acquire `n` permits from the semaphore.
893 ///
894 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
895 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
896 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
897 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
898 /// acquired permit.
899 ///
900 /// # Examples
901 ///
902 /// ```
903 /// use std::sync::Arc;
904 /// use tokio::sync::{Semaphore, TryAcquireError};
905 ///
906 /// # fn main() {
907 /// let semaphore = Arc::new(Semaphore::new(4));
908 ///
909 /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
910 /// assert_eq!(semaphore.available_permits(), 1);
911 ///
912 /// let permit_2 = semaphore.try_acquire_many_owned(2);
913 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
914 /// # }
915 /// ```
916 ///
917 /// [`Arc`]: std::sync::Arc
918 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
919 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
920 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
921 pub fn try_acquire_many_owned(
922 self: Arc<Self>,
923 n: u32,
924 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
925 match self.ll_sem.try_acquire(n as usize) {
926 Ok(()) => Ok(OwnedSemaphorePermit {
927 sem: self,
928 permits: n,
929 }),
930 Err(e) => Err(e),
931 }
932 }
933
934 /// Closes the semaphore.
935 ///
936 /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
937 ///
938 /// # Examples
939 ///
940 /// ```
941 /// use tokio::sync::Semaphore;
942 /// use std::sync::Arc;
943 /// use tokio::sync::TryAcquireError;
944 ///
945 /// # #[tokio::main(flavor = "current_thread")]
946 /// # async fn main() {
947 /// let semaphore = Arc::new(Semaphore::new(1));
948 /// let semaphore2 = semaphore.clone();
949 ///
950 /// tokio::spawn(async move {
951 /// let permit = semaphore.acquire_many(2).await;
952 /// assert!(permit.is_err());
953 /// println!("waiter received error");
954 /// });
955 ///
956 /// println!("closing semaphore");
957 /// semaphore2.close();
958 ///
959 /// // Cannot obtain more permits
960 /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
961 /// # }
962 /// ```
963 pub fn close(&self) {
964 self.ll_sem.close();
965 }
966
967 /// Returns true if the semaphore is closed
968 pub fn is_closed(&self) -> bool {
969 self.ll_sem.is_closed()
970 }
971}
972
973impl<'a> SemaphorePermit<'a> {
974 /// Forgets the permit **without** releasing it back to the semaphore.
975 /// This can be used to reduce the amount of permits available from a
976 /// semaphore.
977 ///
978 /// # Examples
979 ///
980 /// ```
981 /// use std::sync::Arc;
982 /// use tokio::sync::Semaphore;
983 ///
984 /// let sem = Arc::new(Semaphore::new(10));
985 /// {
986 /// let permit = sem.try_acquire_many(5).unwrap();
987 /// assert_eq!(sem.available_permits(), 5);
988 /// permit.forget();
989 /// }
990 ///
991 /// // Since we forgot the permit, available permits won't go back to its initial value
992 /// // even after the permit is dropped.
993 /// assert_eq!(sem.available_permits(), 5);
994 /// ```
995 pub fn forget(mut self) {
996 self.permits = 0;
997 }
998
999 /// Merge two [`SemaphorePermit`] instances together, consuming `other`
1000 /// without releasing the permits it holds.
1001 ///
1002 /// Permits held by both `self` and `other` are released when `self` drops.
1003 ///
1004 /// # Panics
1005 ///
1006 /// This function panics if permits from different [`Semaphore`] instances
1007 /// are merged.
1008 ///
1009 /// # Examples
1010 ///
1011 /// ```
1012 /// use std::sync::Arc;
1013 /// use tokio::sync::Semaphore;
1014 ///
1015 /// let sem = Arc::new(Semaphore::new(10));
1016 /// let mut permit = sem.try_acquire().unwrap();
1017 ///
1018 /// for _ in 0..9 {
1019 /// let _permit = sem.try_acquire().unwrap();
1020 /// // Merge individual permits into a single one.
1021 /// permit.merge(_permit)
1022 /// }
1023 ///
1024 /// assert_eq!(sem.available_permits(), 0);
1025 ///
1026 /// // Release all permits in a single batch.
1027 /// drop(permit);
1028 ///
1029 /// assert_eq!(sem.available_permits(), 10);
1030 /// ```
1031 #[track_caller]
1032 pub fn merge(&mut self, mut other: Self) {
1033 assert!(
1034 std::ptr::eq(self.sem, other.sem),
1035 "merging permits from different semaphore instances"
1036 );
1037 self.permits += other.permits;
1038 other.permits = 0;
1039 }
1040
1041 /// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
1042 ///
1043 /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1044 ///
1045 /// # Examples
1046 ///
1047 /// ```
1048 /// use std::sync::Arc;
1049 /// use tokio::sync::Semaphore;
1050 ///
1051 /// let sem = Arc::new(Semaphore::new(3));
1052 ///
1053 /// let mut p1 = sem.try_acquire_many(3).unwrap();
1054 /// let p2 = p1.split(1).unwrap();
1055 ///
1056 /// assert_eq!(p1.num_permits(), 2);
1057 /// assert_eq!(p2.num_permits(), 1);
1058 /// ```
1059 pub fn split(&mut self, n: usize) -> Option<Self> {
1060 let n = u32::try_from(n).ok()?;
1061
1062 if n > self.permits {
1063 return None;
1064 }
1065
1066 self.permits -= n;
1067
1068 Some(Self {
1069 sem: self.sem,
1070 permits: n,
1071 })
1072 }
1073
1074 /// Returns the number of permits held by `self`.
1075 pub fn num_permits(&self) -> usize {
1076 self.permits as usize
1077 }
1078}
1079
1080impl OwnedSemaphorePermit {
1081 /// Forgets the permit **without** releasing it back to the semaphore.
1082 /// This can be used to reduce the amount of permits available from a
1083 /// semaphore.
1084 ///
1085 /// # Examples
1086 ///
1087 /// ```
1088 /// use std::sync::Arc;
1089 /// use tokio::sync::Semaphore;
1090 ///
1091 /// let sem = Arc::new(Semaphore::new(10));
1092 /// {
1093 /// let permit = sem.clone().try_acquire_many_owned(5).unwrap();
1094 /// assert_eq!(sem.available_permits(), 5);
1095 /// permit.forget();
1096 /// }
1097 ///
1098 /// // Since we forgot the permit, available permits won't go back to its initial value
1099 /// // even after the permit is dropped.
1100 /// assert_eq!(sem.available_permits(), 5);
1101 /// ```
1102 pub fn forget(mut self) {
1103 self.permits = 0;
1104 }
1105
1106 /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
1107 /// without releasing the permits it holds.
1108 ///
1109 /// Permits held by both `self` and `other` are released when `self` drops.
1110 ///
1111 /// # Panics
1112 ///
1113 /// This function panics if permits from different [`Semaphore`] instances
1114 /// are merged.
1115 ///
1116 /// # Examples
1117 ///
1118 /// ```
1119 /// use std::sync::Arc;
1120 /// use tokio::sync::Semaphore;
1121 ///
1122 /// let sem = Arc::new(Semaphore::new(10));
1123 /// let mut permit = sem.clone().try_acquire_owned().unwrap();
1124 ///
1125 /// for _ in 0..9 {
1126 /// let _permit = sem.clone().try_acquire_owned().unwrap();
1127 /// // Merge individual permits into a single one.
1128 /// permit.merge(_permit)
1129 /// }
1130 ///
1131 /// assert_eq!(sem.available_permits(), 0);
1132 ///
1133 /// // Release all permits in a single batch.
1134 /// drop(permit);
1135 ///
1136 /// assert_eq!(sem.available_permits(), 10);
1137 /// ```
1138 #[track_caller]
1139 pub fn merge(&mut self, mut other: Self) {
1140 assert!(
1141 Arc::ptr_eq(&self.sem, &other.sem),
1142 "merging permits from different semaphore instances"
1143 );
1144 self.permits += other.permits;
1145 other.permits = 0;
1146 }
1147
1148 /// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
1149 ///
1150 /// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
1151 ///
1152 /// # Note
1153 ///
1154 /// It will clone the owned `Arc<Semaphore>` to construct the new instance.
1155 ///
1156 /// # Examples
1157 ///
1158 /// ```
1159 /// use std::sync::Arc;
1160 /// use tokio::sync::Semaphore;
1161 ///
1162 /// let sem = Arc::new(Semaphore::new(3));
1163 ///
1164 /// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
1165 /// let p2 = p1.split(1).unwrap();
1166 ///
1167 /// assert_eq!(p1.num_permits(), 2);
1168 /// assert_eq!(p2.num_permits(), 1);
1169 /// ```
1170 pub fn split(&mut self, n: usize) -> Option<Self> {
1171 let n = u32::try_from(n).ok()?;
1172
1173 if n > self.permits {
1174 return None;
1175 }
1176
1177 self.permits -= n;
1178
1179 Some(Self {
1180 sem: self.sem.clone(),
1181 permits: n,
1182 })
1183 }
1184
1185 /// Returns the [`Semaphore`] from which this permit was acquired.
1186 pub fn semaphore(&self) -> &Arc<Semaphore> {
1187 &self.sem
1188 }
1189
1190 /// Returns the number of permits held by `self`.
1191 pub fn num_permits(&self) -> usize {
1192 self.permits as usize
1193 }
1194}
1195
1196impl Drop for SemaphorePermit<'_> {
1197 fn drop(&mut self) {
1198 self.sem.add_permits(self.permits as usize);
1199 }
1200}
1201
1202impl Drop for OwnedSemaphorePermit {
1203 fn drop(&mut self) {
1204 self.sem.add_permits(self.permits as usize);
1205 }
1206}