1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use async_channel::{Receiver, Sender};
8use bytes::{Buf, Bytes};
9use ferron_common::logging::LogMessage;
10use http_body_util::{BodyExt, StreamBody};
11use hyper::body::{Frame, Incoming};
12use hyper::service::service_fn;
13use hyper::{Request, Response};
14#[cfg(feature = "runtime-tokio")]
15use hyper_util::rt::{TokioIo, TokioTimer};
16#[cfg(feature = "runtime-monoio")]
17use monoio::io::IntoPollIo;
18#[cfg(feature = "runtime-monoio")]
19use monoio::net::tcp::stream_poll::TcpStreamPoll;
20#[cfg(feature = "runtime-monoio")]
21use monoio::net::TcpStream;
22#[cfg(feature = "runtime-monoio")]
23use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
24use rustls::server::Acceptor;
25use rustls::ServerConfig;
26#[cfg(feature = "runtime-tokio")]
27use tokio::net::TcpStream;
28use tokio_rustls::server::TlsStream;
29use tokio_rustls::LazyConfigAcceptor;
30use tokio_util::sync::CancellationToken;
31
32use crate::acme::ACME_TLS_ALPN_NAME;
33use crate::config::ServerConfigurations;
34use crate::get_value;
35use crate::listener_handler_communication::ConnectionData;
36use crate::request_handler::request_handler;
37use crate::util::read_proxy_header;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40
/// Zero-sized executor handle handed to hyper when the Tokio runtime is in use.
///
/// Its `hyper::rt::Executor` implementation forwards every future to
/// `tokio::task::spawn_local`.
#[cfg(feature = "runtime-tokio")]
#[derive(Debug, Clone, Copy)]
pub struct TokioLocalExecutor;
45
/// Lets hyper spawn its internal futures through [`TokioLocalExecutor`].
///
/// Only `'static` is required of the future and its output (no `Send` bound), because the
/// future is handed to `tokio::task::spawn_local`.
#[cfg(feature = "runtime-tokio")]
impl<F> hyper::rt::Executor<F> for TokioLocalExecutor
where
  F: std::future::Future + 'static,
  F::Output: 'static,
{
  #[inline]
  fn execute(&self, future: F) {
    // The join handle is deliberately discarded; nothing awaits the spawned task's result.
    drop(tokio::task::spawn_local(future));
  }
}
57
58#[allow(clippy::too_many_arguments)]
60pub fn create_http_handler(
61 configurations: Arc<ServerConfigurations>,
62 rx: Receiver<ConnectionData>,
63 enable_uring: bool,
64 tls_configs: HashMap<u16, Arc<ServerConfig>>,
65 http3_enabled: bool,
66 acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
67 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
68 enable_proxy_protocol: bool,
69) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
70 let shutdown_tx = CancellationToken::new();
71 let shutdown_rx = shutdown_tx.clone();
72 let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
73 std::thread::Builder::new()
74 .name("Request handler".to_string())
75 .spawn(move || {
76 crate::runtime::new_runtime(
77 async move {
78 if let Some(error) = http_handler_fn(
79 configurations,
80 rx,
81 &handler_init_tx,
82 shutdown_rx,
83 tls_configs,
84 http3_enabled,
85 acme_tls_alpn_01_configs,
86 acme_http_01_resolvers,
87 enable_proxy_protocol,
88 )
89 .await
90 .err()
91 {
92 handler_init_tx.send(Some(error)).await.unwrap_or_default();
93 }
94 },
95 enable_uring,
96 )
97 .unwrap();
98 })?;
99
100 if let Some(error) = listen_error_rx.recv_blocking()? {
101 Err(error)?;
102 }
103
104 Ok(shutdown_tx)
105}
106
107#[allow(clippy::too_many_arguments)]
109async fn http_handler_fn(
110 configurations: Arc<ServerConfigurations>,
111 rx: Receiver<ConnectionData>,
112 handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
113 shutdown_rx: CancellationToken,
114 tls_configs: HashMap<u16, Arc<ServerConfig>>,
115 http3_enabled: bool,
116 acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
117 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
118 enable_proxy_protocol: bool,
119) -> Result<(), Box<dyn Error + Send + Sync>> {
120 handler_init_tx.send(None).await.unwrap_or_default();
121
122 let connections_references = Arc::new(());
123
124 loop {
125 let conn_data = crate::runtime::select! {
126 result = rx.recv() => {
127 if let Ok(recv_data) = result {
128 recv_data
129 } else {
130 break;
131 }
132 }
133 _ = shutdown_rx.cancelled() => {
134 break;
135 }
136 };
137 let configurations = configurations.clone();
138 let tls_config = if matches!(
139 conn_data.connection,
140 crate::listener_handler_communication::Connection::Quic(..)
141 ) {
142 None
143 } else {
144 tls_configs.get(&conn_data.server_address.port()).cloned()
145 };
146 let acme_tls_alpn_01_config = if matches!(
147 conn_data.connection,
148 crate::listener_handler_communication::Connection::Quic(..)
149 ) {
150 None
151 } else {
152 acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
153 };
154 let acme_http_01_resolvers = acme_http_01_resolvers.clone();
155 let connections_references_cloned = connections_references.clone();
156 let shutdown_rx_clone = shutdown_rx.clone();
157 crate::runtime::spawn(async move {
158 match conn_data.connection {
159 crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
160 #[cfg(feature = "runtime-monoio")]
161 let tcp_stream = match TcpStream::from_std(tcp_stream) {
162 Ok(stream) => stream,
163 Err(err) => {
164 for logging_tx in configurations
165 .find_global_configuration()
166 .as_ref()
167 .map_or(&vec![], |c| &c.observability.log_channels)
168 {
169 logging_tx
170 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
171 .await
172 .unwrap_or_default();
173 }
174 return;
175 }
176 };
177 let encrypted = tls_config.is_some();
178 http_tcp_handler_fn(
179 tcp_stream,
180 conn_data.client_address,
181 conn_data.server_address,
182 configurations,
183 tls_config,
184 http3_enabled && encrypted,
185 connections_references_cloned,
186 acme_tls_alpn_01_config,
187 acme_http_01_resolvers,
188 enable_proxy_protocol,
189 shutdown_rx_clone,
190 )
191 .await;
192 }
193 crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
194 http_quic_handler_fn(
195 quic_incoming,
196 conn_data.client_address,
197 conn_data.server_address,
198 configurations,
199 connections_references_cloned,
200 shutdown_rx_clone,
201 )
202 .await;
203 }
204 }
205 });
206 }
207
208 while Arc::weak_count(&connections_references) > 0 {
209 crate::runtime::sleep(Duration::from_millis(100)).await;
210 }
211
212 Ok(())
213}
214
/// A TCP connection after the optional TLS handshake (monoio runtime flavour).
#[cfg(feature = "runtime-monoio")]
#[allow(clippy::large_enum_variant)]
enum MaybeTlsStream {
  /// The poll-based TCP stream itself, without TLS.
  Plain(TcpStreamPoll),

  /// A server-side TLS stream wrapping the poll-based TCP stream.
  Tls(TlsStream<TcpStreamPoll>),
}
225
/// A TCP connection after the optional TLS handshake (Tokio runtime flavour).
#[cfg(feature = "runtime-tokio")]
#[allow(clippy::large_enum_variant)]
enum MaybeTlsStream {
  /// The TCP stream itself, without TLS.
  Plain(TcpStream),

  /// A server-side TLS stream wrapping the TCP stream.
  Tls(TlsStream<TcpStream>),
}
235
236#[allow(clippy::too_many_arguments)]
238async fn http_tcp_handler_fn(
239 tcp_stream: TcpStream,
240 client_address: SocketAddr,
241 server_address: SocketAddr,
242 configurations: Arc<ServerConfigurations>,
243 tls_config: Option<Arc<ServerConfig>>,
244 http3_enabled: bool,
245 connection_reference: Arc<()>,
246 acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
247 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
248 enable_proxy_protocol: bool,
249 shutdown_rx: CancellationToken,
250) {
251 let _connection_reference = Arc::downgrade(&connection_reference);
252 #[cfg(feature = "runtime-monoio")]
253 let tcp_stream = match tcp_stream.into_poll_io() {
254 Ok(stream) => stream,
255 Err(err) => {
256 for logging_tx in configurations
257 .find_global_configuration()
258 .as_ref()
259 .map_or(&vec![], |c| &c.observability.log_channels)
260 {
261 logging_tx
262 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
263 .await
264 .unwrap_or_default();
265 }
266 return;
267 }
268 };
269
270 let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
272 match read_proxy_header(tcp_stream).await {
274 Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
275 Err(err) => {
276 for logging_tx in configurations
277 .find_global_configuration()
278 .as_ref()
279 .map_or(&vec![], |c| &c.observability.log_channels)
280 {
281 logging_tx
282 .send(LogMessage::new(
283 format!("Error reading PROXY protocol header: {err}"),
284 true,
285 ))
286 .await
287 .unwrap_or_default();
288 }
289 return;
290 }
291 }
292 } else {
293 (tcp_stream, None, None)
294 };
295
296 let maybe_tls_stream = if let Some(tls_config) = tls_config {
297 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
298 Ok(start_handshake) => start_handshake,
299 Err(err) => {
300 for logging_tx in configurations
301 .find_global_configuration()
302 .as_ref()
303 .map_or(&vec![], |c| &c.observability.log_channels)
304 {
305 logging_tx
306 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
307 .await
308 .unwrap_or_default();
309 }
310 return;
311 }
312 };
313
314 if let Some(acme_config) = acme_tls_alpn_01_config {
315 if start_handshake
316 .client_hello()
317 .alpn()
318 .into_iter()
319 .flatten()
320 .eq([ACME_TLS_ALPN_NAME])
321 {
322 match start_handshake.into_stream(acme_config).await {
323 Ok(_) => (),
324 Err(err) => {
325 for logging_tx in configurations
326 .find_global_configuration()
327 .as_ref()
328 .map_or(&vec![], |c| &c.observability.log_channels)
329 {
330 logging_tx
331 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
332 .await
333 .unwrap_or_default();
334 }
335 return;
336 }
337 };
338 return;
339 }
340 }
341
342 let tls_stream = match start_handshake.into_stream(tls_config).await {
343 Ok(tls_stream) => tls_stream,
344 Err(err) => {
345 for logging_tx in configurations
346 .find_global_configuration()
347 .as_ref()
348 .map_or(&vec![], |c| &c.observability.log_channels)
349 {
350 logging_tx
351 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
352 .await
353 .unwrap_or_default();
354 }
355 return;
356 }
357 };
358
359 MaybeTlsStream::Tls(tls_stream)
360 } else {
361 MaybeTlsStream::Plain(tcp_stream)
362 };
363
364 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
365 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
366 let is_http2 = alpn_protocol == Some("h2".as_bytes());
367
368 #[cfg(feature = "runtime-tokio")]
369 let io = TokioIo::new(tls_stream);
370
371 if is_http2 {
372 #[cfg(feature = "runtime-monoio")]
374 let io = MonoioIo::new(tls_stream);
375
376 #[cfg(feature = "runtime-monoio")]
377 let mut http2_builder = {
378 let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
379 http2_builder.timer(MonoioTimer);
380 http2_builder
381 };
382 #[cfg(feature = "runtime-tokio")]
383 let mut http2_builder = {
384 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
385 http2_builder.timer(TokioTimer::new());
386 http2_builder
387 };
388
389 let global_configuration = configurations.find_global_configuration();
390
391 if let Some(initial_window_size) = global_configuration
392 .as_deref()
393 .and_then(|c| get_value!("h2_initial_window_size", c))
394 .and_then(|v| v.as_i128())
395 {
396 http2_builder.initial_stream_window_size(initial_window_size as u32);
397 }
398 if let Some(max_frame_size) = global_configuration
399 .as_deref()
400 .and_then(|c| get_value!("h2_max_frame_size", c))
401 .and_then(|v| v.as_i128())
402 {
403 http2_builder.max_frame_size(max_frame_size as u32);
404 }
405 if let Some(max_concurrent_streams) = global_configuration
406 .as_deref()
407 .and_then(|c| get_value!("h2_max_concurrent_streams", c))
408 .and_then(|v| v.as_i128())
409 {
410 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
411 }
412 if let Some(max_header_list_size) = global_configuration
413 .as_deref()
414 .and_then(|c| get_value!("h2_max_header_list_size", c))
415 .and_then(|v| v.as_i128())
416 {
417 http2_builder.max_header_list_size(max_header_list_size as u32);
418 }
419 if let Some(enable_connect_protocol) = global_configuration
420 .as_deref()
421 .and_then(|c| get_value!("h2_enable_connect_protocol", c))
422 .and_then(|v| v.as_bool())
423 {
424 if enable_connect_protocol {
425 http2_builder.enable_connect_protocol();
426 }
427 }
428
429 let configurations_clone = configurations.clone();
430 let mut http_future = http2_builder.serve_connection(
431 io,
432 service_fn(move |request: Request<Incoming>| {
433 let (request_parts, request_body) = request.into_parts();
434 let request = Request::from_parts(
435 request_parts,
436 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
437 );
438 request_handler(
439 request,
440 client_address,
441 server_address,
442 true,
443 configurations_clone.clone(),
444 if http3_enabled {
445 Some(server_address.port())
446 } else {
447 None
448 },
449 acme_http_01_resolvers.clone(),
450 proxy_protocol_client_address,
451 proxy_protocol_server_address,
452 )
453 }),
454 );
455 let http_future_result = crate::runtime::select! {
456 result = &mut http_future => {
457 result
458 }
459 _ = shutdown_rx.cancelled() => {
460 std::pin::Pin::new(&mut http_future).graceful_shutdown();
461 http_future.await
462 }
463 };
464 if let Err(err) = http_future_result {
465 for logging_tx in configurations
466 .find_global_configuration()
467 .as_ref()
468 .map_or(&vec![], |c| &c.observability.log_channels)
469 {
470 logging_tx
471 .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
472 .await
473 .unwrap_or_default();
474 }
475 }
476 } else {
477 #[cfg(feature = "runtime-monoio")]
478 let io = MonoioIo::new(SendAsyncIo::new(tls_stream));
479
480 #[cfg(feature = "runtime-monoio")]
481 let http1_builder = {
482 let mut http1_builder = hyper::server::conn::http1::Builder::new();
483
484 http1_builder.timer(MonoioTimer);
486
487 http1_builder
488 };
489 #[cfg(feature = "runtime-tokio")]
490 let http1_builder = {
491 let mut http1_builder = hyper::server::conn::http1::Builder::new();
492
493 http1_builder.timer(TokioTimer::new());
495
496 http1_builder
497 };
498
499 let configurations_clone = configurations.clone();
500 let mut http_future = http1_builder
501 .serve_connection(
502 io,
503 service_fn(move |request: Request<Incoming>| {
504 let (request_parts, request_body) = request.into_parts();
505 let request = Request::from_parts(
506 request_parts,
507 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
508 );
509 request_handler(
510 request,
511 client_address,
512 server_address,
513 true,
514 configurations_clone.clone(),
515 if http3_enabled {
516 Some(server_address.port())
517 } else {
518 None
519 },
520 acme_http_01_resolvers.clone(),
521 proxy_protocol_client_address,
522 proxy_protocol_server_address,
523 )
524 }),
525 )
526 .with_upgrades();
527 let http_future_result = crate::runtime::select! {
528 result = &mut http_future => {
529 result
530 }
531 _ = shutdown_rx.cancelled() => {
532 std::pin::Pin::new(&mut http_future).graceful_shutdown();
533 http_future.await
534 }
535 };
536 if let Err(err) = http_future_result {
537 for logging_tx in configurations
538 .find_global_configuration()
539 .as_ref()
540 .map_or(&vec![], |c| &c.observability.log_channels)
541 {
542 logging_tx
543 .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
544 .await
545 .unwrap_or_default();
546 }
547 }
548 }
549 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
550 #[cfg(feature = "runtime-monoio")]
551 let io = MonoioIo::new(SendAsyncIo::new(stream));
552 #[cfg(feature = "runtime-tokio")]
553 let io = TokioIo::new(stream);
554
555 #[cfg(feature = "runtime-monoio")]
556 let http1_builder = {
557 let mut http1_builder = hyper::server::conn::http1::Builder::new();
558
559 http1_builder.timer(MonoioTimer);
561
562 http1_builder
563 };
564 #[cfg(feature = "runtime-tokio")]
565 let http1_builder = {
566 let mut http1_builder = hyper::server::conn::http1::Builder::new();
567
568 http1_builder.timer(TokioTimer::new());
570
571 http1_builder
572 };
573
574 let configurations_clone = configurations.clone();
575 let mut http_future = http1_builder
576 .serve_connection(
577 io,
578 service_fn(move |request: Request<Incoming>| {
579 let (request_parts, request_body) = request.into_parts();
580 let request = Request::from_parts(
581 request_parts,
582 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
583 );
584 request_handler(
585 request,
586 client_address,
587 server_address,
588 false,
589 configurations_clone.clone(),
590 if http3_enabled {
591 Some(server_address.port())
592 } else {
593 None
594 },
595 acme_http_01_resolvers.clone(),
596 proxy_protocol_client_address,
597 proxy_protocol_server_address,
598 )
599 }),
600 )
601 .with_upgrades();
602 let http_future_result = crate::runtime::select! {
603 result = &mut http_future => {
604 result
605 }
606 _ = shutdown_rx.cancelled() => {
607 std::pin::Pin::new(&mut http_future).graceful_shutdown();
608 http_future.await
609 }
610 };
611 if let Err(err) = http_future_result {
612 for logging_tx in configurations
613 .find_global_configuration()
614 .as_ref()
615 .map_or(&vec![], |c| &c.observability.log_channels)
616 {
617 logging_tx
618 .send(LogMessage::new(format!("Error serving HTTP connection: {err}"), true))
619 .await
620 .unwrap_or_default();
621 }
622 }
623 }
624}
625
626#[allow(clippy::too_many_arguments)]
628async fn http_quic_handler_fn(
629 connection_attempt: quinn::Incoming,
630 client_address: SocketAddr,
631 server_address: SocketAddr,
632 configurations: Arc<ServerConfigurations>,
633 connection_reference: Arc<()>,
634 shutdown_rx: CancellationToken,
635) {
636 match connection_attempt.await {
637 Ok(connection) => {
638 let _connection_reference = Arc::downgrade(&connection_reference);
639 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
640 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
641 Ok(h3_conn) => h3_conn,
642 Err(err) => {
643 for logging_tx in configurations
644 .find_global_configuration()
645 .as_ref()
646 .map_or(&vec![], |c| &c.observability.log_channels)
647 {
648 logging_tx
649 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
650 .await
651 .unwrap_or_default();
652 }
653 return;
654 }
655 };
656
657 loop {
658 match h3_conn.accept().await {
659 Ok(Some(resolver)) => {
660 let configurations = configurations.clone();
661 crate::runtime::spawn(async move {
662 let (request, stream) = match resolver.resolve_request().await {
663 Ok(resolved) => resolved,
664 Err(err) => {
665 for logging_tx in configurations
666 .find_global_configuration()
667 .as_ref()
668 .map_or(&vec![], |c| &c.observability.log_channels)
669 {
670 logging_tx
671 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
672 .await
673 .unwrap_or_default();
674 }
675 return;
676 }
677 };
678 let (mut send, receive) = stream.split();
679 let request_body_stream =
680 futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
681 loop {
682 if !is_body_finished {
683 match receive.recv_data().await {
684 Ok(Some(mut data)) => {
685 return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
686 }
687 Ok(None) => is_body_finished = true,
688 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
689 }
690 } else {
691 match receive.recv_trailers().await {
692 Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
693 Ok(None) => {
694 return None;
695 }
696 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
697 }
698 }
699 }
700 });
701 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
702 let (request_parts, _) = request.into_parts();
703 let request = Request::from_parts(request_parts, request_body);
704 let mut response = match request_handler(
705 request,
706 client_address,
707 server_address,
708 true,
709 configurations.clone(),
710 None,
711 Arc::new(tokio::sync::RwLock::new(vec![])),
712 None,
713 None,
714 )
715 .await
716 {
717 Ok(response) => response,
718 Err(err) => {
719 for logging_tx in configurations
720 .find_global_configuration()
721 .as_ref()
722 .map_or(&vec![], |c| &c.observability.log_channels)
723 {
724 logging_tx
725 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
726 .await
727 .unwrap_or_default();
728 }
729 return;
730 }
731 };
732 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
733 response.headers_mut().entry(hyper::header::DATE).or_insert(http_date);
734 }
735 let (response_parts, mut response_body) = response.into_parts();
736 if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
737 for logging_tx in configurations
738 .find_global_configuration()
739 .as_ref()
740 .map_or(&vec![], |c| &c.observability.log_channels)
741 {
742 logging_tx
743 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
744 .await
745 .unwrap_or_default();
746 }
747 return;
748 }
749 let mut had_trailers = false;
750 while let Some(chunk) = response_body.frame().await {
751 match chunk {
752 Ok(frame) => {
753 if frame.is_data() {
754 match frame.into_data() {
755 Ok(data) => {
756 if let Err(err) = send.send_data(data).await {
757 for logging_tx in configurations
758 .find_global_configuration()
759 .as_ref()
760 .map_or(&vec![], |c| &c.observability.log_channels)
761 {
762 logging_tx
763 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
764 .await
765 .unwrap_or_default();
766 }
767 return;
768 }
769 }
770 Err(_) => {
771 for logging_tx in configurations
772 .find_global_configuration()
773 .as_ref()
774 .map_or(&vec![], |c| &c.observability.log_channels)
775 {
776 logging_tx
777 .send(LogMessage::new(
778 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
779 true,
780 ))
781 .await
782 .unwrap_or_default();
783 }
784 return;
785 }
786 }
787 } else if frame.is_trailers() {
788 match frame.into_trailers() {
789 Ok(trailers) => {
790 had_trailers = true;
791 if let Err(err) = send.send_trailers(trailers).await {
792 for logging_tx in configurations
793 .find_global_configuration()
794 .as_ref()
795 .map_or(&vec![], |c| &c.observability.log_channels)
796 {
797 logging_tx
798 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
799 .await
800 .unwrap_or_default();
801 }
802 return;
803 }
804 }
805 Err(_) => {
806 for logging_tx in configurations
807 .find_global_configuration()
808 .as_ref()
809 .map_or(&vec![], |c| &c.observability.log_channels)
810 {
811 logging_tx
812 .send(LogMessage::new(
813 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
814 true,
815 ))
816 .await
817 .unwrap_or_default();
818 }
819 return;
820 }
821 }
822 }
823 }
824 Err(err) => {
825 for logging_tx in configurations
826 .find_global_configuration()
827 .as_ref()
828 .map_or(&vec![], |c| &c.observability.log_channels)
829 {
830 logging_tx
831 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
832 .await
833 .unwrap_or_default();
834 }
835 return;
836 }
837 }
838 }
839 if !had_trailers {
840 if let Err(err) = send.finish().await {
841 for logging_tx in configurations
842 .find_global_configuration()
843 .as_ref()
844 .map_or(&vec![], |c| &c.observability.log_channels)
845 {
846 logging_tx
847 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
848 .await
849 .unwrap_or_default();
850 }
851 }
852 }
853 });
854 }
855 Ok(None) => break,
856 Err(err) => {
857 for logging_tx in configurations
858 .find_global_configuration()
859 .as_ref()
860 .map_or(&vec![], |c| &c.observability.log_channels)
861 {
862 logging_tx
863 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
864 .await
865 .unwrap_or_default();
866 }
867 return;
868 }
869 }
870 if shutdown_rx.is_cancelled() {
871 h3_conn.shutdown(0).await.unwrap_or_default();
872 }
873 }
874 }
875 Err(err) => {
876 for logging_tx in configurations
877 .find_global_configuration()
878 .as_ref()
879 .map_or(&vec![], |c| &c.observability.log_channels)
880 {
881 logging_tx
882 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
883 .await
884 .unwrap_or_default();
885 }
886 }
887 }
888}