ferron/
handler.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use async_channel::{Receiver, Sender};
8use bytes::{Buf, Bytes};
9use ferron_common::logging::LogMessage;
10use http_body_util::{BodyExt, StreamBody};
11use hyper::body::{Frame, Incoming};
12use hyper::service::service_fn;
13use hyper::{Request, Response};
14#[cfg(feature = "runtime-tokio")]
15use hyper_util::rt::{TokioIo, TokioTimer};
16#[cfg(feature = "runtime-monoio")]
17use monoio::io::IntoPollIo;
18#[cfg(feature = "runtime-monoio")]
19use monoio::net::tcp::stream_poll::TcpStreamPoll;
20#[cfg(feature = "runtime-monoio")]
21use monoio::net::TcpStream;
22#[cfg(feature = "runtime-monoio")]
23use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
24use rustls::server::Acceptor;
25use rustls::ServerConfig;
26#[cfg(feature = "runtime-tokio")]
27use tokio::net::TcpStream;
28use tokio_rustls::server::TlsStream;
29use tokio_rustls::LazyConfigAcceptor;
30use tokio_util::sync::CancellationToken;
31
32use crate::acme::ACME_TLS_ALPN_NAME;
33use crate::config::ServerConfigurations;
34use crate::get_value;
35use crate::listener_handler_communication::ConnectionData;
36use crate::request_handler::request_handler;
37use crate::util::read_proxy_header;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40
41static HTTP3_INVALID_HEADERS: [hyper::header::HeaderName; 5] = [
42  hyper::header::HeaderName::from_static("keep-alive"),
43  hyper::header::HeaderName::from_static("proxy-connection"),
44  hyper::header::TRANSFER_ENCODING,
45  hyper::header::TE,
46  hyper::header::UPGRADE,
47];
48
/// Hyper executor that starts futures with `tokio::task::spawn_local`.
///
/// The bounds only require `'static`, not `Send`.
#[cfg(feature = "runtime-tokio")]
#[derive(Clone, Copy, Debug)]
pub struct TokioLocalExecutor;

#[cfg(feature = "runtime-tokio")]
impl<Fut> hyper::rt::Executor<Fut> for TokioLocalExecutor
where
  Fut: std::future::Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `fut` as a local task and detaches it.
  #[inline]
  fn execute(&self, fut: Fut) {
    // The join handle is dropped right away; the task keeps running on its own.
    let _detached = tokio::task::spawn_local(fut);
  }
}
65
66/// Creates a HTTP request handler
67#[allow(clippy::too_many_arguments)]
68pub fn create_http_handler(
69  configurations: Arc<ServerConfigurations>,
70  rx: Receiver<ConnectionData>,
71  enable_uring: Option<bool>,
72  tls_configs: HashMap<u16, Arc<ServerConfig>>,
73  http3_enabled: bool,
74  acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
75  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
76  enable_proxy_protocol: bool,
77  io_uring_disabled: Sender<Option<std::io::Error>>,
78) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
79  let shutdown_tx = CancellationToken::new();
80  let shutdown_rx = shutdown_tx.clone();
81  let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
82  std::thread::Builder::new()
83    .name("Request handler".to_string())
84    .spawn(move || {
85      let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
86        Ok(rt) => rt,
87        Err(error) => {
88          handler_init_tx
89            .send_blocking(Some(
90              anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
91            ))
92            .unwrap_or_default();
93          return;
94        }
95      };
96      io_uring_disabled
97        .send_blocking(rt.return_io_uring_error())
98        .unwrap_or_default();
99      rt.run(async move {
100        if let Some(error) = http_handler_fn(
101          configurations,
102          rx,
103          &handler_init_tx,
104          shutdown_rx,
105          tls_configs,
106          http3_enabled,
107          acme_tls_alpn_01_configs,
108          acme_http_01_resolvers,
109          enable_proxy_protocol,
110        )
111        .await
112        .err()
113        {
114          handler_init_tx.send(Some(error)).await.unwrap_or_default();
115        }
116      });
117    })?;
118
119  if let Some(error) = listen_error_rx.recv_blocking()? {
120    Err(error)?;
121  }
122
123  Ok(shutdown_tx)
124}
125
126/// HTTP handler function
127#[inline]
128#[allow(clippy::too_many_arguments)]
129async fn http_handler_fn(
130  configurations: Arc<ServerConfigurations>,
131  rx: Receiver<ConnectionData>,
132  handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
133  shutdown_rx: CancellationToken,
134  tls_configs: HashMap<u16, Arc<ServerConfig>>,
135  http3_enabled: bool,
136  acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
137  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
138  enable_proxy_protocol: bool,
139) -> Result<(), Box<dyn Error + Send + Sync>> {
140  handler_init_tx.send(None).await.unwrap_or_default();
141
142  let connections_references = Arc::new(());
143
144  loop {
145    let conn_data = crate::runtime::select! {
146        result = rx.recv() => {
147            if let Ok(recv_data) = result {
148                recv_data
149            } else {
150                break;
151            }
152        }
153        _ = shutdown_rx.cancelled() => {
154            break;
155        }
156    };
157    let configurations = configurations.clone();
158    let tls_config = if matches!(
159      conn_data.connection,
160      crate::listener_handler_communication::Connection::Quic(..)
161    ) {
162      None
163    } else {
164      tls_configs.get(&conn_data.server_address.port()).cloned()
165    };
166    let acme_tls_alpn_01_config = if matches!(
167      conn_data.connection,
168      crate::listener_handler_communication::Connection::Quic(..)
169    ) {
170      None
171    } else {
172      acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
173    };
174    let acme_http_01_resolvers = acme_http_01_resolvers.clone();
175    let connections_references_cloned = connections_references.clone();
176    let shutdown_rx_clone = shutdown_rx.clone();
177    crate::runtime::spawn(async move {
178      match conn_data.connection {
179        crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
180          // Toggle O_NONBLOCK for TCP stream, when using Monoio.
181          // Unset it when io_uring is enabled, and set it otherwise.
182          #[cfg(feature = "runtime-monoio")]
183          let _ = tcp_stream.set_nonblocking(monoio::utils::is_legacy());
184
185          #[cfg(feature = "runtime-monoio")]
186          let tcp_stream = match TcpStream::from_std(tcp_stream) {
187            Ok(stream) => stream,
188            Err(err) => {
189              for logging_tx in configurations
190                .find_global_configuration()
191                .as_ref()
192                .map_or(&vec![], |c| &c.observability.log_channels)
193              {
194                logging_tx
195                  .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
196                  .await
197                  .unwrap_or_default();
198              }
199              return;
200            }
201          };
202          let encrypted = tls_config.is_some();
203          http_tcp_handler_fn(
204            tcp_stream,
205            conn_data.client_address,
206            conn_data.server_address,
207            configurations,
208            tls_config,
209            http3_enabled && encrypted,
210            connections_references_cloned,
211            acme_tls_alpn_01_config,
212            acme_http_01_resolvers,
213            enable_proxy_protocol,
214            shutdown_rx_clone,
215          )
216          .await;
217        }
218        crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
219          http_quic_handler_fn(
220            quic_incoming,
221            conn_data.client_address,
222            conn_data.server_address,
223            configurations,
224            connections_references_cloned,
225            shutdown_rx_clone,
226          )
227          .await;
228        }
229      }
230    });
231  }
232
233  while Arc::weak_count(&connections_references) > 0 {
234    crate::runtime::sleep(Duration::from_millis(100)).await;
235  }
236
237  Ok(())
238}
239
240/// Enum for maybe TLS stream
241#[allow(clippy::large_enum_variant)]
242#[cfg(feature = "runtime-monoio")]
243enum MaybeTlsStream {
244  /// TLS stream
245  Tls(TlsStream<SendAsyncIo<TcpStreamPoll>>),
246
247  /// Plain TCP stream
248  Plain(SendAsyncIo<TcpStreamPoll>),
249}
250
251#[allow(clippy::large_enum_variant)]
252#[cfg(feature = "runtime-tokio")]
253enum MaybeTlsStream {
254  /// TLS stream
255  Tls(TlsStream<TcpStream>),
256
257  /// Plain TCP stream
258  Plain(TcpStream),
259}
260
261/// HTTP/1.x and HTTP/2 handler function
262#[allow(clippy::too_many_arguments)]
263async fn http_tcp_handler_fn(
264  tcp_stream: TcpStream,
265  client_address: SocketAddr,
266  server_address: SocketAddr,
267  configurations: Arc<ServerConfigurations>,
268  tls_config: Option<Arc<ServerConfig>>,
269  http3_enabled: bool,
270  connection_reference: Arc<()>,
271  acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
272  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
273  enable_proxy_protocol: bool,
274  shutdown_rx: CancellationToken,
275) {
276  let _connection_reference = Arc::downgrade(&connection_reference);
277  #[cfg(feature = "runtime-monoio")]
278  let tcp_stream = match tcp_stream.into_poll_io() {
279    Ok(stream) => SendAsyncIo::new(stream),
280    Err(err) => {
281      for logging_tx in configurations
282        .find_global_configuration()
283        .as_ref()
284        .map_or(&vec![], |c| &c.observability.log_channels)
285      {
286        logging_tx
287          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
288          .await
289          .unwrap_or_default();
290      }
291      return;
292    }
293  };
294
295  // PROXY protocol header precedes TLS handshakes too...
296  let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
297    // Read and parse the PROXY protocol header
298    match read_proxy_header(tcp_stream).await {
299      Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
300      Err(err) => {
301        for logging_tx in configurations
302          .find_global_configuration()
303          .as_ref()
304          .map_or(&vec![], |c| &c.observability.log_channels)
305        {
306          logging_tx
307            .send(LogMessage::new(
308              format!("Error reading PROXY protocol header: {err}"),
309              true,
310            ))
311            .await
312            .unwrap_or_default();
313        }
314        return;
315      }
316    }
317  } else {
318    (tcp_stream, None, None)
319  };
320
321  let maybe_tls_stream = if let Some(tls_config) = tls_config {
322    let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
323      Ok(start_handshake) => start_handshake,
324      Err(err) => {
325        for logging_tx in configurations
326          .find_global_configuration()
327          .as_ref()
328          .map_or(&vec![], |c| &c.observability.log_channels)
329        {
330          logging_tx
331            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
332            .await
333            .unwrap_or_default();
334        }
335        return;
336      }
337    };
338
339    if let Some(acme_config) = acme_tls_alpn_01_config {
340      if start_handshake
341        .client_hello()
342        .alpn()
343        .into_iter()
344        .flatten()
345        .eq([ACME_TLS_ALPN_NAME])
346      {
347        match start_handshake.into_stream(acme_config).await {
348          Ok(_) => (),
349          Err(err) => {
350            for logging_tx in configurations
351              .find_global_configuration()
352              .as_ref()
353              .map_or(&vec![], |c| &c.observability.log_channels)
354            {
355              logging_tx
356                .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
357                .await
358                .unwrap_or_default();
359            }
360            return;
361          }
362        };
363        return;
364      }
365    }
366
367    let tls_stream = match start_handshake.into_stream(tls_config).await {
368      Ok(tls_stream) => tls_stream,
369      Err(err) => {
370        for logging_tx in configurations
371          .find_global_configuration()
372          .as_ref()
373          .map_or(&vec![], |c| &c.observability.log_channels)
374        {
375          logging_tx
376            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
377            .await
378            .unwrap_or_default();
379        }
380        return;
381      }
382    };
383
384    MaybeTlsStream::Tls(tls_stream)
385  } else {
386    MaybeTlsStream::Plain(tcp_stream)
387  };
388
389  if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
390    let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
391    let is_http2 = alpn_protocol == Some("h2".as_bytes());
392
393    #[cfg(feature = "runtime-tokio")]
394    let io = TokioIo::new(tls_stream);
395
396    if is_http2 {
397      // Hyper's HTTP/2 connection doesn't require underlying I/O to be `Send`.
398      #[cfg(feature = "runtime-monoio")]
399      let io = MonoioIo::new(tls_stream);
400
401      #[cfg(feature = "runtime-monoio")]
402      let mut http2_builder = {
403        let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
404        http2_builder.timer(MonoioTimer);
405        http2_builder
406      };
407      #[cfg(feature = "runtime-tokio")]
408      let mut http2_builder = {
409        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
410        http2_builder.timer(TokioTimer::new());
411        http2_builder
412      };
413
414      let global_configuration = configurations.find_global_configuration();
415
416      if let Some(initial_window_size) = global_configuration
417        .as_deref()
418        .and_then(|c| get_value!("h2_initial_window_size", c))
419        .and_then(|v| v.as_i128())
420      {
421        http2_builder.initial_stream_window_size(initial_window_size as u32);
422      }
423      if let Some(max_frame_size) = global_configuration
424        .as_deref()
425        .and_then(|c| get_value!("h2_max_frame_size", c))
426        .and_then(|v| v.as_i128())
427      {
428        http2_builder.max_frame_size(max_frame_size as u32);
429      }
430      if let Some(max_concurrent_streams) = global_configuration
431        .as_deref()
432        .and_then(|c| get_value!("h2_max_concurrent_streams", c))
433        .and_then(|v| v.as_i128())
434      {
435        http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
436      }
437      if let Some(max_header_list_size) = global_configuration
438        .as_deref()
439        .and_then(|c| get_value!("h2_max_header_list_size", c))
440        .and_then(|v| v.as_i128())
441      {
442        http2_builder.max_header_list_size(max_header_list_size as u32);
443      }
444      if let Some(enable_connect_protocol) = global_configuration
445        .as_deref()
446        .and_then(|c| get_value!("h2_enable_connect_protocol", c))
447        .and_then(|v| v.as_bool())
448      {
449        if enable_connect_protocol {
450          http2_builder.enable_connect_protocol();
451        }
452      }
453
454      let configurations_clone = configurations.clone();
455      let mut http_future = http2_builder.serve_connection(
456        io,
457        service_fn(move |request: Request<Incoming>| {
458          let (request_parts, request_body) = request.into_parts();
459          let request = Request::from_parts(
460            request_parts,
461            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
462          );
463          request_handler(
464            request,
465            client_address,
466            server_address,
467            true,
468            configurations_clone.clone(),
469            if http3_enabled {
470              Some(server_address.port())
471            } else {
472              None
473            },
474            acme_http_01_resolvers.clone(),
475            proxy_protocol_client_address,
476            proxy_protocol_server_address,
477          )
478        }),
479      );
480      let http_future_result = crate::runtime::select! {
481        result = &mut http_future => {
482          result
483        }
484        _ = shutdown_rx.cancelled() => {
485          std::pin::Pin::new(&mut http_future).graceful_shutdown();
486          http_future.await
487        }
488      };
489      if let Err(err) = http_future_result {
490        let error_to_log = if err.is_user() {
491          err.source().unwrap_or(&err)
492        } else {
493          &err
494        };
495        for logging_tx in configurations
496          .find_global_configuration()
497          .as_ref()
498          .map_or(&vec![], |c| &c.observability.log_channels)
499        {
500          logging_tx
501            .send(LogMessage::new(
502              format!("Error serving HTTPS connection: {error_to_log}"),
503              true,
504            ))
505            .await
506            .unwrap_or_default();
507        }
508      }
509    } else {
510      #[cfg(feature = "runtime-monoio")]
511      let io = MonoioIo::new(tls_stream);
512
513      #[cfg(feature = "runtime-monoio")]
514      let http1_builder = {
515        let mut http1_builder = hyper::server::conn::http1::Builder::new();
516
517        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
518        http1_builder.timer(MonoioTimer);
519
520        http1_builder
521      };
522      #[cfg(feature = "runtime-tokio")]
523      let http1_builder = {
524        let mut http1_builder = hyper::server::conn::http1::Builder::new();
525
526        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
527        http1_builder.timer(TokioTimer::new());
528
529        http1_builder
530      };
531
532      let configurations_clone = configurations.clone();
533      let mut http_future = http1_builder
534        .serve_connection(
535          io,
536          service_fn(move |request: Request<Incoming>| {
537            let (request_parts, request_body) = request.into_parts();
538            let request = Request::from_parts(
539              request_parts,
540              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
541            );
542            request_handler(
543              request,
544              client_address,
545              server_address,
546              true,
547              configurations_clone.clone(),
548              if http3_enabled {
549                Some(server_address.port())
550              } else {
551                None
552              },
553              acme_http_01_resolvers.clone(),
554              proxy_protocol_client_address,
555              proxy_protocol_server_address,
556            )
557          }),
558        )
559        .with_upgrades();
560      let http_future_result = crate::runtime::select! {
561        result = &mut http_future => {
562          result
563        }
564        _ = shutdown_rx.cancelled() => {
565          std::pin::Pin::new(&mut http_future).graceful_shutdown();
566          http_future.await
567        }
568      };
569      if let Err(err) = http_future_result {
570        let error_to_log = if err.is_user() {
571          err.source().unwrap_or(&err)
572        } else {
573          &err
574        };
575        for logging_tx in configurations
576          .find_global_configuration()
577          .as_ref()
578          .map_or(&vec![], |c| &c.observability.log_channels)
579        {
580          logging_tx
581            .send(LogMessage::new(
582              format!("Error serving HTTPS connection: {error_to_log}"),
583              true,
584            ))
585            .await
586            .unwrap_or_default();
587        }
588      }
589    }
590  } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
591    #[cfg(feature = "runtime-monoio")]
592    let io = MonoioIo::new(stream);
593    #[cfg(feature = "runtime-tokio")]
594    let io = TokioIo::new(stream);
595
596    #[cfg(feature = "runtime-monoio")]
597    let http1_builder = {
598      let mut http1_builder = hyper::server::conn::http1::Builder::new();
599
600      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
601      http1_builder.timer(MonoioTimer);
602
603      http1_builder
604    };
605    #[cfg(feature = "runtime-tokio")]
606    let http1_builder = {
607      let mut http1_builder = hyper::server::conn::http1::Builder::new();
608
609      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
610      http1_builder.timer(TokioTimer::new());
611
612      http1_builder
613    };
614
615    let configurations_clone = configurations.clone();
616    let mut http_future = http1_builder
617      .serve_connection(
618        io,
619        service_fn(move |request: Request<Incoming>| {
620          let (request_parts, request_body) = request.into_parts();
621          let request = Request::from_parts(
622            request_parts,
623            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
624          );
625          request_handler(
626            request,
627            client_address,
628            server_address,
629            false,
630            configurations_clone.clone(),
631            if http3_enabled {
632              Some(server_address.port())
633            } else {
634              None
635            },
636            acme_http_01_resolvers.clone(),
637            proxy_protocol_client_address,
638            proxy_protocol_server_address,
639          )
640        }),
641      )
642      .with_upgrades();
643    let http_future_result = crate::runtime::select! {
644      result = &mut http_future => {
645        result
646      }
647      _ = shutdown_rx.cancelled() => {
648        std::pin::Pin::new(&mut http_future).graceful_shutdown();
649        http_future.await
650      }
651    };
652    if let Err(err) = http_future_result {
653      let error_to_log = if err.is_user() {
654        err.source().unwrap_or(&err)
655      } else {
656        &err
657      };
658      for logging_tx in configurations
659        .find_global_configuration()
660        .as_ref()
661        .map_or(&vec![], |c| &c.observability.log_channels)
662      {
663        logging_tx
664          .send(LogMessage::new(
665            format!("Error serving HTTP connection: {error_to_log}"),
666            true,
667          ))
668          .await
669          .unwrap_or_default();
670      }
671    }
672  }
673}
674
675/// HTTP/3 handler function
676#[inline]
677#[allow(clippy::too_many_arguments)]
678async fn http_quic_handler_fn(
679  connection_attempt: quinn::Incoming,
680  client_address: SocketAddr,
681  server_address: SocketAddr,
682  configurations: Arc<ServerConfigurations>,
683  connection_reference: Arc<()>,
684  shutdown_rx: CancellationToken,
685) {
686  match connection_attempt.await {
687    Ok(connection) => {
688      let connection_reference = Arc::downgrade(&connection_reference);
689      let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
690        match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
691          Ok(h3_conn) => h3_conn,
692          Err(err) => {
693            for logging_tx in configurations
694              .find_global_configuration()
695              .as_ref()
696              .map_or(&vec![], |c| &c.observability.log_channels)
697            {
698              logging_tx
699                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
700                .await
701                .unwrap_or_default();
702            }
703            return;
704          }
705        };
706
707      loop {
708        match crate::runtime::select! {
709            biased;
710
711            _ = shutdown_rx.cancelled() => {
712              h3_conn.shutdown(0).await.unwrap_or_default();
713              return;
714            }
715            result = h3_conn.accept() => {
716              result
717            }
718        } {
719          Ok(Some(resolver)) => {
720            let configurations = configurations.clone();
721            let connection_reference = connection_reference.clone();
722            crate::runtime::spawn(async move {
723              let _connection_reference = connection_reference;
724              let (request, stream) = match resolver.resolve_request().await {
725                Ok(resolved) => resolved,
726                Err(err) => {
727                  if !err.is_h3_no_error() {
728                    for logging_tx in configurations
729                      .find_global_configuration()
730                      .as_ref()
731                      .map_or(&vec![], |c| &c.observability.log_channels)
732                    {
733                      logging_tx
734                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
735                        .await
736                        .unwrap_or_default();
737                    }
738                  }
739                  return;
740                }
741              };
742              let (mut send, receive) = stream.split();
743              let request_body_stream =
744                futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
745                  loop {
746                    if !is_body_finished {
747                      match receive.recv_data().await {
748                        Ok(Some(mut data)) => {
749                          return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
750                        }
751                        Ok(None) => is_body_finished = true,
752                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
753                      }
754                    } else {
755                      match receive.recv_trailers().await {
756                        Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
757                        Ok(None) => {
758                          return None;
759                        }
760                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
761                      }
762                    }
763                  }
764                });
765              let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
766              let (request_parts, _) = request.into_parts();
767              let request = Request::from_parts(request_parts, request_body);
768              let mut response = match request_handler(
769                request,
770                client_address,
771                server_address,
772                true,
773                configurations.clone(),
774                None,
775                Arc::new(tokio::sync::RwLock::new(vec![])),
776                None,
777                None,
778              )
779              .await
780              {
781                Ok(response) => response,
782                Err(err) => {
783                  for logging_tx in configurations
784                    .find_global_configuration()
785                    .as_ref()
786                    .map_or(&vec![], |c| &c.observability.log_channels)
787                  {
788                    logging_tx
789                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
790                      .await
791                      .unwrap_or_default();
792                  }
793                  return;
794                }
795              };
796              let response_headers = response.headers_mut();
797              if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
798                response_headers.entry(hyper::header::DATE).or_insert(http_date);
799              }
800              for header in &HTTP3_INVALID_HEADERS {
801                response_headers.remove(header);
802              }
803              if let Some(connection_header) = response_headers
804                .remove(hyper::header::CONNECTION)
805                .as_ref()
806                .and_then(|v| v.to_str().ok())
807              {
808                for name in connection_header.split(',') {
809                  response_headers.remove(name.trim());
810                }
811              }
812              let (response_parts, mut response_body) = response.into_parts();
813              if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
814                if !err.is_h3_no_error() {
815                  for logging_tx in configurations
816                    .find_global_configuration()
817                    .as_ref()
818                    .map_or(&vec![], |c| &c.observability.log_channels)
819                  {
820                    logging_tx
821                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
822                      .await
823                      .unwrap_or_default();
824                  }
825                }
826                return;
827              }
828              let mut had_trailers = false;
829              while let Some(chunk) = response_body.frame().await {
830                match chunk {
831                  Ok(frame) => {
832                    if frame.is_data() {
833                      match frame.into_data() {
834                        Ok(data) => {
835                          if let Err(err) = send.send_data(data).await {
836                            if !err.is_h3_no_error() {
837                              for logging_tx in configurations
838                                .find_global_configuration()
839                                .as_ref()
840                                .map_or(&vec![], |c| &c.observability.log_channels)
841                              {
842                                logging_tx
843                                  .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
844                                  .await
845                                  .unwrap_or_default();
846                              }
847                            }
848                            return;
849                          }
850                        }
851                        Err(_) => {
852                          for logging_tx in configurations
853                            .find_global_configuration()
854                            .as_ref()
855                            .map_or(&vec![], |c| &c.observability.log_channels)
856                          {
857                            logging_tx
858                              .send(LogMessage::new(
859                                "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
860                                true,
861                              ))
862                              .await
863                              .unwrap_or_default();
864                          }
865                          return;
866                        }
867                      }
868                    } else if frame.is_trailers() {
869                      match frame.into_trailers() {
870                        Ok(trailers) => {
871                          had_trailers = true;
872                          if let Err(err) = send.send_trailers(trailers).await {
873                            if !err.is_h3_no_error() {
874                              for logging_tx in configurations
875                                .find_global_configuration()
876                                .as_ref()
877                                .map_or(&vec![], |c| &c.observability.log_channels)
878                              {
879                                logging_tx
880                                  .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
881                                  .await
882                                  .unwrap_or_default();
883                              }
884                            }
885                            return;
886                          }
887                        }
888                        Err(_) => {
889                          for logging_tx in configurations
890                            .find_global_configuration()
891                            .as_ref()
892                            .map_or(&vec![], |c| &c.observability.log_channels)
893                          {
894                            logging_tx
895                              .send(LogMessage::new(
896                                "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
897                                true,
898                              ))
899                              .await
900                              .unwrap_or_default();
901                          }
902                          return;
903                        }
904                      }
905                    }
906                  }
907                  Err(err) => {
908                    for logging_tx in configurations
909                      .find_global_configuration()
910                      .as_ref()
911                      .map_or(&vec![], |c| &c.observability.log_channels)
912                    {
913                      logging_tx
914                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
915                        .await
916                        .unwrap_or_default();
917                    }
918
919                    return;
920                  }
921                }
922              }
923              if !had_trailers {
924                if let Err(err) = send.finish().await {
925                  if !err.is_h3_no_error() {
926                    for logging_tx in configurations
927                      .find_global_configuration()
928                      .as_ref()
929                      .map_or(&vec![], |c| &c.observability.log_channels)
930                    {
931                      logging_tx
932                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
933                        .await
934                        .unwrap_or_default();
935                    }
936                  }
937                }
938              }
939            });
940          }
941          Ok(None) => break,
942          Err(err) => {
943            if !err.is_h3_no_error() {
944              for logging_tx in configurations
945                .find_global_configuration()
946                .as_ref()
947                .map_or(&vec![], |c| &c.observability.log_channels)
948              {
949                logging_tx
950                  .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
951                  .await
952                  .unwrap_or_default();
953              }
954            }
955            return;
956          }
957        }
958      }
959    }
960    Err(err) => {
961      for logging_tx in configurations
962        .find_global_configuration()
963        .as_ref()
964        .map_or(&vec![], |c| &c.observability.log_channels)
965      {
966        logging_tx
967          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
968          .await
969          .unwrap_or_default();
970      }
971    }
972  }
973}