ferron_common/http_proxy/
send_request.rs

1use std::time::Duration;
2
3use bytes::Bytes;
4use http_body_util::combinators::BoxBody;
5use hyper::{Request, Response};
6use tokio::time::Instant;
7
8/// A wrapper around Hyper's SendRequest that can be used with multiple HTTP versions
9pub enum SendRequest {
10  Http1(hyper::client::conn::http1::SendRequest<BoxBody<Bytes, std::io::Error>>),
11  Http2(hyper::client::conn::http2::SendRequest<BoxBody<Bytes, std::io::Error>>),
12}
13
14impl SendRequest {
15  /// Checks whether the related connection is closed.
16  #[inline]
17  pub fn is_closed(&self) -> bool {
18    match self {
19      SendRequest::Http1(sender) => sender.is_closed(),
20      SendRequest::Http2(sender) => sender.is_closed(),
21    }
22  }
23
24  /// Waits until the related connection is ready.
25  #[inline]
26  pub async fn ready(&mut self) -> bool {
27    match self {
28      SendRequest::Http1(sender) => !sender.is_closed() && sender.ready().await.is_ok(),
29      SendRequest::Http2(sender) => !sender.is_closed() && sender.ready().await.is_ok(),
30    }
31  }
32
33  /// Checks whether the related connection is ready.
34  #[inline]
35  pub fn is_ready(&self) -> bool {
36    match self {
37      SendRequest::Http1(sender) => sender.is_ready() && !sender.is_closed(),
38      SendRequest::Http2(sender) => sender.is_ready() && !sender.is_closed(),
39    }
40  }
41
42  /// Send an HTTP request to the related connection.
43  #[inline]
44  pub async fn send_request(
45    &mut self,
46    request: Request<BoxBody<Bytes, std::io::Error>>,
47  ) -> Result<Response<hyper::body::Incoming>, hyper::Error> {
48    match self {
49      SendRequest::Http1(sender) => sender.send_request(request).await,
50      SendRequest::Http2(sender) => sender.send_request(request).await,
51    }
52  }
53}
54
55/// A wrapper around `SendRequest`, with idle keep-alive timeout support.
56pub struct SendRequestWrapper {
57  inner: Option<SendRequest>,
58  instant: Instant,
59}
60
61impl SendRequestWrapper {
62  /// Creates a new `SendRequestWrapper`
63  #[inline]
64  pub fn new(inner: SendRequest) -> Self {
65    Self {
66      inner: Some(inner),
67      instant: Instant::now(),
68    }
69  }
70
71  /// Gets the inner `SendRequest`, along with information on whether to put back the connection to the pool.
72  #[inline]
73  pub fn get(&mut self, timeout: Option<Duration>) -> (Option<SendRequest>, bool) {
74    let inner_mut = if let Some(inner) = self.inner.as_mut() {
75      inner
76    } else {
77      return (None, false);
78    };
79    if inner_mut.is_closed() || (inner_mut.is_ready() && timeout.is_some_and(|t| self.instant.elapsed() > t)) {
80      return (None, false);
81    }
82    (
83      if inner_mut.is_ready() {
84        self.inner.take()
85      } else {
86        self.instant = Instant::now();
87        None
88      },
89      true,
90    )
91  }
92
93  /// Waits until the inner `SendRequest` is ready. Return information on whether to put back the connection to the pool.
94  #[inline]
95  pub async fn wait_ready(&mut self, timeout: Option<Duration>) -> bool {
96    match self.inner.as_mut() {
97      None => false,
98      Some(inner) => {
99        if inner.is_ready() && timeout.is_some_and(|t| self.instant.elapsed() > t) {
100          return false;
101        }
102        inner.ready().await
103      }
104    }
105  }
106}