tower/buffer/
future.rs

1//! Future types for the [`Buffer`] middleware.
2//!
3//! [`Buffer`]: crate::buffer::Buffer
4
5use super::{error::Closed, message};
6use pin_project_lite::pin_project;
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{ready, Context, Poll},
11};
12
pin_project! {
    /// Future that completes when the buffered service eventually services the submitted request.
    ///
    /// When polled as a [`Future`], it resolves to the inner future's `Ok` value, or to a
    /// boxed error (`crate::BoxError`) if any stage of the response fails.
    #[derive(Debug)]
    pub struct ResponseFuture<T> {
        // Current stage of the response. Marked `#[pin]` so the receiver or
        // inner future stored in the stage can be polled through the projection.
        #[pin]
        state: ResponseState<T>,
    }
}
21
pin_project! {
    #[project = ResponseStateProj]
    #[derive(Debug)]
    // Stages a `ResponseFuture` can be in. `ResponseStateProj` is the name of
    // the pinned projection enum generated for matching inside `poll`.
    enum ResponseState<T> {
        // Built by `ResponseFuture::failed`. The error is taken out of the
        // `Option` when it is returned, so it can only be yielded once.
        Failed {
            error: Option<crate::BoxError>,
        },
        // Built by `ResponseFuture::new`. Waiting on `rx` to deliver either the
        // inner future or an error.
        Rx {
            #[pin]
            rx: message::Rx<T>,
        },
        // The inner future has been received from `rx` and is being driven to
        // completion.
        Poll {
            #[pin]
            fut: T,
        },
    }
}
39
40impl<T> ResponseFuture<T> {
41    pub(crate) fn new(rx: message::Rx<T>) -> Self {
42        ResponseFuture {
43            state: ResponseState::Rx { rx },
44        }
45    }
46
47    pub(crate) fn failed(err: crate::BoxError) -> Self {
48        ResponseFuture {
49            state: ResponseState::Failed { error: Some(err) },
50        }
51    }
52}
53
54impl<F, T, E> Future for ResponseFuture<F>
55where
56    F: Future<Output = Result<T, E>>,
57    E: Into<crate::BoxError>,
58{
59    type Output = Result<T, crate::BoxError>;
60
61    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62        let mut this = self.project();
63
64        loop {
65            match this.state.as_mut().project() {
66                ResponseStateProj::Failed { error } => {
67                    return Poll::Ready(Err(error.take().expect("polled after error")));
68                }
69                ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
70                    Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
71                    Ok(Err(e)) => return Poll::Ready(Err(e.into())),
72                    Err(_) => return Poll::Ready(Err(Closed::new().into())),
73                },
74                ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
75            }
76        }
77    }
78}