ferron/
handler.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use async_channel::{Receiver, Sender};
8use bytes::{Buf, Bytes};
9use ferron_common::logging::LogMessage;
10use http_body_util::{BodyExt, StreamBody};
11use hyper::body::{Frame, Incoming};
12use hyper::service::service_fn;
13use hyper::{Request, Response};
14#[cfg(feature = "runtime-tokio")]
15use hyper_util::rt::{TokioIo, TokioTimer};
16#[cfg(feature = "runtime-monoio")]
17use monoio::io::IntoPollIo;
18#[cfg(feature = "runtime-monoio")]
19use monoio::net::tcp::stream_poll::TcpStreamPoll;
20#[cfg(feature = "runtime-monoio")]
21use monoio::net::TcpStream;
22#[cfg(feature = "runtime-monoio")]
23use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
24use rustls::server::Acceptor;
25use rustls::ServerConfig;
26#[cfg(feature = "runtime-tokio")]
27use tokio::net::TcpStream;
28use tokio_rustls::server::TlsStream;
29use tokio_rustls::LazyConfigAcceptor;
30use tokio_util::sync::CancellationToken;
31
32use crate::acme::ACME_TLS_ALPN_NAME;
33use crate::config::ServerConfigurations;
34use crate::get_value;
35use crate::listener_handler_communication::ConnectionData;
36use crate::request_handler::request_handler;
37use crate::util::read_proxy_header;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40
41// Tokio local executor
42#[cfg(feature = "runtime-tokio")]
43#[derive(Clone, Copy, Debug)]
44pub struct TokioLocalExecutor;
45
46#[cfg(feature = "runtime-tokio")]
47impl<F> hyper::rt::Executor<F> for TokioLocalExecutor
48where
49  F: std::future::Future + 'static,
50  F::Output: 'static,
51{
52  #[inline]
53  fn execute(&self, fut: F) {
54    tokio::task::spawn_local(fut);
55  }
56}
57
58/// Creates a HTTP request handler
59#[allow(clippy::too_many_arguments)]
60pub fn create_http_handler(
61  configurations: Arc<ServerConfigurations>,
62  rx: Receiver<ConnectionData>,
63  enable_uring: bool,
64  tls_configs: HashMap<u16, Arc<ServerConfig>>,
65  http3_enabled: bool,
66  acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
67  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
68  enable_proxy_protocol: bool,
69) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
70  let shutdown_tx = CancellationToken::new();
71  let shutdown_rx = shutdown_tx.clone();
72  let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
73  std::thread::Builder::new()
74    .name("Request handler".to_string())
75    .spawn(move || {
76      crate::runtime::new_runtime(
77        async move {
78          if let Some(error) = http_handler_fn(
79            configurations,
80            rx,
81            &handler_init_tx,
82            shutdown_rx,
83            tls_configs,
84            http3_enabled,
85            acme_tls_alpn_01_configs,
86            acme_http_01_resolvers,
87            enable_proxy_protocol,
88          )
89          .await
90          .err()
91          {
92            handler_init_tx.send(Some(error)).await.unwrap_or_default();
93          }
94        },
95        enable_uring,
96      )
97      .unwrap();
98    })?;
99
100  if let Some(error) = listen_error_rx.recv_blocking()? {
101    Err(error)?;
102  }
103
104  Ok(shutdown_tx)
105}
106
107/// HTTP handler function
108#[allow(clippy::too_many_arguments)]
109async fn http_handler_fn(
110  configurations: Arc<ServerConfigurations>,
111  rx: Receiver<ConnectionData>,
112  handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
113  shutdown_rx: CancellationToken,
114  tls_configs: HashMap<u16, Arc<ServerConfig>>,
115  http3_enabled: bool,
116  acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
117  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
118  enable_proxy_protocol: bool,
119) -> Result<(), Box<dyn Error + Send + Sync>> {
120  handler_init_tx.send(None).await.unwrap_or_default();
121
122  let connections_references = Arc::new(());
123
124  loop {
125    let conn_data = crate::runtime::select! {
126        result = rx.recv() => {
127            if let Ok(recv_data) = result {
128                recv_data
129            } else {
130                break;
131            }
132        }
133        _ = shutdown_rx.cancelled() => {
134            break;
135        }
136    };
137    let configurations = configurations.clone();
138    let tls_config = if matches!(
139      conn_data.connection,
140      crate::listener_handler_communication::Connection::Quic(..)
141    ) {
142      None
143    } else {
144      tls_configs.get(&conn_data.server_address.port()).cloned()
145    };
146    let acme_tls_alpn_01_config = if matches!(
147      conn_data.connection,
148      crate::listener_handler_communication::Connection::Quic(..)
149    ) {
150      None
151    } else {
152      acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
153    };
154    let acme_http_01_resolvers = acme_http_01_resolvers.clone();
155    let connections_references_cloned = connections_references.clone();
156    let shutdown_rx_clone = shutdown_rx.clone();
157    crate::runtime::spawn(async move {
158      match conn_data.connection {
159        crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
160          #[cfg(feature = "runtime-monoio")]
161          let tcp_stream = match TcpStream::from_std(tcp_stream) {
162            Ok(stream) => stream,
163            Err(err) => {
164              for logging_tx in configurations
165                .find_global_configuration()
166                .as_ref()
167                .map_or(&vec![], |c| &c.observability.log_channels)
168              {
169                logging_tx
170                  .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
171                  .await
172                  .unwrap_or_default();
173              }
174              return;
175            }
176          };
177          let encrypted = tls_config.is_some();
178          http_tcp_handler_fn(
179            tcp_stream,
180            conn_data.client_address,
181            conn_data.server_address,
182            configurations,
183            tls_config,
184            http3_enabled && encrypted,
185            connections_references_cloned,
186            acme_tls_alpn_01_config,
187            acme_http_01_resolvers,
188            enable_proxy_protocol,
189            shutdown_rx_clone,
190          )
191          .await;
192        }
193        crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
194          http_quic_handler_fn(
195            quic_incoming,
196            conn_data.client_address,
197            conn_data.server_address,
198            configurations,
199            connections_references_cloned,
200            shutdown_rx_clone,
201          )
202          .await;
203        }
204      }
205    });
206  }
207
208  while Arc::weak_count(&connections_references) > 0 {
209    crate::runtime::sleep(Duration::from_millis(100)).await;
210  }
211
212  Ok(())
213}
214
215/// Enum for maybe TLS stream
216#[allow(clippy::large_enum_variant)]
217#[cfg(feature = "runtime-monoio")]
218enum MaybeTlsStream {
219  /// TLS stream
220  Tls(TlsStream<TcpStreamPoll>),
221
222  /// Plain TCP stream
223  Plain(TcpStreamPoll),
224}
225
226#[allow(clippy::large_enum_variant)]
227#[cfg(feature = "runtime-tokio")]
228enum MaybeTlsStream {
229  /// TLS stream
230  Tls(TlsStream<TcpStream>),
231
232  /// Plain TCP stream
233  Plain(TcpStream),
234}
235
236/// HTTP/1.x and HTTP/2 handler function
237#[allow(clippy::too_many_arguments)]
238async fn http_tcp_handler_fn(
239  tcp_stream: TcpStream,
240  client_address: SocketAddr,
241  server_address: SocketAddr,
242  configurations: Arc<ServerConfigurations>,
243  tls_config: Option<Arc<ServerConfig>>,
244  http3_enabled: bool,
245  connection_reference: Arc<()>,
246  acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
247  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
248  enable_proxy_protocol: bool,
249  shutdown_rx: CancellationToken,
250) {
251  let _connection_reference = Arc::downgrade(&connection_reference);
252  #[cfg(feature = "runtime-monoio")]
253  let tcp_stream = match tcp_stream.into_poll_io() {
254    Ok(stream) => stream,
255    Err(err) => {
256      for logging_tx in configurations
257        .find_global_configuration()
258        .as_ref()
259        .map_or(&vec![], |c| &c.observability.log_channels)
260      {
261        logging_tx
262          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
263          .await
264          .unwrap_or_default();
265      }
266      return;
267    }
268  };
269
270  // PROXY protocol header precedes TLS handshakes too...
271  let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
272    // Read and parse the PROXY protocol header
273    match read_proxy_header(tcp_stream).await {
274      Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
275      Err(err) => {
276        for logging_tx in configurations
277          .find_global_configuration()
278          .as_ref()
279          .map_or(&vec![], |c| &c.observability.log_channels)
280        {
281          logging_tx
282            .send(LogMessage::new(
283              format!("Error reading PROXY protocol header: {err}"),
284              true,
285            ))
286            .await
287            .unwrap_or_default();
288        }
289        return;
290      }
291    }
292  } else {
293    (tcp_stream, None, None)
294  };
295
296  let maybe_tls_stream = if let Some(tls_config) = tls_config {
297    let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
298      Ok(start_handshake) => start_handshake,
299      Err(err) => {
300        for logging_tx in configurations
301          .find_global_configuration()
302          .as_ref()
303          .map_or(&vec![], |c| &c.observability.log_channels)
304        {
305          logging_tx
306            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
307            .await
308            .unwrap_or_default();
309        }
310        return;
311      }
312    };
313
314    if let Some(acme_config) = acme_tls_alpn_01_config {
315      if start_handshake
316        .client_hello()
317        .alpn()
318        .into_iter()
319        .flatten()
320        .eq([ACME_TLS_ALPN_NAME])
321      {
322        match start_handshake.into_stream(acme_config).await {
323          Ok(_) => (),
324          Err(err) => {
325            for logging_tx in configurations
326              .find_global_configuration()
327              .as_ref()
328              .map_or(&vec![], |c| &c.observability.log_channels)
329            {
330              logging_tx
331                .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
332                .await
333                .unwrap_or_default();
334            }
335            return;
336          }
337        };
338        return;
339      }
340    }
341
342    let tls_stream = match start_handshake.into_stream(tls_config).await {
343      Ok(tls_stream) => tls_stream,
344      Err(err) => {
345        for logging_tx in configurations
346          .find_global_configuration()
347          .as_ref()
348          .map_or(&vec![], |c| &c.observability.log_channels)
349        {
350          logging_tx
351            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
352            .await
353            .unwrap_or_default();
354        }
355        return;
356      }
357    };
358
359    MaybeTlsStream::Tls(tls_stream)
360  } else {
361    MaybeTlsStream::Plain(tcp_stream)
362  };
363
364  if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
365    let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
366    let is_http2 = alpn_protocol == Some("h2".as_bytes());
367
368    #[cfg(feature = "runtime-tokio")]
369    let io = TokioIo::new(tls_stream);
370
371    if is_http2 {
372      // Hyper's HTTP/2 connection doesn't require underlying I/O to be `Send`.
373      #[cfg(feature = "runtime-monoio")]
374      let io = MonoioIo::new(tls_stream);
375
376      #[cfg(feature = "runtime-monoio")]
377      let mut http2_builder = {
378        let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
379        http2_builder.timer(MonoioTimer);
380        http2_builder
381      };
382      #[cfg(feature = "runtime-tokio")]
383      let mut http2_builder = {
384        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
385        http2_builder.timer(TokioTimer::new());
386        http2_builder
387      };
388
389      let global_configuration = configurations.find_global_configuration();
390
391      if let Some(initial_window_size) = global_configuration
392        .as_deref()
393        .and_then(|c| get_value!("h2_initial_window_size", c))
394        .and_then(|v| v.as_i128())
395      {
396        http2_builder.initial_stream_window_size(initial_window_size as u32);
397      }
398      if let Some(max_frame_size) = global_configuration
399        .as_deref()
400        .and_then(|c| get_value!("h2_max_frame_size", c))
401        .and_then(|v| v.as_i128())
402      {
403        http2_builder.max_frame_size(max_frame_size as u32);
404      }
405      if let Some(max_concurrent_streams) = global_configuration
406        .as_deref()
407        .and_then(|c| get_value!("h2_max_concurrent_streams", c))
408        .and_then(|v| v.as_i128())
409      {
410        http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
411      }
412      if let Some(max_header_list_size) = global_configuration
413        .as_deref()
414        .and_then(|c| get_value!("h2_max_header_list_size", c))
415        .and_then(|v| v.as_i128())
416      {
417        http2_builder.max_header_list_size(max_header_list_size as u32);
418      }
419      if let Some(enable_connect_protocol) = global_configuration
420        .as_deref()
421        .and_then(|c| get_value!("h2_enable_connect_protocol", c))
422        .and_then(|v| v.as_bool())
423      {
424        if enable_connect_protocol {
425          http2_builder.enable_connect_protocol();
426        }
427      }
428
429      let configurations_clone = configurations.clone();
430      let mut http_future = http2_builder.serve_connection(
431        io,
432        service_fn(move |request: Request<Incoming>| {
433          let (request_parts, request_body) = request.into_parts();
434          let request = Request::from_parts(
435            request_parts,
436            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
437          );
438          request_handler(
439            request,
440            client_address,
441            server_address,
442            true,
443            configurations_clone.clone(),
444            if http3_enabled {
445              Some(server_address.port())
446            } else {
447              None
448            },
449            acme_http_01_resolvers.clone(),
450            proxy_protocol_client_address,
451            proxy_protocol_server_address,
452          )
453        }),
454      );
455      let http_future_result = crate::runtime::select! {
456        result = &mut http_future => {
457          result
458        }
459        _ = shutdown_rx.cancelled() => {
460          std::pin::Pin::new(&mut http_future).graceful_shutdown();
461          http_future.await
462        }
463      };
464      if let Err(err) = http_future_result {
465        for logging_tx in configurations
466          .find_global_configuration()
467          .as_ref()
468          .map_or(&vec![], |c| &c.observability.log_channels)
469        {
470          logging_tx
471            .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
472            .await
473            .unwrap_or_default();
474        }
475      }
476    } else {
477      #[cfg(feature = "runtime-monoio")]
478      let io = MonoioIo::new(SendAsyncIo::new(tls_stream));
479
480      #[cfg(feature = "runtime-monoio")]
481      let http1_builder = {
482        let mut http1_builder = hyper::server::conn::http1::Builder::new();
483
484        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
485        http1_builder.timer(MonoioTimer);
486
487        http1_builder
488      };
489      #[cfg(feature = "runtime-tokio")]
490      let http1_builder = {
491        let mut http1_builder = hyper::server::conn::http1::Builder::new();
492
493        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
494        http1_builder.timer(TokioTimer::new());
495
496        http1_builder
497      };
498
499      let configurations_clone = configurations.clone();
500      let mut http_future = http1_builder
501        .serve_connection(
502          io,
503          service_fn(move |request: Request<Incoming>| {
504            let (request_parts, request_body) = request.into_parts();
505            let request = Request::from_parts(
506              request_parts,
507              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
508            );
509            request_handler(
510              request,
511              client_address,
512              server_address,
513              true,
514              configurations_clone.clone(),
515              if http3_enabled {
516                Some(server_address.port())
517              } else {
518                None
519              },
520              acme_http_01_resolvers.clone(),
521              proxy_protocol_client_address,
522              proxy_protocol_server_address,
523            )
524          }),
525        )
526        .with_upgrades();
527      let http_future_result = crate::runtime::select! {
528        result = &mut http_future => {
529          result
530        }
531        _ = shutdown_rx.cancelled() => {
532          std::pin::Pin::new(&mut http_future).graceful_shutdown();
533          http_future.await
534        }
535      };
536      if let Err(err) = http_future_result {
537        for logging_tx in configurations
538          .find_global_configuration()
539          .as_ref()
540          .map_or(&vec![], |c| &c.observability.log_channels)
541        {
542          logging_tx
543            .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
544            .await
545            .unwrap_or_default();
546        }
547      }
548    }
549  } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
550    #[cfg(feature = "runtime-monoio")]
551    let io = MonoioIo::new(SendAsyncIo::new(stream));
552    #[cfg(feature = "runtime-tokio")]
553    let io = TokioIo::new(stream);
554
555    #[cfg(feature = "runtime-monoio")]
556    let http1_builder = {
557      let mut http1_builder = hyper::server::conn::http1::Builder::new();
558
559      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
560      http1_builder.timer(MonoioTimer);
561
562      http1_builder
563    };
564    #[cfg(feature = "runtime-tokio")]
565    let http1_builder = {
566      let mut http1_builder = hyper::server::conn::http1::Builder::new();
567
568      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
569      http1_builder.timer(TokioTimer::new());
570
571      http1_builder
572    };
573
574    let configurations_clone = configurations.clone();
575    let mut http_future = http1_builder
576      .serve_connection(
577        io,
578        service_fn(move |request: Request<Incoming>| {
579          let (request_parts, request_body) = request.into_parts();
580          let request = Request::from_parts(
581            request_parts,
582            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
583          );
584          request_handler(
585            request,
586            client_address,
587            server_address,
588            false,
589            configurations_clone.clone(),
590            if http3_enabled {
591              Some(server_address.port())
592            } else {
593              None
594            },
595            acme_http_01_resolvers.clone(),
596            proxy_protocol_client_address,
597            proxy_protocol_server_address,
598          )
599        }),
600      )
601      .with_upgrades();
602    let http_future_result = crate::runtime::select! {
603      result = &mut http_future => {
604        result
605      }
606      _ = shutdown_rx.cancelled() => {
607        std::pin::Pin::new(&mut http_future).graceful_shutdown();
608        http_future.await
609      }
610    };
611    if let Err(err) = http_future_result {
612      for logging_tx in configurations
613        .find_global_configuration()
614        .as_ref()
615        .map_or(&vec![], |c| &c.observability.log_channels)
616      {
617        logging_tx
618          .send(LogMessage::new(format!("Error serving HTTP connection: {err}"), true))
619          .await
620          .unwrap_or_default();
621      }
622    }
623  }
624}
625
626/// HTTP/3 handler function
627#[allow(clippy::too_many_arguments)]
628async fn http_quic_handler_fn(
629  connection_attempt: quinn::Incoming,
630  client_address: SocketAddr,
631  server_address: SocketAddr,
632  configurations: Arc<ServerConfigurations>,
633  connection_reference: Arc<()>,
634  shutdown_rx: CancellationToken,
635) {
636  match connection_attempt.await {
637    Ok(connection) => {
638      let _connection_reference = Arc::downgrade(&connection_reference);
639      let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
640        match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
641          Ok(h3_conn) => h3_conn,
642          Err(err) => {
643            for logging_tx in configurations
644              .find_global_configuration()
645              .as_ref()
646              .map_or(&vec![], |c| &c.observability.log_channels)
647            {
648              logging_tx
649                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
650                .await
651                .unwrap_or_default();
652            }
653            return;
654          }
655        };
656
657      loop {
658        match h3_conn.accept().await {
659          Ok(Some(resolver)) => {
660            let configurations = configurations.clone();
661            crate::runtime::spawn(async move {
662              let (request, stream) = match resolver.resolve_request().await {
663                Ok(resolved) => resolved,
664                Err(err) => {
665                  for logging_tx in configurations
666                    .find_global_configuration()
667                    .as_ref()
668                    .map_or(&vec![], |c| &c.observability.log_channels)
669                  {
670                    logging_tx
671                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
672                      .await
673                      .unwrap_or_default();
674                  }
675                  return;
676                }
677              };
678              let (mut send, receive) = stream.split();
679              let request_body_stream =
680                futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
681                  loop {
682                    if !is_body_finished {
683                      match receive.recv_data().await {
684                        Ok(Some(mut data)) => {
685                          return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
686                        }
687                        Ok(None) => is_body_finished = true,
688                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
689                      }
690                    } else {
691                      match receive.recv_trailers().await {
692                        Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
693                        Ok(None) => {
694                          return None;
695                        }
696                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
697                      }
698                    }
699                  }
700                });
701              let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
702              let (request_parts, _) = request.into_parts();
703              let request = Request::from_parts(request_parts, request_body);
704              let mut response = match request_handler(
705                request,
706                client_address,
707                server_address,
708                true,
709                configurations.clone(),
710                None,
711                Arc::new(tokio::sync::RwLock::new(vec![])),
712                None,
713                None,
714              )
715              .await
716              {
717                Ok(response) => response,
718                Err(err) => {
719                  for logging_tx in configurations
720                    .find_global_configuration()
721                    .as_ref()
722                    .map_or(&vec![], |c| &c.observability.log_channels)
723                  {
724                    logging_tx
725                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
726                      .await
727                      .unwrap_or_default();
728                  }
729                  return;
730                }
731              };
732              if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
733                response.headers_mut().entry(hyper::header::DATE).or_insert(http_date);
734              }
735              let (response_parts, mut response_body) = response.into_parts();
736              if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
737                for logging_tx in configurations
738                  .find_global_configuration()
739                  .as_ref()
740                  .map_or(&vec![], |c| &c.observability.log_channels)
741                {
742                  logging_tx
743                    .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
744                    .await
745                    .unwrap_or_default();
746                }
747                return;
748              }
749              let mut had_trailers = false;
750              while let Some(chunk) = response_body.frame().await {
751                match chunk {
752                  Ok(frame) => {
753                    if frame.is_data() {
754                      match frame.into_data() {
755                        Ok(data) => {
756                          if let Err(err) = send.send_data(data).await {
757                            for logging_tx in configurations
758                              .find_global_configuration()
759                              .as_ref()
760                              .map_or(&vec![], |c| &c.observability.log_channels)
761                            {
762                              logging_tx
763                                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
764                                .await
765                                .unwrap_or_default();
766                            }
767                            return;
768                          }
769                        }
770                        Err(_) => {
771                          for logging_tx in configurations
772                            .find_global_configuration()
773                            .as_ref()
774                            .map_or(&vec![], |c| &c.observability.log_channels)
775                          {
776                            logging_tx
777                              .send(LogMessage::new(
778                                "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
779                                true,
780                              ))
781                              .await
782                              .unwrap_or_default();
783                          }
784                          return;
785                        }
786                      }
787                    } else if frame.is_trailers() {
788                      match frame.into_trailers() {
789                        Ok(trailers) => {
790                          had_trailers = true;
791                          if let Err(err) = send.send_trailers(trailers).await {
792                            for logging_tx in configurations
793                              .find_global_configuration()
794                              .as_ref()
795                              .map_or(&vec![], |c| &c.observability.log_channels)
796                            {
797                              logging_tx
798                                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
799                                .await
800                                .unwrap_or_default();
801                            }
802                            return;
803                          }
804                        }
805                        Err(_) => {
806                          for logging_tx in configurations
807                            .find_global_configuration()
808                            .as_ref()
809                            .map_or(&vec![], |c| &c.observability.log_channels)
810                          {
811                            logging_tx
812                              .send(LogMessage::new(
813                                "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
814                                true,
815                              ))
816                              .await
817                              .unwrap_or_default();
818                          }
819                          return;
820                        }
821                      }
822                    }
823                  }
824                  Err(err) => {
825                    for logging_tx in configurations
826                      .find_global_configuration()
827                      .as_ref()
828                      .map_or(&vec![], |c| &c.observability.log_channels)
829                    {
830                      logging_tx
831                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
832                        .await
833                        .unwrap_or_default();
834                    }
835                    return;
836                  }
837                }
838              }
839              if !had_trailers {
840                if let Err(err) = send.finish().await {
841                  for logging_tx in configurations
842                    .find_global_configuration()
843                    .as_ref()
844                    .map_or(&vec![], |c| &c.observability.log_channels)
845                  {
846                    logging_tx
847                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
848                      .await
849                      .unwrap_or_default();
850                  }
851                }
852              }
853            });
854          }
855          Ok(None) => break,
856          Err(err) => {
857            for logging_tx in configurations
858              .find_global_configuration()
859              .as_ref()
860              .map_or(&vec![], |c| &c.observability.log_channels)
861            {
862              logging_tx
863                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
864                .await
865                .unwrap_or_default();
866            }
867            return;
868          }
869        }
870        if shutdown_rx.is_cancelled() {
871          h3_conn.shutdown(0).await.unwrap_or_default();
872        }
873      }
874    }
875    Err(err) => {
876      for logging_tx in configurations
877        .find_global_configuration()
878        .as_ref()
879        .map_or(&vec![], |c| &c.observability.log_channels)
880      {
881        logging_tx
882          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
883          .await
884          .unwrap_or_default();
885      }
886    }
887  }
888}