ferron/
handler.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use async_channel::{Receiver, Sender};
8use bytes::{Buf, Bytes};
9use http_body_util::{BodyExt, StreamBody};
10use hyper::body::{Frame, Incoming};
11use hyper::service::service_fn;
12use hyper::{Request, Response};
13#[cfg(feature = "runtime-tokio")]
14use hyper_util::rt::{TokioIo, TokioTimer};
15#[cfg(feature = "runtime-monoio")]
16use monoio::io::IntoPollIo;
17#[cfg(feature = "runtime-monoio")]
18use monoio::net::tcp::stream_poll::TcpStreamPoll;
19#[cfg(feature = "runtime-monoio")]
20use monoio::net::TcpStream;
21#[cfg(feature = "runtime-monoio")]
22use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
23use rustls::server::Acceptor;
24use rustls::ServerConfig;
25#[cfg(feature = "runtime-tokio")]
26use tokio::net::TcpStream;
27use tokio_rustls::server::TlsStream;
28use tokio_rustls::LazyConfigAcceptor;
29use tokio_util::sync::CancellationToken;
30
31use crate::acme::ACME_TLS_ALPN_NAME;
32use crate::config::ServerConfigurations;
33use crate::get_value;
34use crate::listener_handler_communication::ConnectionData;
35use crate::logging::{LogMessage, Loggers};
36use crate::request_handler::request_handler;
37use crate::util::read_proxy_header;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40
// Tokio local executor
/// Hyper executor that passes every future it receives to `tokio::task::spawn_local`.
///
/// Because the futures are spawned locally, they only have to be `'static`, not `Send`.
#[cfg(feature = "runtime-tokio")]
#[derive(Debug, Clone, Copy)]
pub struct TokioLocalExecutor;

#[cfg(feature = "runtime-tokio")]
impl<Fut> hyper::rt::Executor<Fut> for TokioLocalExecutor
where
  Fut: std::future::Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `task` as a local task and lets it run detached.
  #[inline]
  fn execute(&self, task: Fut) {
    // The join handle isn't needed; dropping it leaves the task running.
    drop(tokio::task::spawn_local(task));
  }
}
57
58/// Creates a HTTP request handler
59#[allow(clippy::too_many_arguments)]
60pub fn create_http_handler(
61  configurations: Arc<ServerConfigurations>,
62  rx: Receiver<ConnectionData>,
63  enable_uring: bool,
64  loggers: Loggers,
65  tls_configs: HashMap<u16, Arc<ServerConfig>>,
66  http3_enabled: bool,
67  acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
68  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
69  enable_proxy_protocol: bool,
70) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
71  let shutdown_tx = CancellationToken::new();
72  let shutdown_rx = shutdown_tx.clone();
73  let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
74  std::thread::Builder::new()
75    .name("Request handler".to_string())
76    .spawn(move || {
77      crate::runtime::new_runtime(
78        async move {
79          if let Some(error) = http_handler_fn(
80            configurations,
81            rx,
82            &handler_init_tx,
83            loggers,
84            shutdown_rx,
85            tls_configs,
86            http3_enabled,
87            acme_tls_alpn_01_configs,
88            acme_http_01_resolvers,
89            enable_proxy_protocol,
90          )
91          .await
92          .err()
93          {
94            handler_init_tx.send(Some(error)).await.unwrap_or_default();
95          }
96        },
97        enable_uring,
98      )
99      .unwrap();
100    })?;
101
102  if let Some(error) = listen_error_rx.recv_blocking()? {
103    Err(error)?;
104  }
105
106  Ok(shutdown_tx)
107}
108
109/// HTTP handler function
110#[allow(clippy::too_many_arguments)]
111async fn http_handler_fn(
112  configurations: Arc<ServerConfigurations>,
113  rx: Receiver<ConnectionData>,
114  handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
115  loggers: Loggers,
116  shutdown_rx: CancellationToken,
117  tls_configs: HashMap<u16, Arc<ServerConfig>>,
118  http3_enabled: bool,
119  acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
120  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
121  enable_proxy_protocol: bool,
122) -> Result<(), Box<dyn Error + Send + Sync>> {
123  handler_init_tx.send(None).await.unwrap_or_default();
124
125  let connections_references = Arc::new(());
126
127  loop {
128    let conn_data = crate::runtime::select! {
129        result = rx.recv() => {
130            if let Ok(recv_data) = result {
131                recv_data
132            } else {
133                break;
134            }
135        }
136        _ = shutdown_rx.cancelled() => {
137            break;
138        }
139    };
140    let configurations = configurations.clone();
141    let tls_config = if matches!(
142      conn_data.connection,
143      crate::listener_handler_communication::Connection::Quic(..)
144    ) {
145      None
146    } else {
147      tls_configs.get(&conn_data.server_address.port()).cloned()
148    };
149    let acme_tls_alpn_01_config = if matches!(
150      conn_data.connection,
151      crate::listener_handler_communication::Connection::Quic(..)
152    ) {
153      None
154    } else {
155      acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
156    };
157    let acme_http_01_resolvers = acme_http_01_resolvers.clone();
158    let loggers = loggers.clone();
159    let connections_references_cloned = connections_references.clone();
160    let shutdown_rx_clone = shutdown_rx.clone();
161    crate::runtime::spawn(async move {
162      match conn_data.connection {
163        crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
164          #[cfg(feature = "runtime-monoio")]
165          let tcp_stream = match TcpStream::from_std(tcp_stream) {
166            Ok(stream) => stream,
167            Err(err) => {
168              if let Some(logging_tx) = loggers.find_global_logger() {
169                logging_tx
170                  .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
171                  .await
172                  .unwrap_or_default();
173              }
174              return;
175            }
176          };
177          let encrypted = tls_config.is_some();
178          http_tcp_handler_fn(
179            tcp_stream,
180            conn_data.client_address,
181            conn_data.server_address,
182            loggers,
183            configurations,
184            tls_config,
185            http3_enabled && encrypted,
186            connections_references_cloned,
187            acme_tls_alpn_01_config,
188            acme_http_01_resolvers,
189            enable_proxy_protocol,
190            shutdown_rx_clone,
191          )
192          .await;
193        }
194        crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
195          http_quic_handler_fn(
196            quic_incoming,
197            conn_data.client_address,
198            conn_data.server_address,
199            loggers,
200            configurations,
201            connections_references_cloned,
202            shutdown_rx_clone,
203          )
204          .await;
205        }
206      }
207    });
208  }
209
210  while Arc::weak_count(&connections_references) > 0 {
211    crate::runtime::sleep(Duration::from_millis(100)).await;
212  }
213
214  Ok(())
215}
216
/// Connection stream that is either TLS-wrapped or plain (Monoio runtime flavor, built on
/// `TcpStreamPoll`)
#[cfg(feature = "runtime-monoio")]
#[allow(clippy::large_enum_variant)]
enum MaybeTlsStream {
  /// Plain TCP stream, without TLS
  Plain(TcpStreamPoll),

  /// TLS stream layered over the TCP stream
  Tls(TlsStream<TcpStreamPoll>),
}
227
/// Connection stream that is either TLS-wrapped or plain (Tokio runtime flavor, built on Tokio's
/// `TcpStream`)
#[cfg(feature = "runtime-tokio")]
#[allow(clippy::large_enum_variant)]
enum MaybeTlsStream {
  /// Plain TCP stream, without TLS
  Plain(TcpStream),

  /// TLS stream layered over the TCP stream
  Tls(TlsStream<TcpStream>),
}
237
238/// HTTP/1.x and HTTP/2 handler function
239#[allow(clippy::too_many_arguments)]
240async fn http_tcp_handler_fn(
241  tcp_stream: TcpStream,
242  client_address: SocketAddr,
243  server_address: SocketAddr,
244  loggers: Loggers,
245  configurations: Arc<ServerConfigurations>,
246  tls_config: Option<Arc<ServerConfig>>,
247  http3_enabled: bool,
248  connection_reference: Arc<()>,
249  acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
250  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
251  enable_proxy_protocol: bool,
252  shutdown_rx: CancellationToken,
253) {
254  let _connection_reference = Arc::downgrade(&connection_reference);
255  #[cfg(feature = "runtime-monoio")]
256  let tcp_stream = match tcp_stream.into_poll_io() {
257    Ok(stream) => stream,
258    Err(err) => {
259      if let Some(logging_tx) = loggers.find_global_logger() {
260        logging_tx
261          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
262          .await
263          .unwrap_or_default();
264      }
265      return;
266    }
267  };
268
269  // PROXY protocol header precedes TLS handshakes too...
270  let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
271    // Read and parse the PROXY protocol header
272    match read_proxy_header(tcp_stream).await {
273      Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
274      Err(err) => {
275        if let Some(logging_tx) = loggers.find_global_logger() {
276          logging_tx
277            .send(LogMessage::new(
278              format!("Error reading PROXY protocol header: {err}"),
279              true,
280            ))
281            .await
282            .unwrap_or_default();
283        }
284        return;
285      }
286    }
287  } else {
288    (tcp_stream, None, None)
289  };
290
291  let maybe_tls_stream = if let Some(tls_config) = tls_config {
292    let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
293      Ok(start_handshake) => start_handshake,
294      Err(err) => {
295        if let Some(logging_tx) = loggers.find_global_logger() {
296          logging_tx
297            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
298            .await
299            .unwrap_or_default();
300        }
301        return;
302      }
303    };
304
305    if let Some(acme_config) = acme_tls_alpn_01_config {
306      if start_handshake
307        .client_hello()
308        .alpn()
309        .into_iter()
310        .flatten()
311        .eq([ACME_TLS_ALPN_NAME])
312      {
313        match start_handshake.into_stream(acme_config).await {
314          Ok(_) => (),
315          Err(err) => {
316            if let Some(logging_tx) = loggers.find_global_logger() {
317              logging_tx
318                .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
319                .await
320                .unwrap_or_default();
321            }
322            return;
323          }
324        };
325        return;
326      }
327    }
328
329    let tls_stream = match start_handshake.into_stream(tls_config).await {
330      Ok(tls_stream) => tls_stream,
331      Err(err) => {
332        if let Some(logging_tx) = loggers.find_global_logger() {
333          logging_tx
334            .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
335            .await
336            .unwrap_or_default();
337        }
338        return;
339      }
340    };
341
342    MaybeTlsStream::Tls(tls_stream)
343  } else {
344    MaybeTlsStream::Plain(tcp_stream)
345  };
346
347  if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
348    let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
349    let is_http2 = alpn_protocol == Some("h2".as_bytes());
350
351    #[cfg(feature = "runtime-tokio")]
352    let io = TokioIo::new(tls_stream);
353
354    if is_http2 {
355      // Hyper's HTTP/2 connection doesn't require underlying I/O to be `Send`.
356      #[cfg(feature = "runtime-monoio")]
357      let io = MonoioIo::new(tls_stream);
358
359      #[cfg(feature = "runtime-monoio")]
360      let mut http2_builder = {
361        let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
362        http2_builder.timer(MonoioTimer);
363        http2_builder
364      };
365      #[cfg(feature = "runtime-tokio")]
366      let mut http2_builder = {
367        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
368        http2_builder.timer(TokioTimer::new());
369        http2_builder
370      };
371
372      let global_configuration = configurations.find_global_configuration();
373
374      if let Some(initial_window_size) = global_configuration
375        .as_deref()
376        .and_then(|c| get_value!("h2_initial_window_size", c))
377        .and_then(|v| v.as_i128())
378      {
379        http2_builder.initial_stream_window_size(initial_window_size as u32);
380      }
381      if let Some(max_frame_size) = global_configuration
382        .as_deref()
383        .and_then(|c| get_value!("h2_max_frame_size", c))
384        .and_then(|v| v.as_i128())
385      {
386        http2_builder.max_frame_size(max_frame_size as u32);
387      }
388      if let Some(max_concurrent_streams) = global_configuration
389        .as_deref()
390        .and_then(|c| get_value!("h2_max_concurrent_streams", c))
391        .and_then(|v| v.as_i128())
392      {
393        http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
394      }
395      if let Some(max_header_list_size) = global_configuration
396        .as_deref()
397        .and_then(|c| get_value!("h2_max_header_list_size", c))
398        .and_then(|v| v.as_i128())
399      {
400        http2_builder.max_header_list_size(max_header_list_size as u32);
401      }
402      if let Some(enable_connect_protocol) = global_configuration
403        .as_deref()
404        .and_then(|c| get_value!("h2_enable_connect_protocol", c))
405        .and_then(|v| v.as_bool())
406      {
407        if enable_connect_protocol {
408          http2_builder.enable_connect_protocol();
409        }
410      }
411
412      let loggers_clone = loggers.clone();
413      let mut http_future = http2_builder.serve_connection(
414        io,
415        service_fn(move |request: Request<Incoming>| {
416          let (request_parts, request_body) = request.into_parts();
417          let request = Request::from_parts(
418            request_parts,
419            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
420          );
421          request_handler(
422            request,
423            client_address,
424            server_address,
425            true,
426            configurations.clone(),
427            loggers_clone.clone(),
428            if http3_enabled {
429              Some(server_address.port())
430            } else {
431              None
432            },
433            acme_http_01_resolvers.clone(),
434            proxy_protocol_client_address,
435            proxy_protocol_server_address,
436          )
437        }),
438      );
439      let http_future_result = crate::runtime::select! {
440        result = &mut http_future => {
441          result
442        }
443        _ = shutdown_rx.cancelled() => {
444          std::pin::Pin::new(&mut http_future).graceful_shutdown();
445          http_future.await
446        }
447      };
448      if let Err(err) = http_future_result {
449        if let Some(logging_tx) = loggers.find_global_logger() {
450          logging_tx
451            .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
452            .await
453            .unwrap_or_default();
454        }
455      }
456    } else {
457      #[cfg(feature = "runtime-monoio")]
458      let io = MonoioIo::new(SendAsyncIo::new(tls_stream));
459
460      #[cfg(feature = "runtime-monoio")]
461      let http1_builder = {
462        let mut http1_builder = hyper::server::conn::http1::Builder::new();
463
464        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
465        http1_builder.timer(MonoioTimer);
466
467        http1_builder
468      };
469      #[cfg(feature = "runtime-tokio")]
470      let http1_builder = {
471        let mut http1_builder = hyper::server::conn::http1::Builder::new();
472
473        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
474        http1_builder.timer(TokioTimer::new());
475
476        http1_builder
477      };
478
479      let loggers_clone = loggers.clone();
480      let mut http_future = http1_builder
481        .serve_connection(
482          io,
483          service_fn(move |request: Request<Incoming>| {
484            let (request_parts, request_body) = request.into_parts();
485            let request = Request::from_parts(
486              request_parts,
487              request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
488            );
489            request_handler(
490              request,
491              client_address,
492              server_address,
493              true,
494              configurations.clone(),
495              loggers_clone.clone(),
496              if http3_enabled {
497                Some(server_address.port())
498              } else {
499                None
500              },
501              acme_http_01_resolvers.clone(),
502              proxy_protocol_client_address,
503              proxy_protocol_server_address,
504            )
505          }),
506        )
507        .with_upgrades();
508      let http_future_result = crate::runtime::select! {
509        result = &mut http_future => {
510          result
511        }
512        _ = shutdown_rx.cancelled() => {
513          std::pin::Pin::new(&mut http_future).graceful_shutdown();
514          http_future.await
515        }
516      };
517      if let Err(err) = http_future_result {
518        if let Some(logging_tx) = loggers.find_global_logger() {
519          logging_tx
520            .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
521            .await
522            .unwrap_or_default();
523        }
524      }
525    }
526  } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
527    #[cfg(feature = "runtime-monoio")]
528    let io = MonoioIo::new(SendAsyncIo::new(stream));
529    #[cfg(feature = "runtime-tokio")]
530    let io = TokioIo::new(stream);
531
532    #[cfg(feature = "runtime-monoio")]
533    let http1_builder = {
534      let mut http1_builder = hyper::server::conn::http1::Builder::new();
535
536      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
537      http1_builder.timer(MonoioTimer);
538
539      http1_builder
540    };
541    #[cfg(feature = "runtime-tokio")]
542    let http1_builder = {
543      let mut http1_builder = hyper::server::conn::http1::Builder::new();
544
545      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
546      http1_builder.timer(TokioTimer::new());
547
548      http1_builder
549    };
550
551    let loggers_clone = loggers.clone();
552    let mut http_future = http1_builder
553      .serve_connection(
554        io,
555        service_fn(move |request: Request<Incoming>| {
556          let (request_parts, request_body) = request.into_parts();
557          let request = Request::from_parts(
558            request_parts,
559            request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
560          );
561          request_handler(
562            request,
563            client_address,
564            server_address,
565            false,
566            configurations.clone(),
567            loggers_clone.clone(),
568            if http3_enabled {
569              Some(server_address.port())
570            } else {
571              None
572            },
573            acme_http_01_resolvers.clone(),
574            proxy_protocol_client_address,
575            proxy_protocol_server_address,
576          )
577        }),
578      )
579      .with_upgrades();
580    let http_future_result = crate::runtime::select! {
581      result = &mut http_future => {
582        result
583      }
584      _ = shutdown_rx.cancelled() => {
585        std::pin::Pin::new(&mut http_future).graceful_shutdown();
586        http_future.await
587      }
588    };
589    if let Err(err) = http_future_result {
590      if let Some(logging_tx) = loggers.find_global_logger() {
591        logging_tx
592          .send(LogMessage::new(format!("Error serving HTTP connection: {err}"), true))
593          .await
594          .unwrap_or_default();
595      }
596    }
597  }
598}
599
600/// HTTP/3 handler function
601#[allow(clippy::too_many_arguments)]
602async fn http_quic_handler_fn(
603  connection_attempt: quinn::Incoming,
604  client_address: SocketAddr,
605  server_address: SocketAddr,
606  loggers: Loggers,
607  configurations: Arc<ServerConfigurations>,
608  connection_reference: Arc<()>,
609  shutdown_rx: CancellationToken,
610) {
611  match connection_attempt.await {
612    Ok(connection) => {
613      let _connection_reference = Arc::downgrade(&connection_reference);
614      let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
615        match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
616          Ok(h3_conn) => h3_conn,
617          Err(err) => {
618            if let Some(logging_tx) = loggers.find_global_logger() {
619              logging_tx
620                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
621                .await
622                .unwrap_or_default();
623            }
624            return;
625          }
626        };
627
628      loop {
629        match h3_conn.accept().await {
630          Ok(Some(resolver)) => {
631            let configurations = configurations.clone();
632            let loggers = loggers.clone();
633            crate::runtime::spawn(async move {
634              let (request, stream) = match resolver.resolve_request().await {
635                Ok(resolved) => resolved,
636                Err(err) => {
637                  if let Some(logging_tx) = loggers.find_global_logger() {
638                    logging_tx
639                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
640                      .await
641                      .unwrap_or_default();
642                  }
643                  return;
644                }
645              };
646              let (mut send, receive) = stream.split();
647              let request_body_stream =
648                futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
649                  loop {
650                    if !is_body_finished {
651                      match receive.recv_data().await {
652                        Ok(Some(mut data)) => {
653                          return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
654                        }
655                        Ok(None) => is_body_finished = true,
656                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
657                      }
658                    } else {
659                      match receive.recv_trailers().await {
660                        Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
661                        Ok(None) => {
662                          return None;
663                        }
664                        Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
665                      }
666                    }
667                  }
668                });
669              let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
670              let (request_parts, _) = request.into_parts();
671              let request = Request::from_parts(request_parts, request_body);
672              let mut response = match request_handler(
673                request,
674                client_address,
675                server_address,
676                true,
677                configurations.clone(),
678                loggers.clone(),
679                None,
680                Arc::new(tokio::sync::RwLock::new(vec![])),
681                None,
682                None,
683              )
684              .await
685              {
686                Ok(response) => response,
687                Err(err) => {
688                  if let Some(logging_tx) = loggers.find_global_logger() {
689                    logging_tx
690                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
691                      .await
692                      .unwrap_or_default();
693                  }
694                  return;
695                }
696              };
697              if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
698                response.headers_mut().entry(hyper::header::DATE).or_insert(http_date);
699              }
700              let (response_parts, mut response_body) = response.into_parts();
701              if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
702                if let Some(logging_tx) = loggers.find_global_logger() {
703                  logging_tx
704                    .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
705                    .await
706                    .unwrap_or_default();
707                }
708                return;
709              }
710              let mut had_trailers = false;
711              while let Some(chunk) = response_body.frame().await {
712                match chunk {
713                  Ok(frame) => {
714                    if frame.is_data() {
715                      match frame.into_data() {
716                        Ok(data) => {
717                          if let Err(err) = send.send_data(data).await {
718                            if let Some(logging_tx) = loggers.find_global_logger() {
719                              logging_tx
720                                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
721                                .await
722                                .unwrap_or_default();
723                            }
724                            return;
725                          }
726                        }
727                        Err(_) => {
728                          if let Some(logging_tx) = loggers.find_global_logger() {
729                            logging_tx
730                              .send(LogMessage::new(
731                                "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
732                                true,
733                              ))
734                              .await
735                              .unwrap_or_default();
736                          }
737                          return;
738                        }
739                      }
740                    } else if frame.is_trailers() {
741                      match frame.into_trailers() {
742                        Ok(trailers) => {
743                          had_trailers = true;
744                          if let Err(err) = send.send_trailers(trailers).await {
745                            if let Some(logging_tx) = loggers.find_global_logger() {
746                              logging_tx
747                                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
748                                .await
749                                .unwrap_or_default();
750                            }
751                            return;
752                          }
753                        }
754                        Err(_) => {
755                          if let Some(logging_tx) = loggers.find_global_logger() {
756                            logging_tx
757                              .send(LogMessage::new(
758                                "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
759                                true,
760                              ))
761                              .await
762                              .unwrap_or_default();
763                          }
764                          return;
765                        }
766                      }
767                    }
768                  }
769                  Err(err) => {
770                    if let Some(logging_tx) = loggers.find_global_logger() {
771                      logging_tx
772                        .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
773                        .await
774                        .unwrap_or_default();
775                    }
776                    return;
777                  }
778                }
779              }
780              if !had_trailers {
781                if let Err(err) = send.finish().await {
782                  if let Some(logging_tx) = loggers.find_global_logger() {
783                    logging_tx
784                      .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
785                      .await
786                      .unwrap_or_default();
787                  }
788                }
789              }
790            });
791          }
792          Ok(None) => break,
793          Err(err) => {
794            if let Some(logging_tx) = loggers.find_global_logger() {
795              logging_tx
796                .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
797                .await
798                .unwrap_or_default();
799            }
800            return;
801          }
802        }
803        if shutdown_rx.is_cancelled() {
804          h3_conn.shutdown(0).await.unwrap_or_default();
805        }
806      }
807    }
808    Err(err) => {
809      if let Some(logging_tx) = loggers.find_global_logger() {
810        logging_tx
811          .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
812          .await
813          .unwrap_or_default();
814      }
815    }
816  }
817}