ferron/util/
multi_cancel.rs

1use std::{
2  future::Future,
3  pin::Pin,
4  task::{Context, Poll},
5};
6
7use tokio::sync::{Semaphore, SemaphorePermit};
8use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
9
10/// A struct that can be canceled multiple times until specified times.
11pub struct MultiCancel {
12  semaphore: Semaphore,
13  cancel_token: CancellationToken,
14}
15
16impl MultiCancel {
17  /// Creates a new `MultiCancel` instance with the specified maximum number of cancels.
18  pub fn new(max_cancels: usize) -> Self {
19    Self {
20      semaphore: Semaphore::new(max_cancels),
21      cancel_token: CancellationToken::new(),
22    }
23  }
24
25  /// Returns a future that resolves when the cancel token is canceled.
26  pub fn cancel<'a>(&'a self) -> MultiCancelFuture<'a> {
27    let permit = self.semaphore.try_acquire().ok();
28    if permit.is_none() {
29      self.cancel_token.cancel();
30    }
31    MultiCancelFuture {
32      permit,
33      cancel_token: Box::pin(self.cancel_token.cancelled()),
34    }
35  }
36}
37
38/// A future created from `MultiCancel` that resolves when the cancel token is canceled.
39pub struct MultiCancelFuture<'a> {
40  permit: Option<SemaphorePermit<'a>>,
41  cancel_token: Pin<Box<WaitForCancellationFuture<'a>>>,
42}
43
44impl<'a> Future for MultiCancelFuture<'a> {
45  type Output = ();
46
47  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48    Pin::new(&mut self.cancel_token).poll(cx)
49  }
50}
51
52impl Drop for MultiCancelFuture<'_> {
53  fn drop(&mut self) {
54    if let Some(permit) = self.permit.take() {
55      permit.forget();
56    }
57  }
58}