ferron/util/
multi_cancel.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use tokio::sync::{Semaphore, SemaphorePermit};
8use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
9
10pub struct MultiCancel {
12 semaphore: Semaphore,
13 cancel_token: CancellationToken,
14}
15
16impl MultiCancel {
17 pub fn new(max_cancels: usize) -> Self {
19 Self {
20 semaphore: Semaphore::new(max_cancels),
21 cancel_token: CancellationToken::new(),
22 }
23 }
24
25 pub fn cancel<'a>(&'a self) -> MultiCancelFuture<'a> {
27 let permit = self.semaphore.try_acquire().ok();
28 if permit.is_none() {
29 self.cancel_token.cancel();
30 }
31 MultiCancelFuture {
32 permit,
33 cancel_token: Box::pin(self.cancel_token.cancelled()),
34 }
35 }
36}
37
38pub struct MultiCancelFuture<'a> {
40 permit: Option<SemaphorePermit<'a>>,
41 cancel_token: Pin<Box<WaitForCancellationFuture<'a>>>,
42}
43
44impl<'a> Future for MultiCancelFuture<'a> {
45 type Output = ();
46
47 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48 Pin::new(&mut self.cancel_token).poll(cx)
49 }
50}
51
52impl Drop for MultiCancelFuture<'_> {
53 fn drop(&mut self) {
54 if let Some(permit) = self.permit.take() {
55 permit.forget();
56 }
57 }
58}