ferron/
handler.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use arc_swap::ArcSwap;
8use async_channel::{Receiver, Sender};
9use bytes::{Buf, Bytes};
10use ferron_common::logging::LogMessage;
11use http_body_util::{BodyExt, StreamBody};
12use hyper::body::{Frame, Incoming};
13use hyper::service::service_fn;
14use hyper::{Request, Response};
15#[cfg(feature = "runtime-tokio")]
16use hyper_util::rt::{TokioIo, TokioTimer};
17#[cfg(feature = "runtime-monoio")]
18use monoio::io::IntoPollIo;
19#[cfg(feature = "runtime-monoio")]
20use monoio::net::tcp::stream_poll::TcpStreamPoll;
21#[cfg(feature = "runtime-monoio")]
22use monoio::net::TcpStream;
23#[cfg(feature = "runtime-monoio")]
24use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
25use rustls::server::Acceptor;
26use rustls::ServerConfig;
27#[cfg(feature = "runtime-tokio")]
28use tokio::net::TcpStream;
29use tokio_rustls::server::TlsStream;
30use tokio_rustls::LazyConfigAcceptor;
31use tokio_util::sync::CancellationToken;
32
33use crate::acme::ACME_TLS_ALPN_NAME;
34use crate::config::ServerConfigurations;
35use crate::get_value;
36use crate::listener_handler_communication::ConnectionData;
37use crate::request_handler::request_handler;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40use crate::util::{read_proxy_header, MultiCancel};
41
42static HTTP3_INVALID_HEADERS: [hyper::header::HeaderName; 5] = [
43  hyper::header::HeaderName::from_static("keep-alive"),
44  hyper::header::HeaderName::from_static("proxy-connection"),
45  hyper::header::TRANSFER_ENCODING,
46  hyper::header::TE,
47  hyper::header::UPGRADE,
48];
49
50/// A struct holding reloadable data for handler threads
51pub struct ReloadableHandlerData {
52  /// ACME TLS-ALPN-01 configurations
53  pub acme_tls_alpn_01_configs: Arc<HashMap<u16, Arc<ServerConfig>>>,
54  /// ACME HTTP-01 resolvers
55  pub acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
56  /// Server configurations
57  pub configurations: Arc<ServerConfigurations>,
58  /// TLS configurations
59  pub tls_configs: Arc<HashMap<u16, Arc<ServerConfig>>>,
60  /// Whether HTTP/3 is enabled
61  pub http3_enabled: bool,
62  /// Whether PROXY protocol is enabled
63  pub enable_proxy_protocol: bool,
64}
65
66/// Tokio local executor
67#[cfg(feature = "runtime-tokio")]
68#[derive(Clone, Copy, Debug)]
69pub struct TokioLocalExecutor;
70
71#[cfg(feature = "runtime-tokio")]
72impl<F> hyper::rt::Executor<F> for TokioLocalExecutor
73where
74  F: std::future::Future + 'static,
75  F::Output: 'static,
76{
77  #[inline]
78  fn execute(&self, fut: F) {
79    tokio::task::spawn_local(fut);
80  }
81}
82
83/// Creates a HTTP request handler
84pub fn create_http_handler(
85  reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
86  rx: Receiver<ConnectionData>,
87  enable_uring: Option<bool>,
88  io_uring_disabled: Sender<Option<std::io::Error>>,
89  multi_cancel: Arc<MultiCancel>,
90) -> Result<(CancellationToken, Sender<()>), Box<dyn Error + Send + Sync>> {
91  let shutdown_tx = CancellationToken::new();
92  let shutdown_rx = shutdown_tx.clone();
93  let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
94  let (graceful_tx, graceful_rx) = async_channel::unbounded();
95  std::thread::Builder::new()
96    .name("Request handler".to_string())
97    .spawn(move || {
98      let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
99        Ok(rt) => rt,
100        Err(error) => {
101          handler_init_tx
102            .send_blocking(Some(
103              anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
104            ))
105            .unwrap_or_default();
106          return;
107        }
108      };
109      io_uring_disabled
110        .send_blocking(rt.return_io_uring_error())
111        .unwrap_or_default();
112      rt.run(async move {
113        if let Some(error) = http_handler_fn(
114          reloadable_data,
115          rx,
116          &handler_init_tx,
117          shutdown_rx,
118          graceful_rx,
119          multi_cancel,
120        )
121        .await
122        .err()
123        {
124          handler_init_tx.send(Some(error)).await.unwrap_or_default();
125        }
126      });
127    })?;
128
129  if let Some(error) = listen_error_rx.recv_blocking()? {
130    Err(error)?;
131  }
132
133  Ok((shutdown_tx, graceful_tx))
134}
135
136/// HTTP handler function
137#[inline]
138async fn http_handler_fn(
139  reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
140  rx: Receiver<ConnectionData>,
141  handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
142  shutdown_rx: CancellationToken,
143  graceful_rx: Receiver<()>,
144  multi_cancel: Arc<MultiCancel>,
145) -> Result<(), Box<dyn Error + Send + Sync>> {
146  handler_init_tx.send(None).await.unwrap_or_default();
147
148  let connections_references = Arc::new(());
149  let graceful_shutdown_token = Arc::new(ArcSwap::from_pointee(CancellationToken::new()));
150  let graceful_shutdown_token_clone = graceful_shutdown_token.clone();
151
152  let mut graceful_rx_recv_future = Box::pin(async move {
153    while graceful_rx.recv().await.is_ok() {
154      graceful_shutdown_token_clone
155        .swap(Arc::new(CancellationToken::new()))
156        .cancel();
157    }
158
159    futures_util::future::pending::<()>().await;
160  });
161
162  loop {
163    let conn_data = crate::runtime::select! {
164        biased;
165
166        _ = &mut graceful_rx_recv_future => {
167            // This future should be always pending...
168            break;
169        }
170        _ = shutdown_rx.cancelled() => {
171            break;
172        }
173        result = rx.recv() => {
174            if let Ok(recv_data) = result {
175                recv_data
176            } else {
177                break;
178            }
179        }
180    };
181    let ReloadableHandlerData {
182      configurations,
183      tls_configs,
184      http3_enabled,
185      acme_tls_alpn_01_configs,
186      acme_http_01_resolvers,
187      enable_proxy_protocol,
188    } = &**reloadable_data.load();
189    let configurations = configurations.clone();
190    let tls_config = if matches!(
191      conn_data.connection,
192      crate::listener_handler_communication::Connection::Quic(..)
193    ) {
194      None
195    } else {
196      tls_configs.get(&conn_data.server_address.port()).cloned()
197    };
198    let acme_tls_alpn_01_config = if matches!(
199      conn_data.connection,
200      crate::listener_handler_communication::Connection::Quic(..)
201    ) {
202      None
203    } else {
204      acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
205    };
206    let acme_http_01_resolvers = acme_http_01_resolvers.clone();
207    let connections_references_cloned = connections_references.clone();
208    let shutdown_rx_clone = shutdown_rx.clone();
209    let http3_enabled = *http3_enabled;
210    let enable_proxy_protocol = *enable_proxy_protocol;
211    let graceful_shutdown_token = graceful_shutdown_token.load().clone();
212    crate::runtime::spawn(async move {
213      match conn_data.connection {
214        crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
215          // Toggle O_NONBLOCK for TCP stream, when using Monoio.
216          // Unset it when io_uring is enabled, and set it otherwise.
217          #[cfg(feature = "runtime-monoio")]
218          let _ = tcp_stream.set_nonblocking(monoio::utils::is_legacy());
219
220          #[cfg(feature = "runtime-monoio")]
221          let tcp_stream = match TcpStream::from_std(tcp_stream) {
222            Ok(stream) => stream,
223            Err(err) => {
224              for logging_tx in configurations
225                .find_global_configuration()
226                .as_ref()
227                .map_or(&vec![], |c| &c.observability.log_channels)
228              {
229                logging_tx
230                  .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
231                  .await
232                  .unwrap_or_default();
233              }
234              return;
235            }
236          };
237          let encrypted = tls_config.is_some();
238          http_tcp_handler_fn(
239            tcp_stream,
240            conn_data.client_address,
241            conn_data.server_address,
242            configurations,
243            tls_config,
244            http3_enabled && encrypted,
245            connections_references_cloned,
246            acme_tls_alpn_01_config,
247            acme_http_01_resolvers,
248            enable_proxy_protocol,
249            shutdown_rx_clone,
250            graceful_shutdown_token,
251          )
252          .await;
253        }
254        crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
255          http_quic_handler_fn(
256            quic_incoming,
257            conn_data.client_address,
258            conn_data.server_address,
259            configurations,
260            connections_references_cloned,
261            shutdown_rx_clone,
262            graceful_shutdown_token,
263          )
264          .await;
265        }
266      }
267    });
268  }
269
270  while Arc::weak_count(&connections_references) > 0 {
271    crate::runtime::sleep(Duration::from_millis(100)).await;
272  }
273
274  // Wait until all connections are closed, then shut down all the previous handler threads
275  multi_cancel.cancel().await;
276
277  Ok(())
278}
279
280/// Enum for maybe TLS stream
281#[allow(clippy::large_enum_variant)]
282#[cfg(feature = "runtime-monoio")]
283enum MaybeTlsStream {
284  /// TLS stream
285  Tls(TlsStream<SendAsyncIo<TcpStreamPoll>>),
286
287  /// Plain TCP stream
288  Plain(SendAsyncIo<TcpStreamPoll>),
289}
290
291#[allow(clippy::large_enum_variant)]
292#[cfg(feature = "runtime-tokio")]
293enum MaybeTlsStream {
294  /// TLS stream
295  Tls(TlsStream<TcpStream>),
296
297  /// Plain TCP stream
298  Plain(TcpStream),
299}
300
301/// HTTP/1.x and HTTP/2 handler function
302#[allow(clippy::too_many_arguments)]
303#[inline]
304async fn http_tcp_handler_fn(
305  tcp_stream: TcpStream,
306  client_address: SocketAddr,
307  server_address: SocketAddr,
308  configurations: Arc<ServerConfigurations>,
309  tls_config: Option<Arc<ServerConfig>>,
310  http3_enabled: bool,
311  connection_reference: Arc<()>,
312  acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
313  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
314  enable_proxy_protocol: bool,
315  shutdown_rx: CancellationToken,
316  graceful_shutdown_token: Arc<CancellationToken>,
317) {
318  let _connection_reference = Arc::downgrade(&connection_reference);
319  #[cfg(feature = "runtime-monoio")]
320  let tcp_stream = match tcp_stream.into_poll_io() {
321    Ok(stream) => SendAsyncIo::new(stream),
322    Err(err) => {
323      for logging_tx in configurations
324        .find_global_configuration()
325        .as_ref()
326        .map_or(&vec![], |c| &c.observability.log_channels)
327      {
328        logging_tx
329          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
330          .await
331          .unwrap_or_default();
332      }
333      return;
334    }
335  };
336
337  // PROXY protocol header precedes TLS handshakes too...
338  let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
339    // Read and parse the PROXY protocol header
340    match read_proxy_header(tcp_stream).await {
341      Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
342      Err(err) => {
343        for logging_tx in configurations
344          .find_global_configuration()
345          .as_ref()
346          .map_or(&vec![], |c| &c.observability.log_channels)
347        {
348          logging_tx
349            .send(LogMessage::new(
350              format!("Error reading PROXY protocol header: {err}"),
351              true,
352            ))
353            .await
354            .unwrap_or_default();
355        }
356        return;
357      }
358    }
359  } else {
360    (tcp_stream, None, None)
361  };
362
363  let maybe_tls_stream = if let Some(tls_config) = tls_config {
364    let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
365      Ok(start_handshake) => start_handshake,
366      Err(err) => {
367        for logging_tx in configurations
368          .find_global_configuration()
369          .as_ref()
370          .map_or(&vec![], |c| &c.observability.log_channels)
371        {
372          logging_tx
373            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
374            .await
375            .unwrap_or_default();
376        }
377        return;
378      }
379    };
380
381    if let Some(acme_config) = acme_tls_alpn_01_config {
382      if start_handshake
383        .client_hello()
384        .alpn()
385        .into_iter()
386        .flatten()
387        .eq([ACME_TLS_ALPN_NAME])
388      {
389        if let Err(err) = start_handshake.into_stream(acme_config).await {
390          for logging_tx in configurations
391            .find_global_configuration()
392            .as_ref()
393            .map_or(&vec![], |c| &c.observability.log_channels)
394          {
395            logging_tx
396              .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
397              .await
398              .unwrap_or_default();
399          }
400          return;
401        };
402        return;
403      }
404    }
405
406    let tls_stream = match start_handshake.into_stream(tls_config).await {
407      Ok(tls_stream) => tls_stream,
408      Err(err) => {
409        for logging_tx in configurations
410          .find_global_configuration()
411          .as_ref()
412          .map_or(&vec![], |c| &c.observability.log_channels)
413        {
414          logging_tx
415            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
416            .await
417            .unwrap_or_default();
418        }
419        return;
420      }
421    };
422
423    MaybeTlsStream::Tls(tls_stream)
424  } else {
425    MaybeTlsStream::Plain(tcp_stream)
426  };
427
428  if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
429    let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
430    let is_http2 = alpn_protocol == Some("h2".as_bytes());
431
432    #[cfg(feature = "runtime-tokio")]
433    let io = TokioIo::new(tls_stream);
434
435    if is_http2 {
436      // Hyper's HTTP/2 connection doesn't require underlying I/O to be `Send`.
437      #[cfg(feature = "runtime-monoio")]
438      let io = MonoioIo::new(tls_stream);
439
440      #[cfg(feature = "runtime-monoio")]
441      let mut http2_builder = {
442        let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
443        http2_builder.timer(MonoioTimer);
444        http2_builder
445      };
446      #[cfg(feature = "runtime-tokio")]
447      let mut http2_builder = {
448        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
449        http2_builder.timer(TokioTimer::new());
450        http2_builder
451      };
452
453      let global_configuration = configurations.find_global_configuration();
454
455      if let Some(initial_window_size) = global_configuration
456        .as_deref()
457        .and_then(|c| get_value!("h2_initial_window_size", c))
458        .and_then(|v| v.as_i128())
459      {
460        http2_builder.initial_stream_window_size(initial_window_size as u32);
461      }
462      if let Some(max_frame_size) = global_configuration
463        .as_deref()
464        .and_then(|c| get_value!("h2_max_frame_size", c))
465        .and_then(|v| v.as_i128())
466      {
467        http2_builder.max_frame_size(max_frame_size as u32);
468      }
469      if let Some(max_concurrent_streams) = global_configuration
470        .as_deref()
471        .and_then(|c| get_value!("h2_max_concurrent_streams", c))
472        .and_then(|v| v.as_i128())
473      {
474        http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
475      }
476      if let Some(max_header_list_size) = global_configuration
477        .as_deref()
478        .and_then(|c| get_value!("h2_max_header_list_size", c))
479        .and_then(|v| v.as_i128())
480      {
481        http2_builder.max_header_list_size(max_header_list_size as u32);
482      }
483      if let Some(enable_connect_protocol) = global_configuration
484        .as_deref()
485        .and_then(|c| get_value!("h2_enable_connect_protocol", c))
486        .and_then(|v| v.as_bool())
487      {
488        if enable_connect_protocol {
489          http2_builder.enable_connect_protocol();
490        }
491      }
492
493      let configurations_clone = configurations.clone();
494      let mut http_future = http2_builder.serve_connection(
495        io,
496        service_fn(move |request: Request<Incoming>| {
497          let (request_parts, request_body) = request.into_parts();
498          let request = Request::from_parts(
499            request_parts,
500            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
501          );
502          request_handler(
503            request,
504            client_address,
505            server_address,
506            true,
507            configurations_clone.clone(),
508            if http3_enabled {
509              Some(server_address.port())
510            } else {
511              None
512            },
513            acme_http_01_resolvers.clone(),
514            proxy_protocol_client_address,
515            proxy_protocol_server_address,
516          )
517        }),
518      );
519      let http_future_result = crate::runtime::select! {
520        result = &mut http_future => {
521          result
522        }
523        _ = shutdown_rx.cancelled() => {
524          std::pin::Pin::new(&mut http_future).graceful_shutdown();
525          http_future.await
526        }
527        _ = graceful_shutdown_token.cancelled() => {
528          std::pin::Pin::new(&mut http_future).graceful_shutdown();
529          http_future.await
530        }
531      };
532      if let Err(err) = http_future_result {
533        let error_to_log = if err.is_user() {
534          err.source().unwrap_or(&err)
535        } else {
536          &err
537        };
538        for logging_tx in configurations
539          .find_global_configuration()
540          .as_ref()
541          .map_or(&vec![], |c| &c.observability.log_channels)
542        {
543          logging_tx
544            .send(LogMessage::new(
545              format!("Error serving HTTPS connection: {error_to_log}"),
546              true,
547            ))
548            .await
549            .unwrap_or_default();
550        }
551      }
552    } else {
553      #[cfg(feature = "runtime-monoio")]
554      let io = MonoioIo::new(tls_stream);
555
556      #[cfg(feature = "runtime-monoio")]
557      let http1_builder = {
558        let mut http1_builder = hyper::server::conn::http1::Builder::new();
559
560        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
561        http1_builder.timer(MonoioTimer);
562
563        http1_builder
564      };
565      #[cfg(feature = "runtime-tokio")]
566      let http1_builder = {
567        let mut http1_builder = hyper::server::conn::http1::Builder::new();
568
569        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
570        http1_builder.timer(TokioTimer::new());
571
572        http1_builder
573      };
574
575      let configurations_clone = configurations.clone();
576      let mut http_future = http1_builder
577        .serve_connection(
578          io,
579          service_fn(move |request: Request<Incoming>| {
580            let (request_parts, request_body) = request.into_parts();
581            let request = Request::from_parts(
582              request_parts,
583              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
584            );
585            request_handler(
586              request,
587              client_address,
588              server_address,
589              true,
590              configurations_clone.clone(),
591              if http3_enabled {
592                Some(server_address.port())
593              } else {
594                None
595              },
596              acme_http_01_resolvers.clone(),
597              proxy_protocol_client_address,
598              proxy_protocol_server_address,
599            )
600          }),
601        )
602        .with_upgrades();
603      let http_future_result = crate::runtime::select! {
604        result = &mut http_future => {
605          result
606        }
607        _ = shutdown_rx.cancelled() => {
608          std::pin::Pin::new(&mut http_future).graceful_shutdown();
609          http_future.await
610        }
611        _ = graceful_shutdown_token.cancelled() => {
612          std::pin::Pin::new(&mut http_future).graceful_shutdown();
613          http_future.await
614        }
615      };
616      if let Err(err) = http_future_result {
617        let error_to_log = if err.is_user() {
618          err.source().unwrap_or(&err)
619        } else {
620          &err
621        };
622        for logging_tx in configurations
623          .find_global_configuration()
624          .as_ref()
625          .map_or(&vec![], |c| &c.observability.log_channels)
626        {
627          logging_tx
628            .send(LogMessage::new(
629              format!("Error serving HTTPS connection: {error_to_log}"),
630              true,
631            ))
632            .await
633            .unwrap_or_default();
634        }
635      }
636    }
637  } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
638    #[cfg(feature = "runtime-monoio")]
639    let io = MonoioIo::new(stream);
640    #[cfg(feature = "runtime-tokio")]
641    let io = TokioIo::new(stream);
642
643    #[cfg(feature = "runtime-monoio")]
644    let http1_builder = {
645      let mut http1_builder = hyper::server::conn::http1::Builder::new();
646
647      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
648      http1_builder.timer(MonoioTimer);
649
650      http1_builder
651    };
652    #[cfg(feature = "runtime-tokio")]
653    let http1_builder = {
654      let mut http1_builder = hyper::server::conn::http1::Builder::new();
655
656      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
657      http1_builder.timer(TokioTimer::new());
658
659      http1_builder
660    };
661
662    let configurations_clone = configurations.clone();
663    let mut http_future = http1_builder
664      .serve_connection(
665        io,
666        service_fn(move |request: Request<Incoming>| {
667          let (request_parts, request_body) = request.into_parts();
668          let request = Request::from_parts(
669            request_parts,
670            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
671          );
672          request_handler(
673            request,
674            client_address,
675            server_address,
676            false,
677            configurations_clone.clone(),
678            if http3_enabled {
679              Some(server_address.port())
680            } else {
681              None
682            },
683            acme_http_01_resolvers.clone(),
684            proxy_protocol_client_address,
685            proxy_protocol_server_address,
686          )
687        }),
688      )
689      .with_upgrades();
690    let http_future_result = crate::runtime::select! {
691      result = &mut http_future => {
692        result
693      }
694      _ = shutdown_rx.cancelled() => {
695        std::pin::Pin::new(&mut http_future).graceful_shutdown();
696        http_future.await
697      }
698      _ = graceful_shutdown_token.cancelled() => {
699        std::pin::Pin::new(&mut http_future).graceful_shutdown();
700        http_future.await
701      }
702    };
703    if let Err(err) = http_future_result {
704      let error_to_log = if err.is_user() {
705        err.source().unwrap_or(&err)
706      } else {
707        &err
708      };
709      for logging_tx in configurations
710        .find_global_configuration()
711        .as_ref()
712        .map_or(&vec![], |c| &c.observability.log_channels)
713      {
714        logging_tx
715          .send(LogMessage::new(
716            format!("Error serving HTTP connection: {error_to_log}"),
717            true,
718          ))
719          .await
720          .unwrap_or_default();
721      }
722    }
723  }
724}
725
726/// HTTP/3 handler function
727#[inline]
728#[allow(clippy::too_many_arguments)]
729async fn http_quic_handler_fn(
730  connection_attempt: quinn::Incoming,
731  client_address: SocketAddr,
732  server_address: SocketAddr,
733  configurations: Arc<ServerConfigurations>,
734  connection_reference: Arc<()>,
735  shutdown_rx: CancellationToken,
736  graceful_shutdown_token: Arc<CancellationToken>,
737) {
738  match connection_attempt.await {
739    Ok(connection) => {
740      let connection_reference = Arc::downgrade(&connection_reference);
741      let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
742        match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
743          Ok(h3_conn) => h3_conn,
744          Err(err) => {
745            for logging_tx in configurations
746              .find_global_configuration()
747              .as_ref()
748              .map_or(&vec![], |c| &c.observability.log_channels)
749            {
750              logging_tx
751                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
752                .await
753                .unwrap_or_default();
754            }
755            return;
756          }
757        };
758
759      loop {
760        match crate::runtime::select! {
761            biased;
762
763            _ = shutdown_rx.cancelled() => {
764              h3_conn.shutdown(0).await.unwrap_or_default();
765              return;
766            }
767            _ = graceful_shutdown_token.cancelled() => {
768              h3_conn.shutdown(0).await.unwrap_or_default();
769              return;
770            }
771            result = h3_conn.accept() => {
772              result
773            }
774        } {
775          Ok(Some(resolver)) => {
776            let configurations = configurations.clone();
777            let connection_reference = connection_reference.clone();
778            crate::runtime::spawn(async move {
779              let _connection_reference = connection_reference;
780              let (request, stream) = match resolver.resolve_request().await {
781                Ok(resolved) => resolved,
782                Err(err) => {
783                  if !err.is_h3_no_error() {
784                    for logging_tx in configurations
785                      .find_global_configuration()
786                      .as_ref()
787                      .map_or(&vec![], |c| &c.observability.log_channels)
788                    {
789                      logging_tx
790                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
791                        .await
792                        .unwrap_or_default();
793                    }
794                  }
795                  return;
796                }
797              };
798              let (mut send, receive) = stream.split();
799              let request_body_stream =
800                futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
801                  loop {
802                    if !is_body_finished {
803                      match receive.recv_data().await {
804                        Ok(Some(mut data)) => {
805                          return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
806                        }
807                        Ok(None) => is_body_finished = true,
808                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
809                      }
810                    } else {
811                      match receive.recv_trailers().await {
812                        Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
813                        Ok(None) => {
814                          return None;
815                        }
816                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
817                      }
818                    }
819                  }
820                });
821              let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
822              let (request_parts, _) = request.into_parts();
823              let request = Request::from_parts(request_parts, request_body);
824              let mut response = match request_handler(
825                request,
826                client_address,
827                server_address,
828                true,
829                configurations.clone(),
830                None,
831                Arc::new(tokio::sync::RwLock::new(vec![])),
832                None,
833                None,
834              )
835              .await
836              {
837                Ok(response) => response,
838                Err(err) => {
839                  for logging_tx in configurations
840                    .find_global_configuration()
841                    .as_ref()
842                    .map_or(&vec![], |c| &c.observability.log_channels)
843                  {
844                    logging_tx
845                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
846                      .await
847                      .unwrap_or_default();
848                  }
849                  return;
850                }
851              };
852              let response_headers = response.headers_mut();
853              if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
854                response_headers.entry(hyper::header::DATE).or_insert(http_date);
855              }
856              for header in &HTTP3_INVALID_HEADERS {
857                response_headers.remove(header);
858              }
859              if let Some(connection_header) = response_headers
860                .remove(hyper::header::CONNECTION)
861                .as_ref()
862                .and_then(|v| v.to_str().ok())
863              {
864                for name in connection_header.split(',') {
865                  response_headers.remove(name.trim());
866                }
867              }
868              let (response_parts, mut response_body) = response.into_parts();
869              if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
870                if !err.is_h3_no_error() {
871                  for logging_tx in configurations
872                    .find_global_configuration()
873                    .as_ref()
874                    .map_or(&vec![], |c| &c.observability.log_channels)
875                  {
876                    logging_tx
877                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
878                      .await
879                      .unwrap_or_default();
880                  }
881                }
882                return;
883              }
884              let mut had_trailers = false;
885              while let Some(chunk) = response_body.frame().await {
886                match chunk {
887                  Ok(frame) => {
888                    if frame.is_data() {
889                      match frame.into_data() {
890                        Ok(data) => {
891                          if let Err(err) = send.send_data(data).await {
892                            if !err.is_h3_no_error() {
893                              for logging_tx in configurations
894                                .find_global_configuration()
895                                .as_ref()
896                                .map_or(&vec![], |c| &c.observability.log_channels)
897                              {
898                                logging_tx
899                                  .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
900                                  .await
901                                  .unwrap_or_default();
902                              }
903                            }
904                            return;
905                          }
906                        }
907                        Err(_) => {
908                          for logging_tx in configurations
909                            .find_global_configuration()
910                            .as_ref()
911                            .map_or(&vec![], |c| &c.observability.log_channels)
912                          {
913                            logging_tx
914                              .send(LogMessage::new(
915                                "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
916                                true,
917                              ))
918                              .await
919                              .unwrap_or_default();
920                          }
921                          return;
922                        }
923                      }
924                    } else if frame.is_trailers() {
925                      match frame.into_trailers() {
926                        Ok(trailers) => {
927                          had_trailers = true;
928                          if let Err(err) = send.send_trailers(trailers).await {
929                            if !err.is_h3_no_error() {
930                              for logging_tx in configurations
931                                .find_global_configuration()
932                                .as_ref()
933                                .map_or(&vec![], |c| &c.observability.log_channels)
934                              {
935                                logging_tx
936                                  .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
937                                  .await
938                                  .unwrap_or_default();
939                              }
940                            }
941                            return;
942                          }
943                        }
944                        Err(_) => {
945                          for logging_tx in configurations
946                            .find_global_configuration()
947                            .as_ref()
948                            .map_or(&vec![], |c| &c.observability.log_channels)
949                          {
950                            logging_tx
951                              .send(LogMessage::new(
952                                "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
953                                true,
954                              ))
955                              .await
956                              .unwrap_or_default();
957                          }
958                          return;
959                        }
960                      }
961                    }
962                  }
963                  Err(err) => {
964                    for logging_tx in configurations
965                      .find_global_configuration()
966                      .as_ref()
967                      .map_or(&vec![], |c| &c.observability.log_channels)
968                    {
969                      logging_tx
970                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
971                        .await
972                        .unwrap_or_default();
973                    }
974
975                    return;
976                  }
977                }
978              }
979              if !had_trailers {
980                if let Err(err) = send.finish().await {
981                  if !err.is_h3_no_error() {
982                    for logging_tx in configurations
983                      .find_global_configuration()
984                      .as_ref()
985                      .map_or(&vec![], |c| &c.observability.log_channels)
986                    {
987                      logging_tx
988                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
989                        .await
990                        .unwrap_or_default();
991                    }
992                  }
993                }
994              }
995            });
996          }
997          Ok(None) => break,
998          Err(err) => {
999            if !err.is_h3_no_error() {
1000              for logging_tx in configurations
1001                .find_global_configuration()
1002                .as_ref()
1003                .map_or(&vec![], |c| &c.observability.log_channels)
1004              {
1005                logging_tx
1006                  .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
1007                  .await
1008                  .unwrap_or_default();
1009              }
1010            }
1011            return;
1012          }
1013        }
1014      }
1015    }
1016    Err(err) => {
1017      for logging_tx in configurations
1018        .find_global_configuration()
1019        .as_ref()
1020        .map_or(&vec![], |c| &c.observability.log_channels)
1021      {
1022        logging_tx
1023          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
1024          .await
1025          .unwrap_or_default();
1026      }
1027    }
1028  }
1029}