1use super::{error::Closed, message};
6use pin_project_lite::pin_project;
7use std::{
8 future::Future,
9 pin::Pin,
10 task::{ready, Context, Poll},
11};
12
pin_project! {
    /// Future wrapping a [`ResponseState`] state machine.
    ///
    /// `T` is the type of the inner response future. The `Future`
    /// implementation for this type requires `T` to resolve to a
    /// `Result<_, E>` with `E: Into<crate::BoxError>`, and itself resolves to
    /// `Result<_, crate::BoxError>`.
    ///
    /// Instances are created through the crate-internal constructors `new`
    /// (start by waiting on a `message::Rx`) and `failed` (start with an
    /// error already in hand).
    #[derive(Debug)]
    pub struct ResponseFuture<T> {
        // Structurally pinned: the state holds the receiver and, later, the
        // inner future, both of which are polled through `Pin`.
        #[pin]
        state: ResponseState<T>,
    }
}
21
pin_project! {
    // Internal state machine behind `ResponseFuture`.
    //
    // `ResponseStateProj` is the pinned projection generated for this enum;
    // the `Future` impl matches on it to reach the pinned fields.
    #[project = ResponseStateProj]
    #[derive(Debug)]
    enum ResponseState<T> {
        // Created by `ResponseFuture::failed`. The error is handed out by
        // `Option::take` on poll, so polling again after it has been
        // returned panics with "polled after error".
        Failed {
            error: Option<crate::BoxError>,
        },
        // Initial state created by `ResponseFuture::new`: waiting on the
        // receiver. When it yields `Ok(Ok(fut))` the state switches to
        // `Poll { fut }`; an inner `Err` is converted into `crate::BoxError`
        // and a receive error is reported as `Closed`.
        Rx {
            #[pin]
            rx: message::Rx<T>,
        },
        // The inner future has been received and is polled directly; its
        // error type is converted via `Into`.
        Poll {
            #[pin]
            fut: T,
        },
    }
}
39
40impl<T> ResponseFuture<T> {
41 pub(crate) fn new(rx: message::Rx<T>) -> Self {
42 ResponseFuture {
43 state: ResponseState::Rx { rx },
44 }
45 }
46
47 pub(crate) fn failed(err: crate::BoxError) -> Self {
48 ResponseFuture {
49 state: ResponseState::Failed { error: Some(err) },
50 }
51 }
52}
53
54impl<F, T, E> Future for ResponseFuture<F>
55where
56 F: Future<Output = Result<T, E>>,
57 E: Into<crate::BoxError>,
58{
59 type Output = Result<T, crate::BoxError>;
60
61 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62 let mut this = self.project();
63
64 loop {
65 match this.state.as_mut().project() {
66 ResponseStateProj::Failed { error } => {
67 return Poll::Ready(Err(error.take().expect("polled after error")));
68 }
69 ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
70 Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
71 Ok(Err(e)) => return Poll::Ready(Err(e.into())),
72 Err(_) => return Poll::Ready(Err(Closed::new().into())),
73 },
74 ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
75 }
76 }
77 }
78}