ferron/
handler.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Display;
4use std::net::{IpAddr, SocketAddr};
5use std::sync::Arc;
6use std::time::Duration;
7#[cfg(not(feature = "runtime-vibeio"))]
8use std::time::SystemTime;
9
10use crate::acme::ACME_TLS_ALPN_NAME;
11use crate::config::ServerConfigurations;
12use crate::get_value;
13use crate::listener_handler_communication::ConnectionData;
14use crate::request_handler::request_handler;
15#[cfg(feature = "runtime-monoio")]
16use crate::util::SendAsyncIo;
17use crate::util::{read_proxy_header, MultiCancel};
18use arc_swap::ArcSwap;
19use async_channel::{Receiver, Sender};
20#[cfg(not(feature = "runtime-vibeio"))]
21use bytes::{Buf, Bytes};
22#[cfg(feature = "runtime-vibeio")]
23use core_affinity::CoreId;
24use ferron_common::logging::LogMessage;
25use http_body_util::BodyExt;
26#[cfg(not(feature = "runtime-vibeio"))]
27use http_body_util::StreamBody;
28#[cfg(not(feature = "runtime-vibeio"))]
29use hyper::body::{Frame, Incoming};
30#[cfg(not(feature = "runtime-vibeio"))]
31use hyper::service::service_fn;
32use hyper::Request;
33#[cfg(not(feature = "runtime-vibeio"))]
34use hyper::Response;
35#[cfg(feature = "runtime-tokio")]
36use hyper_util::rt::{TokioIo, TokioTimer};
37#[cfg(feature = "runtime-monoio")]
38use monoio::io::IntoPollIo;
39#[cfg(feature = "runtime-monoio")]
40use monoio::net::tcp::stream_poll::TcpStreamPoll;
41#[cfg(feature = "runtime-monoio")]
42use monoio::net::TcpStream;
43#[cfg(feature = "runtime-monoio")]
44use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
45use rustls::server::Acceptor;
46use rustls::ServerConfig;
47#[cfg(feature = "runtime-tokio")]
48use tokio::net::TcpStream;
49use tokio_rustls::server::TlsStream;
50use tokio_rustls::LazyConfigAcceptor;
51use tokio_util::sync::CancellationToken;
52#[cfg(feature = "runtime-vibeio")]
53use vibeio::net::PollTcpStream;
54#[cfg(feature = "runtime-vibeio")]
55use vibeio::net::TcpStream;
56
57#[cfg(not(feature = "runtime-vibeio"))]
58static HTTP3_INVALID_HEADERS: [hyper::header::HeaderName; 5] = [
59  hyper::header::HeaderName::from_static("keep-alive"),
60  hyper::header::HeaderName::from_static("proxy-connection"),
61  hyper::header::TRANSFER_ENCODING,
62  hyper::header::TE,
63  hyper::header::UPGRADE,
64];
65
66/// A struct holding reloadable data for handler threads
67#[allow(clippy::type_complexity)]
68pub struct ReloadableHandlerData {
69  /// ACME TLS-ALPN-01 configurations
70  pub acme_tls_alpn_01_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<ServerConfig>>>,
71  /// ACME HTTP-01 resolvers
72  pub acme_http_01_resolvers: AcmeHttp01Resolvers,
73  /// Server configurations
74  pub configurations: Arc<ServerConfigurations>,
75  /// TLS configurations
76  pub tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<ServerConfig>>>,
77  /// Whether HTTP/3 is enabled
78  pub http3_enabled: bool,
79  /// Whether PROXY protocol is enabled
80  pub enable_proxy_protocol: bool,
81  /// QUIC TLS configurations
82  pub quic_tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<quinn::ServerConfig>>>,
83}
84
85type AcmeHttp01Resolvers = Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>;
86
87/// Tokio local executor
88#[cfg(feature = "runtime-tokio")]
89#[derive(Clone, Copy, Debug)]
90pub struct TokioLocalExecutor;
91
92#[cfg(feature = "runtime-tokio")]
93impl<F> hyper::rt::Executor<F> for TokioLocalExecutor
94where
95  F: std::future::Future + 'static,
96  F::Output: 'static,
97{
98  #[inline]
99  fn execute(&self, fut: F) {
100    tokio::task::spawn_local(fut);
101  }
102}
103
104/// Creates a HTTP request handler
105pub fn create_http_handler(
106  reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
107  rx: Receiver<ConnectionData>,
108  enable_uring: Option<bool>,
109  io_uring_disabled: Sender<Option<std::io::Error>>,
110  multi_cancel: Arc<MultiCancel>,
111  #[cfg(feature = "runtime-vibeio")] core_affinity: Option<CoreId>,
112) -> Result<(CancellationToken, Sender<()>), Box<dyn Error + Send + Sync>> {
113  let shutdown_tx = CancellationToken::new();
114  let shutdown_rx = shutdown_tx.clone();
115  let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
116  let (graceful_tx, graceful_rx) = async_channel::unbounded();
117  std::thread::Builder::new()
118    .name("Request handler".to_string())
119    .spawn(move || {
120      #[cfg(feature = "runtime-vibeio")]
121      if let Some(affinity) = core_affinity {
122        core_affinity::set_for_current(affinity);
123      }
124      let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
125        Ok(rt) => rt,
126        Err(error) => {
127          handler_init_tx
128            .send_blocking(Some(
129              anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
130            ))
131            .unwrap_or_default();
132          return;
133        }
134      };
135      io_uring_disabled
136        .send_blocking(rt.return_io_uring_error())
137        .unwrap_or_default();
138      rt.run(async move {
139        if let Some(error) = http_handler_fn(
140          reloadable_data,
141          rx,
142          &handler_init_tx,
143          shutdown_rx,
144          graceful_rx,
145          multi_cancel,
146        )
147        .await
148        .err()
149        {
150          handler_init_tx.send(Some(error)).await.unwrap_or_default();
151        }
152      });
153    })?;
154
155  if let Some(error) = listen_error_rx.recv_blocking()? {
156    Err(error)?;
157  }
158
159  Ok((shutdown_tx, graceful_tx))
160}
161
162/// HTTP handler function
163#[inline]
164async fn http_handler_fn(
165  reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
166  rx: Receiver<ConnectionData>,
167  handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
168  shutdown_rx: CancellationToken,
169  graceful_rx: Receiver<()>,
170  multi_cancel: Arc<MultiCancel>,
171) -> Result<(), Box<dyn Error + Send + Sync>> {
172  handler_init_tx.send(None).await.unwrap_or_default();
173
174  let connections_references = Arc::new(());
175  let graceful_shutdown_token = Arc::new(ArcSwap::from_pointee(CancellationToken::new()));
176  let graceful_shutdown_token_clone = graceful_shutdown_token.clone();
177
178  let mut graceful_rx_recv_future = Box::pin(async move {
179    while graceful_rx.recv().await.is_ok() {
180      graceful_shutdown_token_clone
181        .swap(Arc::new(CancellationToken::new()))
182        .cancel();
183    }
184
185    futures_util::future::pending::<()>().await;
186  });
187
188  loop {
189    let conn_data = crate::runtime::select! {
190        biased;
191
192        _ = &mut graceful_rx_recv_future => {
193            // This future should be always pending...
194            break;
195        }
196        _ = shutdown_rx.cancelled() => {
197            break;
198        }
199        result = rx.recv() => {
200            if let Ok(recv_data) = result {
201                recv_data
202            } else {
203                break;
204            }
205        }
206    };
207    let ReloadableHandlerData {
208      configurations,
209      tls_configs,
210      http3_enabled,
211      acme_tls_alpn_01_configs,
212      acme_http_01_resolvers,
213      enable_proxy_protocol,
214      quic_tls_configs,
215    } = &**reloadable_data.load();
216    let quic_tls_configs = quic_tls_configs.clone();
217    let configurations = configurations.clone();
218    let tls_config = if matches!(
219      conn_data.connection,
220      crate::listener_handler_communication::Connection::Quic(..)
221    ) {
222      None
223    } else {
224      tls_configs
225        .get(&(
226          Some(conn_data.server_address.ip().to_canonical()),
227          conn_data.server_address.port(),
228        ))
229        .cloned()
230        .or_else(|| tls_configs.get(&(None, conn_data.server_address.port())).cloned())
231    };
232    let acme_tls_alpn_01_config = if matches!(
233      conn_data.connection,
234      crate::listener_handler_communication::Connection::Quic(..)
235    ) {
236      None
237    } else {
238      acme_tls_alpn_01_configs
239        .get(&(
240          Some(conn_data.server_address.ip().to_canonical()),
241          conn_data.server_address.port(),
242        ))
243        .cloned()
244        .or_else(|| {
245          acme_tls_alpn_01_configs
246            .get(&(None, conn_data.server_address.port()))
247            .cloned()
248        })
249    };
250    let acme_http_01_resolvers = acme_http_01_resolvers.clone();
251    let connections_references_cloned = connections_references.clone();
252    let shutdown_rx_clone = shutdown_rx.clone();
253    let http3_enabled = *http3_enabled;
254    let enable_proxy_protocol = *enable_proxy_protocol;
255    let graceful_shutdown_token = graceful_shutdown_token.load().clone();
256    crate::runtime::spawn(async move {
257      match conn_data.connection {
258        crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
259          // Toggle O_NONBLOCK for TCP stream, when using Monoio.
260          // Unset it when io_uring is enabled, and set it otherwise.
261          #[cfg(feature = "runtime-monoio")]
262          let _ = tcp_stream.set_nonblocking(monoio::utils::is_legacy());
263          #[cfg(feature = "runtime-vibeio")]
264          let _ = tcp_stream.set_nonblocking(vibeio::util::supports_completion());
265
266          #[cfg(any(feature = "runtime-vibeio", feature = "runtime-monoio"))]
267          let tcp_stream = match TcpStream::from_std(tcp_stream) {
268            Ok(stream) => stream,
269            Err(err) => {
270              log_connection_accept_error(&configurations, err).await;
271              return;
272            }
273          };
274          let encrypted = tls_config.is_some();
275          http_tcp_handler_fn(
276            tcp_stream,
277            conn_data.client_address,
278            conn_data.server_address,
279            configurations,
280            tls_config,
281            http3_enabled && encrypted,
282            connections_references_cloned,
283            acme_tls_alpn_01_config,
284            acme_http_01_resolvers,
285            enable_proxy_protocol,
286            shutdown_rx_clone,
287            graceful_shutdown_token,
288          )
289          .await;
290        }
291        crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
292          http_quic_handler_fn(
293            quic_incoming,
294            conn_data.client_address,
295            conn_data.server_address,
296            configurations,
297            quic_tls_configs,
298            connections_references_cloned,
299            shutdown_rx_clone,
300            graceful_shutdown_token,
301          )
302          .await;
303        }
304      }
305    });
306  }
307
308  while Arc::weak_count(&connections_references) > 0 {
309    crate::runtime::sleep(Duration::from_millis(100)).await;
310  }
311
312  // Wait until all connections are closed, then shut down all the previous handler threads
313  multi_cancel.cancel().await;
314
315  Ok(())
316}
317
318/// Enum for maybe TLS stream
319#[cfg(feature = "runtime-monoio")]
320type HttpTcpStream = SendAsyncIo<TcpStreamPoll>;
321
322#[cfg(feature = "runtime-vibeio")]
323type HttpTcpStream = PollTcpStream;
324
325#[cfg(feature = "runtime-tokio")]
326type HttpTcpStream = TcpStream;
327
328/// Enum for maybe TLS stream
329#[allow(clippy::large_enum_variant)]
330enum MaybeTlsStream {
331  /// TLS stream
332  Tls(TlsStream<HttpTcpStream>),
333
334  /// Plain TCP stream
335  Plain(HttpTcpStream),
336}
337
338#[derive(Clone, Copy, Default)]
339struct Http2Settings {
340  initial_window_size: Option<u32>,
341  max_frame_size: Option<u32>,
342  max_concurrent_streams: Option<u32>,
343  max_header_list_size: Option<u32>,
344  enable_connect_protocol: bool,
345}
346
347#[inline]
348fn get_http2_settings(configurations: &ServerConfigurations) -> Http2Settings {
349  let global_configuration = configurations.find_global_configuration();
350
351  Http2Settings {
352    initial_window_size: global_configuration
353      .as_deref()
354      .and_then(|c| get_value!("h2_initial_window_size", c))
355      .and_then(|v| v.as_i128())
356      .map(|v| v as u32),
357    max_frame_size: global_configuration
358      .as_deref()
359      .and_then(|c| get_value!("h2_max_frame_size", c))
360      .and_then(|v| v.as_i128())
361      .map(|v| v as u32),
362    max_concurrent_streams: global_configuration
363      .as_deref()
364      .and_then(|c| get_value!("h2_max_concurrent_streams", c))
365      .and_then(|v| v.as_i128())
366      .map(|v| v as u32),
367    max_header_list_size: global_configuration
368      .as_deref()
369      .and_then(|c| get_value!("h2_max_header_list_size", c))
370      .and_then(|v| v.as_i128())
371      .map(|v| v as u32),
372    enable_connect_protocol: global_configuration
373      .as_deref()
374      .and_then(|c| get_value!("h2_enable_connect_protocol", c))
375      .and_then(|v| v.as_bool())
376      .unwrap_or(false),
377  }
378}
379
380#[inline]
381fn get_http3_port(http3_enabled: bool, server_address: SocketAddr) -> Option<u16> {
382  if http3_enabled {
383    Some(server_address.port())
384  } else {
385    None
386  }
387}
388
389#[inline]
390fn empty_acme_http_01_resolvers() -> AcmeHttp01Resolvers {
391  Arc::new(tokio::sync::RwLock::new(Vec::new()))
392}
393
394#[inline]
395async fn log_handler_error(configurations: &ServerConfigurations, message: impl Into<String>) {
396  let message = message.into();
397  let global_configuration = configurations.find_global_configuration();
398  let log_channels = global_configuration
399    .as_deref()
400    .map_or(&[][..], |c| c.observability.log_channels.as_slice());
401  for logging_tx in log_channels {
402    logging_tx
403      .send(LogMessage::new(message.clone(), true))
404      .await
405      .unwrap_or_default();
406  }
407}
408
409#[inline]
410async fn log_connection_accept_error(configurations: &ServerConfigurations, err: impl Display) {
411  log_handler_error(configurations, format!("Cannot accept a connection: {err}")).await;
412}
413
414#[inline]
415async fn log_http_connection_error(configurations: &ServerConfigurations, protocol: &str, err: impl Display) {
416  log_handler_error(configurations, format!("Error serving {protocol} connection: {err}")).await;
417}
418
419#[cfg(feature = "runtime-monoio")]
420#[inline]
421async fn convert_tcp_stream_for_runtime(
422  tcp_stream: TcpStream,
423  configurations: &Arc<ServerConfigurations>,
424) -> Option<HttpTcpStream> {
425  match tcp_stream.into_poll_io() {
426    Ok(stream) => Some(SendAsyncIo::new(stream)),
427    Err(err) => {
428      log_connection_accept_error(configurations, err).await;
429      None
430    }
431  }
432}
433
434#[cfg(feature = "runtime-vibeio")]
435#[inline]
436async fn convert_tcp_stream_for_runtime(
437  tcp_stream: TcpStream,
438  configurations: &Arc<ServerConfigurations>,
439) -> Option<HttpTcpStream> {
440  match tcp_stream.into_poll() {
441    Ok(stream) => Some(stream),
442    Err(err) => {
443      log_connection_accept_error(configurations, err).await;
444      None
445    }
446  }
447}
448
449#[cfg(feature = "runtime-tokio")]
450#[inline]
451async fn convert_tcp_stream_for_runtime(
452  tcp_stream: TcpStream,
453  _configurations: &Arc<ServerConfigurations>,
454) -> Option<HttpTcpStream> {
455  Some(tcp_stream)
456}
457
458#[inline]
459async fn maybe_read_proxy_protocol_header(
460  tcp_stream: HttpTcpStream,
461  enable_proxy_protocol: bool,
462  configurations: &Arc<ServerConfigurations>,
463) -> Option<(HttpTcpStream, Option<SocketAddr>, Option<SocketAddr>)> {
464  if !enable_proxy_protocol {
465    return Some((tcp_stream, None, None));
466  }
467
468  match read_proxy_header(tcp_stream).await {
469    Ok((stream, client_address, server_address)) => Some((stream, client_address, server_address)),
470    Err(err) => {
471      log_handler_error(configurations, format!("Error reading PROXY protocol header: {err}")).await;
472      None
473    }
474  }
475}
476
477#[inline]
478async fn maybe_accept_tls_stream(
479  tcp_stream: HttpTcpStream,
480  tls_config: Option<Arc<ServerConfig>>,
481  acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
482  configurations: &Arc<ServerConfigurations>,
483) -> Option<MaybeTlsStream> {
484  let Some(tls_config) = tls_config else {
485    return Some(MaybeTlsStream::Plain(tcp_stream));
486  };
487
488  let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
489    Ok(start_handshake) => start_handshake,
490    Err(err) => {
491      log_handler_error(configurations, format!("Error during TLS handshake: {err}")).await;
492      return None;
493    }
494  };
495
496  if let Some(acme_config) = acme_tls_alpn_01_config {
497    if start_handshake
498      .client_hello()
499      .alpn()
500      .into_iter()
501      .flatten()
502      .eq([ACME_TLS_ALPN_NAME])
503    {
504      if let Err(err) = start_handshake.into_stream(acme_config).await {
505        log_handler_error(configurations, format!("Error during TLS handshake: {err}")).await;
506      }
507      return None;
508    }
509  }
510
511  match start_handshake.into_stream(tls_config).await {
512    Ok(tls_stream) => Some(MaybeTlsStream::Tls(tls_stream)),
513    Err(err) => {
514      log_handler_error(configurations, format!("Error during TLS handshake: {err}")).await;
515      None
516    }
517  }
518}
519
520#[cfg(not(feature = "runtime-vibeio"))]
521#[inline]
522fn sanitize_http3_response_headers(response_headers: &mut hyper::HeaderMap) {
523  if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
524    response_headers.entry(hyper::header::DATE).or_insert(http_date);
525  }
526  for header in &HTTP3_INVALID_HEADERS {
527    response_headers.remove(header);
528  }
529  if let Some(connection_header) = response_headers
530    .remove(hyper::header::CONNECTION)
531    .as_ref()
532    .and_then(|v| v.to_str().ok())
533  {
534    for name in connection_header.split(',') {
535      response_headers.remove(name.trim());
536    }
537  }
538}
539
540/// HTTP/1.x and HTTP/2 handler function
541#[allow(clippy::too_many_arguments)]
542#[inline]
543async fn http_tcp_handler_fn(
544  tcp_stream: TcpStream,
545  client_address: SocketAddr,
546  server_address: SocketAddr,
547  configurations: Arc<ServerConfigurations>,
548  tls_config: Option<Arc<ServerConfig>>,
549  http3_enabled: bool,
550  connection_reference: Arc<()>,
551  acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
552  acme_http_01_resolvers: AcmeHttp01Resolvers,
553  enable_proxy_protocol: bool,
554  shutdown_rx: CancellationToken,
555  graceful_shutdown_token: Arc<CancellationToken>,
556) {
557  let _connection_reference = Arc::downgrade(&connection_reference);
558  let Some(tcp_stream) = convert_tcp_stream_for_runtime(tcp_stream, &configurations).await else {
559    return;
560  };
561  let Some((tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address)) =
562    maybe_read_proxy_protocol_header(tcp_stream, enable_proxy_protocol, &configurations).await
563  else {
564    return;
565  };
566  let Some(maybe_tls_stream) =
567    maybe_accept_tls_stream(tcp_stream, tls_config, acme_tls_alpn_01_config, &configurations).await
568  else {
569    return;
570  };
571
572  if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
573    let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
574    let is_http2 = alpn_protocol == Some("h2".as_bytes());
575
576    #[cfg(feature = "runtime-tokio")]
577    let io = TokioIo::new(tls_stream);
578
579    // Ferron with Vibeio would use `vibeio-http` for HTTP
580    #[cfg(feature = "runtime-vibeio")]
581    if is_http2 {
582      use vibeio_http::{Http2Options, HttpProtocol};
583
584      let mut h2_options = Http2Options::default();
585      let http2_builder = h2_options.h2_builder();
586      let http2_settings = get_http2_settings(&configurations);
587      if let Some(initial_window_size) = http2_settings.initial_window_size {
588        http2_builder.initial_window_size(initial_window_size);
589      }
590      if let Some(max_frame_size) = http2_settings.max_frame_size {
591        http2_builder.max_frame_size(max_frame_size);
592      }
593      if let Some(max_concurrent_streams) = http2_settings.max_concurrent_streams {
594        http2_builder.max_concurrent_streams(max_concurrent_streams);
595      }
596      if let Some(max_header_list_size) = http2_settings.max_header_list_size {
597        http2_builder.max_header_list_size(max_header_list_size);
598      }
599      if http2_settings.enable_connect_protocol {
600        http2_builder.enable_connect_protocol();
601      }
602
603      let configurations_clone = configurations.clone();
604      let graceful_shutdown_token2 = CancellationToken::new();
605      let connection_reference = _connection_reference.clone();
606      let http_future = vibeio_http::Http2::new(tls_stream, h2_options)
607        .graceful_shutdown_token(graceful_shutdown_token2.clone())
608        .handle(move |request: Request<vibeio_http::Incoming>| {
609          let (request_parts, request_body) = request.into_parts();
610          let request = Request::from_parts(
611            request_parts,
612            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
613          );
614          let fut = request_handler(
615            request,
616            client_address,
617            server_address,
618            true,
619            configurations_clone.clone(),
620            get_http3_port(http3_enabled, server_address),
621            acme_http_01_resolvers.clone(),
622            proxy_protocol_client_address,
623            proxy_protocol_server_address,
624          );
625          let connection_reference = connection_reference.clone();
626          async move {
627            let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
628            drop(connection_reference);
629            r
630          }
631        });
632      let mut http_future_pin = std::pin::pin!(http_future);
633      let http_future_result = crate::runtime::select! {
634        result = &mut http_future_pin => {
635          result
636        }
637        _ = shutdown_rx.cancelled() => {
638            graceful_shutdown_token2.cancel();
639            http_future_pin.await
640        }
641        _ = graceful_shutdown_token.cancelled() => {
642            graceful_shutdown_token2.cancel();
643          http_future_pin.await
644        }
645      };
646      if let Err(err) = http_future_result {
647        log_http_connection_error(&configurations, "HTTPS", err).await;
648      }
649    } else {
650      use vibeio_http::{Http1Options, HttpProtocol};
651
652      let configurations_clone = configurations.clone();
653      let graceful_shutdown_token2 = CancellationToken::new();
654      let connection_reference = _connection_reference.clone();
655      let mut http_future = Box::pin(
656        vibeio_http::Http1::new(tls_stream, Http1Options::default())
657          .graceful_shutdown_token(graceful_shutdown_token2.clone())
658          .handle(move |request: Request<vibeio_http::Incoming>| {
659            let (request_parts, request_body) = request.into_parts();
660            let request = Request::from_parts(
661              request_parts,
662              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
663            );
664            let fut = request_handler(
665              request,
666              client_address,
667              server_address,
668              true,
669              configurations_clone.clone(),
670              get_http3_port(http3_enabled, server_address),
671              acme_http_01_resolvers.clone(),
672              proxy_protocol_client_address,
673              proxy_protocol_server_address,
674            );
675            let connection_reference = connection_reference.clone();
676            async move {
677              let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
678              drop(connection_reference);
679              r
680            }
681          }),
682      );
683      let http_future_result = crate::runtime::select! {
684        result = &mut http_future => {
685          result
686        }
687        _ = shutdown_rx.cancelled() => {
688            graceful_shutdown_token2.cancel();
689            http_future.await
690        }
691        _ = graceful_shutdown_token.cancelled() => {
692            graceful_shutdown_token2.cancel();
693          http_future.await
694        }
695      };
696      if let Err(err) = http_future_result {
697        log_http_connection_error(&configurations, "HTTPS", err).await;
698      }
699    }
700
701    #[cfg(not(feature = "runtime-vibeio"))]
702    if is_http2 {
703      // Hyper's HTTP/2 connection doesn't require underlying I/O to be `Send`.
704      #[cfg(feature = "runtime-monoio")]
705      let io = MonoioIo::new(tls_stream);
706
707      #[cfg(feature = "runtime-monoio")]
708      let mut http2_builder = {
709        let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
710        http2_builder.timer(MonoioTimer);
711        http2_builder
712      };
713      #[cfg(feature = "runtime-tokio")]
714      let mut http2_builder = {
715        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
716        http2_builder.timer(TokioTimer::new());
717        http2_builder
718      };
719
720      let http2_settings = get_http2_settings(&configurations);
721      if let Some(initial_window_size) = http2_settings.initial_window_size {
722        http2_builder.initial_stream_window_size(initial_window_size);
723      }
724      if let Some(max_frame_size) = http2_settings.max_frame_size {
725        http2_builder.max_frame_size(max_frame_size);
726      }
727      if let Some(max_concurrent_streams) = http2_settings.max_concurrent_streams {
728        http2_builder.max_concurrent_streams(max_concurrent_streams);
729      }
730      if let Some(max_header_list_size) = http2_settings.max_header_list_size {
731        http2_builder.max_header_list_size(max_header_list_size);
732      }
733      if http2_settings.enable_connect_protocol {
734        http2_builder.enable_connect_protocol();
735      }
736
737      let configurations_clone = configurations.clone();
738      let mut http_future = http2_builder.serve_connection(
739        io,
740        service_fn(move |request: Request<Incoming>| {
741          let (request_parts, request_body) = request.into_parts();
742          let request = Request::from_parts(
743            request_parts,
744            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
745          );
746          request_handler(
747            request,
748            client_address,
749            server_address,
750            true,
751            configurations_clone.clone(),
752            get_http3_port(http3_enabled, server_address),
753            acme_http_01_resolvers.clone(),
754            proxy_protocol_client_address,
755            proxy_protocol_server_address,
756          )
757        }),
758      );
759      let http_future_result = crate::runtime::select! {
760        result = &mut http_future => {
761          result
762        }
763        _ = shutdown_rx.cancelled() => {
764          std::pin::Pin::new(&mut http_future).graceful_shutdown();
765          http_future.await
766        }
767        _ = graceful_shutdown_token.cancelled() => {
768          std::pin::Pin::new(&mut http_future).graceful_shutdown();
769          http_future.await
770        }
771      };
772      if let Err(err) = http_future_result {
773        let error_to_log = if err.is_user() {
774          err.source().unwrap_or(&err)
775        } else {
776          &err
777        };
778        log_http_connection_error(&configurations, "HTTPS", error_to_log).await;
779      }
780    } else {
781      #[cfg(feature = "runtime-monoio")]
782      let io = MonoioIo::new(tls_stream);
783
784      #[cfg(feature = "runtime-monoio")]
785      let http1_builder = {
786        let mut http1_builder = hyper::server::conn::http1::Builder::new();
787
788        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
789        http1_builder.timer(MonoioTimer);
790
791        http1_builder
792      };
793      #[cfg(feature = "runtime-tokio")]
794      let http1_builder = {
795        let mut http1_builder = hyper::server::conn::http1::Builder::new();
796
797        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
798        http1_builder.timer(TokioTimer::new());
799
800        http1_builder
801      };
802
803      let configurations_clone = configurations.clone();
804      let mut http_future = http1_builder
805        .serve_connection(
806          io,
807          service_fn(move |request: Request<Incoming>| {
808            let (request_parts, request_body) = request.into_parts();
809            let request = Request::from_parts(
810              request_parts,
811              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
812            );
813            request_handler(
814              request,
815              client_address,
816              server_address,
817              true,
818              configurations_clone.clone(),
819              get_http3_port(http3_enabled, server_address),
820              acme_http_01_resolvers.clone(),
821              proxy_protocol_client_address,
822              proxy_protocol_server_address,
823            )
824          }),
825        )
826        .with_upgrades();
827      let http_future_result = crate::runtime::select! {
828        result = &mut http_future => {
829          result
830        }
831        _ = shutdown_rx.cancelled() => {
832          std::pin::Pin::new(&mut http_future).graceful_shutdown();
833          http_future.await
834        }
835        _ = graceful_shutdown_token.cancelled() => {
836          std::pin::Pin::new(&mut http_future).graceful_shutdown();
837          http_future.await
838        }
839      };
840      if let Err(err) = http_future_result {
841        let error_to_log = if err.is_user() {
842          err.source().unwrap_or(&err)
843        } else {
844          &err
845        };
846        log_http_connection_error(&configurations, "HTTPS", error_to_log).await;
847      }
848    }
849  } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
850    #[cfg(feature = "runtime-vibeio")]
851    {
852      use vibeio_http::{Http1Options, HttpProtocol};
853
854      let configurations_clone = configurations.clone();
855      let connection_reference = _connection_reference.clone();
856      let graceful_shutdown_token2 = CancellationToken::new();
857      let http1 = vibeio_http::Http1::new(stream, Http1Options::default())
858        .graceful_shutdown_token(graceful_shutdown_token2.clone());
859
860      #[cfg(target_os = "linux")]
861      let mut http_future = Box::pin(http1.zerocopy().handle(move |request: Request<vibeio_http::Incoming>| {
862        let (request_parts, request_body) = request.into_parts();
863        let request = Request::from_parts(
864          request_parts,
865          request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
866        );
867        let fut = request_handler(
868          request,
869          client_address,
870          server_address,
871          false,
872          configurations_clone.clone(),
873          get_http3_port(http3_enabled, server_address),
874          acme_http_01_resolvers.clone(),
875          proxy_protocol_client_address,
876          proxy_protocol_server_address,
877        );
878        let connection_reference = connection_reference.clone();
879        async move {
880          let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
881          drop(connection_reference);
882          r
883        }
884      }));
885      #[cfg(not(target_os = "linux"))]
886      let mut http_future = Box::pin(http1.handle(move |request: Request<vibeio_http::Incoming>| {
887        let (request_parts, request_body) = request.into_parts();
888        let request = Request::from_parts(
889          request_parts,
890          request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
891        );
892        let fut = request_handler(
893          request,
894          client_address,
895          server_address,
896          true,
897          configurations_clone.clone(),
898          get_http3_port(http3_enabled, server_address),
899          acme_http_01_resolvers.clone(),
900          proxy_protocol_client_address,
901          proxy_protocol_server_address,
902        );
903        let connection_reference = connection_reference.clone();
904        async move {
905          let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
906          drop(connection_reference);
907          r
908        }
909      }));
910      let http_future_result = crate::runtime::select! {
911        result = &mut http_future => {
912          result
913        }
914        _ = shutdown_rx.cancelled() => {
915            graceful_shutdown_token2.cancel();
916            http_future.await
917        }
918        _ = graceful_shutdown_token.cancelled() => {
919            graceful_shutdown_token2.cancel();
920          http_future.await
921        }
922      };
923      if let Err(err) = http_future_result {
924        log_http_connection_error(&configurations, "HTTP", err).await;
925      }
926    }
927    #[cfg(not(feature = "runtime-vibeio"))]
928    {
929      #[cfg(feature = "runtime-monoio")]
930      let io = MonoioIo::new(stream);
931      #[cfg(feature = "runtime-tokio")]
932      let io = TokioIo::new(stream);
933
934      #[cfg(feature = "runtime-monoio")]
935      let http1_builder = {
936        let mut http1_builder = hyper::server::conn::http1::Builder::new();
937
938        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
939        http1_builder.timer(MonoioTimer);
940
941        http1_builder
942      };
943      #[cfg(feature = "runtime-tokio")]
944      let http1_builder = {
945        let mut http1_builder = hyper::server::conn::http1::Builder::new();
946
947        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
948        http1_builder.timer(TokioTimer::new());
949
950        http1_builder
951      };
952
953      let configurations_clone = configurations.clone();
954      let mut http_future = http1_builder
955        .serve_connection(
956          io,
957          service_fn(move |request: Request<Incoming>| {
958            let (request_parts, request_body) = request.into_parts();
959            let request = Request::from_parts(
960              request_parts,
961              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
962            );
963            request_handler(
964              request,
965              client_address,
966              server_address,
967              false,
968              configurations_clone.clone(),
969              get_http3_port(http3_enabled, server_address),
970              acme_http_01_resolvers.clone(),
971              proxy_protocol_client_address,
972              proxy_protocol_server_address,
973            )
974          }),
975        )
976        .with_upgrades();
977      let http_future_result = crate::runtime::select! {
978        result = &mut http_future => {
979          result
980        }
981        _ = shutdown_rx.cancelled() => {
982          std::pin::Pin::new(&mut http_future).graceful_shutdown();
983          http_future.await
984        }
985        _ = graceful_shutdown_token.cancelled() => {
986          std::pin::Pin::new(&mut http_future).graceful_shutdown();
987          http_future.await
988        }
989      };
990      if let Err(err) = http_future_result {
991        let error_to_log = if err.is_user() {
992          err.source().unwrap_or(&err)
993        } else {
994          &err
995        };
996        log_http_connection_error(&configurations, "HTTP", error_to_log).await;
997      }
998    }
999  }
1000}
1001
1002/// HTTP/3 handler function
1003#[inline]
1004#[cfg(feature = "runtime-vibeio")]
1005#[allow(clippy::too_many_arguments)]
1006#[allow(clippy::type_complexity)]
1007async fn http_quic_handler_fn(
1008  connection_attempt: quinn::Incoming,
1009  client_address: SocketAddr,
1010  server_address: SocketAddr,
1011  configurations: Arc<ServerConfigurations>,
1012  quic_tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<quinn::ServerConfig>>>,
1013  connection_reference: Arc<()>,
1014  shutdown_rx: CancellationToken,
1015  graceful_shutdown_token: Arc<CancellationToken>,
1016) {
1017  use vibeio_http::{Http3Options, HttpProtocol};
1018
1019  let connection = if let Some(tls_config) = quic_tls_configs
1020    .get(&(Some(server_address.ip().to_canonical()), server_address.port()))
1021    .cloned()
1022    .or_else(|| quic_tls_configs.get(&(None, server_address.port())).cloned())
1023  {
1024    match connection_attempt.accept_with(tls_config) {
1025      Ok(connecting) => match connecting.await {
1026        Ok(connection) => connection,
1027        Err(err) => {
1028          log_connection_accept_error(&configurations, err).await;
1029          return;
1030        }
1031      },
1032      Err(err) => {
1033        log_connection_accept_error(&configurations, err).await;
1034        return;
1035      }
1036    }
1037  } else {
1038    match connection_attempt.await {
1039      Ok(connection) => connection,
1040      Err(err) => {
1041        log_connection_accept_error(&configurations, err).await;
1042        return;
1043      }
1044    }
1045  };
1046
1047  let _connection_reference = Arc::downgrade(&connection_reference);
1048  let configurations_clone = configurations.clone();
1049  let graceful_shutdown_token2 = CancellationToken::new();
1050  let mut http_future = Box::pin(
1051    vibeio_http::Http3::new(h3_quinn::Connection::new(connection), Http3Options::default())
1052      .graceful_shutdown_token(graceful_shutdown_token2.clone())
1053      .handle(move |request: Request<vibeio_http::Incoming>| {
1054        let (request_parts, request_body) = request.into_parts();
1055        let request = Request::from_parts(
1056          request_parts,
1057          request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
1058        );
1059        let fut = request_handler(
1060          request,
1061          client_address,
1062          server_address,
1063          true,
1064          configurations_clone.clone(),
1065          None,
1066          empty_acme_http_01_resolvers(),
1067          None,
1068          None,
1069        );
1070        let connection_reference = connection_reference.clone();
1071        async move {
1072          let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
1073          drop(connection_reference);
1074          r
1075        }
1076      }),
1077  );
1078  let http_future_result = crate::runtime::select! {
1079    result = &mut http_future => {
1080      result
1081    }
1082    _ = shutdown_rx.cancelled() => {
1083        graceful_shutdown_token2.cancel();
1084        http_future.await
1085    }
1086    _ = graceful_shutdown_token.cancelled() => {
1087        graceful_shutdown_token2.cancel();
1088      http_future.await
1089    }
1090  };
1091  if let Err(err) = http_future_result {
1092    log_http_connection_error(&configurations, "HTTP/3", err).await;
1093  }
1094}
1095
1096/// HTTP/3 handler function
1097#[cfg(not(feature = "runtime-vibeio"))]
1098#[inline]
1099#[allow(clippy::too_many_arguments)]
1100#[allow(clippy::type_complexity)]
1101async fn http_quic_handler_fn(
1102  connection_attempt: quinn::Incoming,
1103  client_address: SocketAddr,
1104  server_address: SocketAddr,
1105  configurations: Arc<ServerConfigurations>,
1106  quic_tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<quinn::ServerConfig>>>,
1107  connection_reference: Arc<()>,
1108  shutdown_rx: CancellationToken,
1109  graceful_shutdown_token: Arc<CancellationToken>,
1110) {
1111  let connection = if let Some(tls_config) = quic_tls_configs
1112    .get(&(Some(server_address.ip().to_canonical()), server_address.port()))
1113    .cloned()
1114    .or_else(|| quic_tls_configs.get(&(None, server_address.port())).cloned())
1115  {
1116    match connection_attempt.accept_with(tls_config) {
1117      Ok(connecting) => match connecting.await {
1118        Ok(connection) => connection,
1119        Err(err) => {
1120          log_connection_accept_error(&configurations, err).await;
1121          return;
1122        }
1123      },
1124      Err(err) => {
1125        log_connection_accept_error(&configurations, err).await;
1126        return;
1127      }
1128    }
1129  } else {
1130    match connection_attempt.await {
1131      Ok(connection) => connection,
1132      Err(err) => {
1133        log_connection_accept_error(&configurations, err).await;
1134        return;
1135      }
1136    }
1137  };
1138
1139  let connection_reference = Arc::downgrade(&connection_reference);
1140  let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
1141    match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
1142      Ok(h3_conn) => h3_conn,
1143      Err(err) => {
1144        log_http_connection_error(&configurations, "HTTP/3", err).await;
1145        return;
1146      }
1147    };
1148
1149  loop {
1150    match crate::runtime::select! {
1151        biased;
1152
1153        _ = shutdown_rx.cancelled() => {
1154          h3_conn.shutdown(0).await.unwrap_or_default();
1155          return;
1156        }
1157        _ = graceful_shutdown_token.cancelled() => {
1158          h3_conn.shutdown(0).await.unwrap_or_default();
1159          return;
1160        }
1161        result = h3_conn.accept() => {
1162          result
1163        }
1164    } {
1165      Ok(Some(resolver)) => {
1166        let configurations = configurations.clone();
1167        let connection_reference = connection_reference.clone();
1168        crate::runtime::spawn(async move {
1169          let _connection_reference = connection_reference;
1170          let (request, stream) = match resolver.resolve_request().await {
1171            Ok(resolved) => resolved,
1172            Err(err) => {
1173              if !err.is_h3_no_error() {
1174                log_http_connection_error(&configurations, "HTTP/3", err).await;
1175              }
1176              return;
1177            }
1178          };
1179
1180          let (mut send, receive) = stream.split();
1181          let request_body_stream =
1182            futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
1183              loop {
1184                if !is_body_finished {
1185                  match receive.recv_data().await {
1186                    Ok(Some(mut data)) => {
1187                      return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)));
1188                    }
1189                    Ok(None) => is_body_finished = true,
1190                    Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
1191                  }
1192                } else {
1193                  match receive.recv_trailers().await {
1194                    Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
1195                    Ok(None) => return None,
1196                    Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
1197                  }
1198                }
1199              }
1200            });
1201          let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
1202          let (request_parts, _) = request.into_parts();
1203          let request = Request::from_parts(request_parts, request_body);
1204          let mut response = match request_handler(
1205            request,
1206            client_address,
1207            server_address,
1208            true,
1209            configurations.clone(),
1210            None,
1211            empty_acme_http_01_resolvers(),
1212            None,
1213            None,
1214          )
1215          .await
1216          {
1217            Ok(response) => response,
1218            Err(err) => {
1219              log_http_connection_error(&configurations, "HTTP/3", err).await;
1220              return;
1221            }
1222          };
1223
1224          sanitize_http3_response_headers(response.headers_mut());
1225
1226          let (response_parts, mut response_body) = response.into_parts();
1227          if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
1228            if !err.is_h3_no_error() {
1229              log_http_connection_error(&configurations, "HTTP/3", err).await;
1230            }
1231            return;
1232          }
1233
1234          let mut had_trailers = false;
1235          while let Some(chunk) = response_body.frame().await {
1236            match chunk {
1237              Ok(frame) if frame.is_data() => match frame.into_data() {
1238                Ok(data) => {
1239                  if let Err(err) = send.send_data(data).await {
1240                    if !err.is_h3_no_error() {
1241                      log_http_connection_error(&configurations, "HTTP/3", err).await;
1242                    }
1243                    return;
1244                  }
1245                }
1246                Err(_) => {
1247                  log_handler_error(
1248                    &configurations,
1249                    "Error serving HTTP/3 connection: the frame isn't really a data frame",
1250                  )
1251                  .await;
1252                  return;
1253                }
1254              },
1255              Ok(frame) if frame.is_trailers() => match frame.into_trailers() {
1256                Ok(trailers) => {
1257                  had_trailers = true;
1258                  if let Err(err) = send.send_trailers(trailers).await {
1259                    if !err.is_h3_no_error() {
1260                      log_http_connection_error(&configurations, "HTTP/3", err).await;
1261                    }
1262                    return;
1263                  }
1264                }
1265                Err(_) => {
1266                  log_handler_error(
1267                    &configurations,
1268                    "Error serving HTTP/3 connection: the frame isn't really a trailers frame",
1269                  )
1270                  .await;
1271                  return;
1272                }
1273              },
1274              Ok(_) => {}
1275              Err(err) => {
1276                log_http_connection_error(&configurations, "HTTP/3", err).await;
1277                return;
1278              }
1279            }
1280          }
1281
1282          if !had_trailers {
1283            if let Err(err) = send.finish().await {
1284              if !err.is_h3_no_error() {
1285                log_http_connection_error(&configurations, "HTTP/3", err).await;
1286              }
1287            }
1288          }
1289        });
1290      }
1291      Ok(None) => break,
1292      Err(err) => {
1293        if !err.is_h3_no_error() {
1294          log_http_connection_error(&configurations, "HTTP/3", err).await;
1295        }
1296        return;
1297      }
1298    }
1299  }
1300}