ferron_common/http_proxy/
send_request.rs1use std::time::Duration;
2
3use bytes::Bytes;
4use http_body_util::combinators::BoxBody;
5use hyper::{Request, Response};
6use tokio::time::Instant;
7
8pub enum SendRequest {
10 Http1(hyper::client::conn::http1::SendRequest<BoxBody<Bytes, std::io::Error>>),
11 Http2(hyper::client::conn::http2::SendRequest<BoxBody<Bytes, std::io::Error>>),
12}
13
14impl SendRequest {
15 #[inline]
17 pub fn is_closed(&self) -> bool {
18 match self {
19 SendRequest::Http1(sender) => sender.is_closed(),
20 SendRequest::Http2(sender) => sender.is_closed(),
21 }
22 }
23
24 #[inline]
26 pub async fn ready(&mut self) -> bool {
27 match self {
28 SendRequest::Http1(sender) => !sender.is_closed() && sender.ready().await.is_ok(),
29 SendRequest::Http2(sender) => !sender.is_closed() && sender.ready().await.is_ok(),
30 }
31 }
32
33 #[inline]
35 pub fn is_ready(&self) -> bool {
36 match self {
37 SendRequest::Http1(sender) => sender.is_ready() && !sender.is_closed(),
38 SendRequest::Http2(sender) => sender.is_ready() && !sender.is_closed(),
39 }
40 }
41
42 #[inline]
44 pub async fn send_request(
45 &mut self,
46 request: Request<BoxBody<Bytes, std::io::Error>>,
47 ) -> Result<Response<hyper::body::Incoming>, hyper::Error> {
48 match self {
49 SendRequest::Http1(sender) => sender.send_request(request).await,
50 SendRequest::Http2(sender) => sender.send_request(request).await,
51 }
52 }
53}
54
55pub struct SendRequestWrapper {
57 inner: Option<SendRequest>,
58 instant: Instant,
59}
60
61impl SendRequestWrapper {
62 #[inline]
64 pub fn new(inner: SendRequest) -> Self {
65 Self {
66 inner: Some(inner),
67 instant: Instant::now(),
68 }
69 }
70
71 #[inline]
73 pub fn get(&mut self, timeout: Option<Duration>) -> (Option<SendRequest>, bool) {
74 let inner_mut = if let Some(inner) = self.inner.as_mut() {
75 inner
76 } else {
77 return (None, false);
78 };
79 if inner_mut.is_closed() || (inner_mut.is_ready() && timeout.is_some_and(|t| self.instant.elapsed() > t)) {
80 return (None, false);
81 }
82 (
83 if inner_mut.is_ready() {
84 self.inner.take()
85 } else {
86 self.instant = Instant::now();
87 None
88 },
89 true,
90 )
91 }
92
93 #[inline]
95 pub async fn wait_ready(&mut self, timeout: Option<Duration>) -> bool {
96 match self.inner.as_mut() {
97 None => false,
98 Some(inner) => {
99 if inner.is_ready() && timeout.is_some_and(|t| self.instant.elapsed() > t) {
100 return false;
101 }
102 inner.ready().await
103 }
104 }
105 }
106}