tower/retry/
future.rs

//! Future types for the [`Retry`] middleware.
2
3use super::{Policy, Retry};
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{ready, Context, Poll};
8use tower_service::Service;
9
10pin_project! {
11    /// The [`Future`] returned by a [`Retry`] service.
12    #[derive(Debug)]
13    pub struct ResponseFuture<P, S, Request>
14    where
15        P: Policy<Request, S::Response, S::Error>,
16        S: Service<Request>,
17    {
18        request: Option<Request>,
19        #[pin]
20        retry: Retry<P, S>,
21        #[pin]
22        state: State<S::Future, P::Future>,
23    }
24}
25
26pin_project! {
27    #[project = StateProj]
28    #[derive(Debug)]
29    enum State<F, P> {
30        // Polling the future from [`Service::call`]
31        Called {
32            #[pin]
33            future: F
34        },
35        // Polling the future from [`Policy::retry`]
36        Waiting {
37            #[pin]
38            waiting: P
39        },
40        // Polling [`Service::poll_ready`] after [`Waiting`] was OK.
41        Retrying,
42    }
43}
44
45impl<P, S, Request> ResponseFuture<P, S, Request>
46where
47    P: Policy<Request, S::Response, S::Error>,
48    S: Service<Request>,
49{
50    pub(crate) fn new(
51        request: Option<Request>,
52        retry: Retry<P, S>,
53        future: S::Future,
54    ) -> ResponseFuture<P, S, Request> {
55        ResponseFuture {
56            request,
57            retry,
58            state: State::Called { future },
59        }
60    }
61}
62
63impl<P, S, Request> Future for ResponseFuture<P, S, Request>
64where
65    P: Policy<Request, S::Response, S::Error>,
66    S: Service<Request>,
67{
68    type Output = Result<S::Response, S::Error>;
69
70    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71        let mut this = self.project();
72
73        loop {
74            match this.state.as_mut().project() {
75                StateProj::Called { future } => {
76                    let mut result = ready!(future.poll(cx));
77                    if let Some(req) = &mut this.request {
78                        match this.retry.policy.retry(req, &mut result) {
79                            Some(waiting) => {
80                                this.state.set(State::Waiting { waiting });
81                            }
82                            None => return Poll::Ready(result),
83                        }
84                    } else {
85                        // request wasn't cloned, so no way to retry it
86                        return Poll::Ready(result);
87                    }
88                }
89                StateProj::Waiting { waiting } => {
90                    ready!(waiting.poll(cx));
91
92                    this.state.set(State::Retrying);
93                }
94                StateProj::Retrying => {
95                    // NOTE: we assume here that
96                    //
97                    //   this.retry.poll_ready()
98                    //
99                    // is equivalent to
100                    //
101                    //   this.retry.service.poll_ready()
102                    //
103                    // we need to make that assumption to avoid adding an Unpin bound to the Policy
104                    // in Ready to make it Unpin so that we can get &mut Ready as needed to call
105                    // poll_ready on it.
106                    ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
107                    let req = this
108                        .request
109                        .take()
110                        .expect("retrying requires cloned request");
111                    *this.request = this.retry.policy.clone_request(&req);
112                    this.state.set(State::Called {
113                        future: this.retry.as_mut().project().service.call(req),
114                    });
115                }
116            }
117        }
118    }
119}