1use super::{Policy, Retry};
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{ready, Context, Poll};
8use tower_service::Service;
9
10pin_project! {
11 #[derive(Debug)]
13 pub struct ResponseFuture<P, S, Request>
14 where
15 P: Policy<Request, S::Response, S::Error>,
16 S: Service<Request>,
17 {
18 request: Option<Request>,
19 #[pin]
20 retry: Retry<P, S>,
21 #[pin]
22 state: State<S::Future, P::Future>,
23 }
24}
25
26pin_project! {
27 #[project = StateProj]
28 #[derive(Debug)]
29 enum State<F, P> {
30 Called {
32 #[pin]
33 future: F
34 },
35 Waiting {
37 #[pin]
38 waiting: P
39 },
40 Retrying,
42 }
43}
44
45impl<P, S, Request> ResponseFuture<P, S, Request>
46where
47 P: Policy<Request, S::Response, S::Error>,
48 S: Service<Request>,
49{
50 pub(crate) fn new(
51 request: Option<Request>,
52 retry: Retry<P, S>,
53 future: S::Future,
54 ) -> ResponseFuture<P, S, Request> {
55 ResponseFuture {
56 request,
57 retry,
58 state: State::Called { future },
59 }
60 }
61}
62
63impl<P, S, Request> Future for ResponseFuture<P, S, Request>
64where
65 P: Policy<Request, S::Response, S::Error>,
66 S: Service<Request>,
67{
68 type Output = Result<S::Response, S::Error>;
69
70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71 let mut this = self.project();
72
73 loop {
74 match this.state.as_mut().project() {
75 StateProj::Called { future } => {
76 let mut result = ready!(future.poll(cx));
77 if let Some(req) = &mut this.request {
78 match this.retry.policy.retry(req, &mut result) {
79 Some(waiting) => {
80 this.state.set(State::Waiting { waiting });
81 }
82 None => return Poll::Ready(result),
83 }
84 } else {
85 return Poll::Ready(result);
87 }
88 }
89 StateProj::Waiting { waiting } => {
90 ready!(waiting.poll(cx));
91
92 this.state.set(State::Retrying);
93 }
94 StateProj::Retrying => {
95 ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
107 let req = this
108 .request
109 .take()
110 .expect("retrying requires cloned request");
111 *this.request = this.retry.policy.clone_request(&req);
112 this.state.set(State::Called {
113 future: this.retry.as_mut().project().service.call(req),
114 });
115 }
116 }
117 }
118 }
119}