1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use async_channel::{Receiver, Sender};
8use bytes::{Buf, Bytes};
9use ferron_common::logging::LogMessage;
10use http_body_util::{BodyExt, StreamBody};
11use hyper::body::{Frame, Incoming};
12use hyper::service::service_fn;
13use hyper::{Request, Response};
14#[cfg(feature = "runtime-tokio")]
15use hyper_util::rt::{TokioIo, TokioTimer};
16#[cfg(feature = "runtime-monoio")]
17use monoio::io::IntoPollIo;
18#[cfg(feature = "runtime-monoio")]
19use monoio::net::tcp::stream_poll::TcpStreamPoll;
20#[cfg(feature = "runtime-monoio")]
21use monoio::net::TcpStream;
22#[cfg(feature = "runtime-monoio")]
23use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
24use rustls::server::Acceptor;
25use rustls::ServerConfig;
26#[cfg(feature = "runtime-tokio")]
27use tokio::net::TcpStream;
28use tokio_rustls::server::TlsStream;
29use tokio_rustls::LazyConfigAcceptor;
30use tokio_util::sync::CancellationToken;
31
32use crate::acme::ACME_TLS_ALPN_NAME;
33use crate::config::ServerConfigurations;
34use crate::get_value;
35use crate::listener_handler_communication::ConnectionData;
36use crate::request_handler::request_handler;
37use crate::util::read_proxy_header;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40
/// Header names that are stripped from every response before it is sent over HTTP/3.
///
/// `http_quic_handler_fn` removes each of these from the response headers (together with
/// `Connection` and any header it names) prior to calling `send_response`.
static HTTP3_INVALID_HEADERS: [hyper::header::HeaderName; 5] = [
  hyper::header::HeaderName::from_static("keep-alive"),
  hyper::header::HeaderName::from_static("proxy-connection"),
  hyper::header::TRANSFER_ENCODING,
  hyper::header::TE,
  hyper::header::UPGRADE,
];
48
/// Executor passed to hyper's HTTP/2 connection builder when the `runtime-tokio` feature is
/// enabled. It carries no state; see its `hyper::rt::Executor` implementation for how futures
/// are spawned.
#[cfg(feature = "runtime-tokio")]
#[derive(Clone, Copy, Debug)]
pub struct TokioLocalExecutor;
53
/// Runs the futures hyper hands to the executor as tasks spawned with
/// `tokio::task::spawn_local`, which is why only `'static` (and not `Send`) is required.
#[cfg(feature = "runtime-tokio")]
impl<Fut> hyper::rt::Executor<Fut> for TokioLocalExecutor
where
  Fut: std::future::Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `task` as a local task and detaches it.
  #[inline]
  fn execute(&self, task: Fut) {
    // The join handle is dropped on purpose: hyper never awaits the spawned task.
    drop(tokio::task::spawn_local(task));
  }
}
65
66#[allow(clippy::too_many_arguments)]
68pub fn create_http_handler(
69 configurations: Arc<ServerConfigurations>,
70 rx: Receiver<ConnectionData>,
71 enable_uring: Option<bool>,
72 tls_configs: HashMap<u16, Arc<ServerConfig>>,
73 http3_enabled: bool,
74 acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
75 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
76 enable_proxy_protocol: bool,
77 io_uring_disabled: Sender<Option<std::io::Error>>,
78) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
79 let shutdown_tx = CancellationToken::new();
80 let shutdown_rx = shutdown_tx.clone();
81 let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
82 std::thread::Builder::new()
83 .name("Request handler".to_string())
84 .spawn(move || {
85 let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
86 Ok(rt) => rt,
87 Err(error) => {
88 handler_init_tx
89 .send_blocking(Some(
90 anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
91 ))
92 .unwrap_or_default();
93 return;
94 }
95 };
96 io_uring_disabled
97 .send_blocking(rt.return_io_uring_error())
98 .unwrap_or_default();
99 rt.run(async move {
100 if let Some(error) = http_handler_fn(
101 configurations,
102 rx,
103 &handler_init_tx,
104 shutdown_rx,
105 tls_configs,
106 http3_enabled,
107 acme_tls_alpn_01_configs,
108 acme_http_01_resolvers,
109 enable_proxy_protocol,
110 )
111 .await
112 .err()
113 {
114 handler_init_tx.send(Some(error)).await.unwrap_or_default();
115 }
116 });
117 })?;
118
119 if let Some(error) = listen_error_rx.recv_blocking()? {
120 Err(error)?;
121 }
122
123 Ok(shutdown_tx)
124}
125
126#[inline]
128#[allow(clippy::too_many_arguments)]
129async fn http_handler_fn(
130 configurations: Arc<ServerConfigurations>,
131 rx: Receiver<ConnectionData>,
132 handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
133 shutdown_rx: CancellationToken,
134 tls_configs: HashMap<u16, Arc<ServerConfig>>,
135 http3_enabled: bool,
136 acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
137 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
138 enable_proxy_protocol: bool,
139) -> Result<(), Box<dyn Error + Send + Sync>> {
140 handler_init_tx.send(None).await.unwrap_or_default();
141
142 let connections_references = Arc::new(());
143
144 loop {
145 let conn_data = crate::runtime::select! {
146 result = rx.recv() => {
147 if let Ok(recv_data) = result {
148 recv_data
149 } else {
150 break;
151 }
152 }
153 _ = shutdown_rx.cancelled() => {
154 break;
155 }
156 };
157 let configurations = configurations.clone();
158 let tls_config = if matches!(
159 conn_data.connection,
160 crate::listener_handler_communication::Connection::Quic(..)
161 ) {
162 None
163 } else {
164 tls_configs.get(&conn_data.server_address.port()).cloned()
165 };
166 let acme_tls_alpn_01_config = if matches!(
167 conn_data.connection,
168 crate::listener_handler_communication::Connection::Quic(..)
169 ) {
170 None
171 } else {
172 acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
173 };
174 let acme_http_01_resolvers = acme_http_01_resolvers.clone();
175 let connections_references_cloned = connections_references.clone();
176 let shutdown_rx_clone = shutdown_rx.clone();
177 crate::runtime::spawn(async move {
178 match conn_data.connection {
179 crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
180 #[cfg(feature = "runtime-monoio")]
183 let _ = tcp_stream.set_nonblocking(monoio::utils::is_legacy());
184
185 #[cfg(feature = "runtime-monoio")]
186 let tcp_stream = match TcpStream::from_std(tcp_stream) {
187 Ok(stream) => stream,
188 Err(err) => {
189 for logging_tx in configurations
190 .find_global_configuration()
191 .as_ref()
192 .map_or(&vec![], |c| &c.observability.log_channels)
193 {
194 logging_tx
195 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
196 .await
197 .unwrap_or_default();
198 }
199 return;
200 }
201 };
202 let encrypted = tls_config.is_some();
203 http_tcp_handler_fn(
204 tcp_stream,
205 conn_data.client_address,
206 conn_data.server_address,
207 configurations,
208 tls_config,
209 http3_enabled && encrypted,
210 connections_references_cloned,
211 acme_tls_alpn_01_config,
212 acme_http_01_resolvers,
213 enable_proxy_protocol,
214 shutdown_rx_clone,
215 )
216 .await;
217 }
218 crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
219 http_quic_handler_fn(
220 quic_incoming,
221 conn_data.client_address,
222 conn_data.server_address,
223 configurations,
224 connections_references_cloned,
225 shutdown_rx_clone,
226 )
227 .await;
228 }
229 }
230 });
231 }
232
233 while Arc::weak_count(&connections_references) > 0 {
234 crate::runtime::sleep(Duration::from_millis(100)).await;
235 }
236
237 Ok(())
238}
239
/// A client connection stream that is either TLS-wrapped or plain TCP
/// (variant of the type used when the `runtime-monoio` feature is enabled).
#[allow(clippy::large_enum_variant)]
#[cfg(feature = "runtime-monoio")]
enum MaybeTlsStream {
  /// A stream on which the TLS handshake has been completed.
  Tls(TlsStream<SendAsyncIo<TcpStreamPoll>>),

  /// An unencrypted stream.
  Plain(SendAsyncIo<TcpStreamPoll>),
}
250
/// A client connection stream that is either TLS-wrapped or plain TCP
/// (variant of the type used when the `runtime-tokio` feature is enabled).
#[allow(clippy::large_enum_variant)]
#[cfg(feature = "runtime-tokio")]
enum MaybeTlsStream {
  /// A stream on which the TLS handshake has been completed.
  Tls(TlsStream<TcpStream>),

  /// An unencrypted stream.
  Plain(TcpStream),
}
260
261#[allow(clippy::too_many_arguments)]
263async fn http_tcp_handler_fn(
264 tcp_stream: TcpStream,
265 client_address: SocketAddr,
266 server_address: SocketAddr,
267 configurations: Arc<ServerConfigurations>,
268 tls_config: Option<Arc<ServerConfig>>,
269 http3_enabled: bool,
270 connection_reference: Arc<()>,
271 acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
272 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
273 enable_proxy_protocol: bool,
274 shutdown_rx: CancellationToken,
275) {
276 let _connection_reference = Arc::downgrade(&connection_reference);
277 #[cfg(feature = "runtime-monoio")]
278 let tcp_stream = match tcp_stream.into_poll_io() {
279 Ok(stream) => SendAsyncIo::new(stream),
280 Err(err) => {
281 for logging_tx in configurations
282 .find_global_configuration()
283 .as_ref()
284 .map_or(&vec![], |c| &c.observability.log_channels)
285 {
286 logging_tx
287 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
288 .await
289 .unwrap_or_default();
290 }
291 return;
292 }
293 };
294
295 let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
297 match read_proxy_header(tcp_stream).await {
299 Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
300 Err(err) => {
301 for logging_tx in configurations
302 .find_global_configuration()
303 .as_ref()
304 .map_or(&vec![], |c| &c.observability.log_channels)
305 {
306 logging_tx
307 .send(LogMessage::new(
308 format!("Error reading PROXY protocol header: {err}"),
309 true,
310 ))
311 .await
312 .unwrap_or_default();
313 }
314 return;
315 }
316 }
317 } else {
318 (tcp_stream, None, None)
319 };
320
321 let maybe_tls_stream = if let Some(tls_config) = tls_config {
322 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
323 Ok(start_handshake) => start_handshake,
324 Err(err) => {
325 for logging_tx in configurations
326 .find_global_configuration()
327 .as_ref()
328 .map_or(&vec![], |c| &c.observability.log_channels)
329 {
330 logging_tx
331 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
332 .await
333 .unwrap_or_default();
334 }
335 return;
336 }
337 };
338
339 if let Some(acme_config) = acme_tls_alpn_01_config {
340 if start_handshake
341 .client_hello()
342 .alpn()
343 .into_iter()
344 .flatten()
345 .eq([ACME_TLS_ALPN_NAME])
346 {
347 match start_handshake.into_stream(acme_config).await {
348 Ok(_) => (),
349 Err(err) => {
350 for logging_tx in configurations
351 .find_global_configuration()
352 .as_ref()
353 .map_or(&vec![], |c| &c.observability.log_channels)
354 {
355 logging_tx
356 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
357 .await
358 .unwrap_or_default();
359 }
360 return;
361 }
362 };
363 return;
364 }
365 }
366
367 let tls_stream = match start_handshake.into_stream(tls_config).await {
368 Ok(tls_stream) => tls_stream,
369 Err(err) => {
370 for logging_tx in configurations
371 .find_global_configuration()
372 .as_ref()
373 .map_or(&vec![], |c| &c.observability.log_channels)
374 {
375 logging_tx
376 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
377 .await
378 .unwrap_or_default();
379 }
380 return;
381 }
382 };
383
384 MaybeTlsStream::Tls(tls_stream)
385 } else {
386 MaybeTlsStream::Plain(tcp_stream)
387 };
388
389 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
390 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
391 let is_http2 = alpn_protocol == Some("h2".as_bytes());
392
393 #[cfg(feature = "runtime-tokio")]
394 let io = TokioIo::new(tls_stream);
395
396 if is_http2 {
397 #[cfg(feature = "runtime-monoio")]
399 let io = MonoioIo::new(tls_stream);
400
401 #[cfg(feature = "runtime-monoio")]
402 let mut http2_builder = {
403 let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
404 http2_builder.timer(MonoioTimer);
405 http2_builder
406 };
407 #[cfg(feature = "runtime-tokio")]
408 let mut http2_builder = {
409 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
410 http2_builder.timer(TokioTimer::new());
411 http2_builder
412 };
413
414 let global_configuration = configurations.find_global_configuration();
415
416 if let Some(initial_window_size) = global_configuration
417 .as_deref()
418 .and_then(|c| get_value!("h2_initial_window_size", c))
419 .and_then(|v| v.as_i128())
420 {
421 http2_builder.initial_stream_window_size(initial_window_size as u32);
422 }
423 if let Some(max_frame_size) = global_configuration
424 .as_deref()
425 .and_then(|c| get_value!("h2_max_frame_size", c))
426 .and_then(|v| v.as_i128())
427 {
428 http2_builder.max_frame_size(max_frame_size as u32);
429 }
430 if let Some(max_concurrent_streams) = global_configuration
431 .as_deref()
432 .and_then(|c| get_value!("h2_max_concurrent_streams", c))
433 .and_then(|v| v.as_i128())
434 {
435 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
436 }
437 if let Some(max_header_list_size) = global_configuration
438 .as_deref()
439 .and_then(|c| get_value!("h2_max_header_list_size", c))
440 .and_then(|v| v.as_i128())
441 {
442 http2_builder.max_header_list_size(max_header_list_size as u32);
443 }
444 if let Some(enable_connect_protocol) = global_configuration
445 .as_deref()
446 .and_then(|c| get_value!("h2_enable_connect_protocol", c))
447 .and_then(|v| v.as_bool())
448 {
449 if enable_connect_protocol {
450 http2_builder.enable_connect_protocol();
451 }
452 }
453
454 let configurations_clone = configurations.clone();
455 let mut http_future = http2_builder.serve_connection(
456 io,
457 service_fn(move |request: Request<Incoming>| {
458 let (request_parts, request_body) = request.into_parts();
459 let request = Request::from_parts(
460 request_parts,
461 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
462 );
463 request_handler(
464 request,
465 client_address,
466 server_address,
467 true,
468 configurations_clone.clone(),
469 if http3_enabled {
470 Some(server_address.port())
471 } else {
472 None
473 },
474 acme_http_01_resolvers.clone(),
475 proxy_protocol_client_address,
476 proxy_protocol_server_address,
477 )
478 }),
479 );
480 let http_future_result = crate::runtime::select! {
481 result = &mut http_future => {
482 result
483 }
484 _ = shutdown_rx.cancelled() => {
485 std::pin::Pin::new(&mut http_future).graceful_shutdown();
486 http_future.await
487 }
488 };
489 if let Err(err) = http_future_result {
490 let error_to_log = if err.is_user() {
491 err.source().unwrap_or(&err)
492 } else {
493 &err
494 };
495 for logging_tx in configurations
496 .find_global_configuration()
497 .as_ref()
498 .map_or(&vec![], |c| &c.observability.log_channels)
499 {
500 logging_tx
501 .send(LogMessage::new(
502 format!("Error serving HTTPS connection: {error_to_log}"),
503 true,
504 ))
505 .await
506 .unwrap_or_default();
507 }
508 }
509 } else {
510 #[cfg(feature = "runtime-monoio")]
511 let io = MonoioIo::new(tls_stream);
512
513 #[cfg(feature = "runtime-monoio")]
514 let http1_builder = {
515 let mut http1_builder = hyper::server::conn::http1::Builder::new();
516
517 http1_builder.timer(MonoioTimer);
519
520 http1_builder
521 };
522 #[cfg(feature = "runtime-tokio")]
523 let http1_builder = {
524 let mut http1_builder = hyper::server::conn::http1::Builder::new();
525
526 http1_builder.timer(TokioTimer::new());
528
529 http1_builder
530 };
531
532 let configurations_clone = configurations.clone();
533 let mut http_future = http1_builder
534 .serve_connection(
535 io,
536 service_fn(move |request: Request<Incoming>| {
537 let (request_parts, request_body) = request.into_parts();
538 let request = Request::from_parts(
539 request_parts,
540 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
541 );
542 request_handler(
543 request,
544 client_address,
545 server_address,
546 true,
547 configurations_clone.clone(),
548 if http3_enabled {
549 Some(server_address.port())
550 } else {
551 None
552 },
553 acme_http_01_resolvers.clone(),
554 proxy_protocol_client_address,
555 proxy_protocol_server_address,
556 )
557 }),
558 )
559 .with_upgrades();
560 let http_future_result = crate::runtime::select! {
561 result = &mut http_future => {
562 result
563 }
564 _ = shutdown_rx.cancelled() => {
565 std::pin::Pin::new(&mut http_future).graceful_shutdown();
566 http_future.await
567 }
568 };
569 if let Err(err) = http_future_result {
570 let error_to_log = if err.is_user() {
571 err.source().unwrap_or(&err)
572 } else {
573 &err
574 };
575 for logging_tx in configurations
576 .find_global_configuration()
577 .as_ref()
578 .map_or(&vec![], |c| &c.observability.log_channels)
579 {
580 logging_tx
581 .send(LogMessage::new(
582 format!("Error serving HTTPS connection: {error_to_log}"),
583 true,
584 ))
585 .await
586 .unwrap_or_default();
587 }
588 }
589 }
590 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
591 #[cfg(feature = "runtime-monoio")]
592 let io = MonoioIo::new(stream);
593 #[cfg(feature = "runtime-tokio")]
594 let io = TokioIo::new(stream);
595
596 #[cfg(feature = "runtime-monoio")]
597 let http1_builder = {
598 let mut http1_builder = hyper::server::conn::http1::Builder::new();
599
600 http1_builder.timer(MonoioTimer);
602
603 http1_builder
604 };
605 #[cfg(feature = "runtime-tokio")]
606 let http1_builder = {
607 let mut http1_builder = hyper::server::conn::http1::Builder::new();
608
609 http1_builder.timer(TokioTimer::new());
611
612 http1_builder
613 };
614
615 let configurations_clone = configurations.clone();
616 let mut http_future = http1_builder
617 .serve_connection(
618 io,
619 service_fn(move |request: Request<Incoming>| {
620 let (request_parts, request_body) = request.into_parts();
621 let request = Request::from_parts(
622 request_parts,
623 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
624 );
625 request_handler(
626 request,
627 client_address,
628 server_address,
629 false,
630 configurations_clone.clone(),
631 if http3_enabled {
632 Some(server_address.port())
633 } else {
634 None
635 },
636 acme_http_01_resolvers.clone(),
637 proxy_protocol_client_address,
638 proxy_protocol_server_address,
639 )
640 }),
641 )
642 .with_upgrades();
643 let http_future_result = crate::runtime::select! {
644 result = &mut http_future => {
645 result
646 }
647 _ = shutdown_rx.cancelled() => {
648 std::pin::Pin::new(&mut http_future).graceful_shutdown();
649 http_future.await
650 }
651 };
652 if let Err(err) = http_future_result {
653 let error_to_log = if err.is_user() {
654 err.source().unwrap_or(&err)
655 } else {
656 &err
657 };
658 for logging_tx in configurations
659 .find_global_configuration()
660 .as_ref()
661 .map_or(&vec![], |c| &c.observability.log_channels)
662 {
663 logging_tx
664 .send(LogMessage::new(
665 format!("Error serving HTTP connection: {error_to_log}"),
666 true,
667 ))
668 .await
669 .unwrap_or_default();
670 }
671 }
672 }
673}
674
675#[inline]
677#[allow(clippy::too_many_arguments)]
678async fn http_quic_handler_fn(
679 connection_attempt: quinn::Incoming,
680 client_address: SocketAddr,
681 server_address: SocketAddr,
682 configurations: Arc<ServerConfigurations>,
683 connection_reference: Arc<()>,
684 shutdown_rx: CancellationToken,
685) {
686 match connection_attempt.await {
687 Ok(connection) => {
688 let connection_reference = Arc::downgrade(&connection_reference);
689 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
690 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
691 Ok(h3_conn) => h3_conn,
692 Err(err) => {
693 for logging_tx in configurations
694 .find_global_configuration()
695 .as_ref()
696 .map_or(&vec![], |c| &c.observability.log_channels)
697 {
698 logging_tx
699 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
700 .await
701 .unwrap_or_default();
702 }
703 return;
704 }
705 };
706
707 loop {
708 match crate::runtime::select! {
709 biased;
710
711 _ = shutdown_rx.cancelled() => {
712 h3_conn.shutdown(0).await.unwrap_or_default();
713 return;
714 }
715 result = h3_conn.accept() => {
716 result
717 }
718 } {
719 Ok(Some(resolver)) => {
720 let configurations = configurations.clone();
721 let connection_reference = connection_reference.clone();
722 crate::runtime::spawn(async move {
723 let _connection_reference = connection_reference;
724 let (request, stream) = match resolver.resolve_request().await {
725 Ok(resolved) => resolved,
726 Err(err) => {
727 if !err.is_h3_no_error() {
728 for logging_tx in configurations
729 .find_global_configuration()
730 .as_ref()
731 .map_or(&vec![], |c| &c.observability.log_channels)
732 {
733 logging_tx
734 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
735 .await
736 .unwrap_or_default();
737 }
738 }
739 return;
740 }
741 };
742 let (mut send, receive) = stream.split();
743 let request_body_stream =
744 futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
745 loop {
746 if !is_body_finished {
747 match receive.recv_data().await {
748 Ok(Some(mut data)) => {
749 return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
750 }
751 Ok(None) => is_body_finished = true,
752 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
753 }
754 } else {
755 match receive.recv_trailers().await {
756 Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
757 Ok(None) => {
758 return None;
759 }
760 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
761 }
762 }
763 }
764 });
765 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
766 let (request_parts, _) = request.into_parts();
767 let request = Request::from_parts(request_parts, request_body);
768 let mut response = match request_handler(
769 request,
770 client_address,
771 server_address,
772 true,
773 configurations.clone(),
774 None,
775 Arc::new(tokio::sync::RwLock::new(vec![])),
776 None,
777 None,
778 )
779 .await
780 {
781 Ok(response) => response,
782 Err(err) => {
783 for logging_tx in configurations
784 .find_global_configuration()
785 .as_ref()
786 .map_or(&vec![], |c| &c.observability.log_channels)
787 {
788 logging_tx
789 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
790 .await
791 .unwrap_or_default();
792 }
793 return;
794 }
795 };
796 let response_headers = response.headers_mut();
797 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
798 response_headers.entry(hyper::header::DATE).or_insert(http_date);
799 }
800 for header in &HTTP3_INVALID_HEADERS {
801 response_headers.remove(header);
802 }
803 if let Some(connection_header) = response_headers
804 .remove(hyper::header::CONNECTION)
805 .as_ref()
806 .and_then(|v| v.to_str().ok())
807 {
808 for name in connection_header.split(',') {
809 response_headers.remove(name.trim());
810 }
811 }
812 let (response_parts, mut response_body) = response.into_parts();
813 if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
814 if !err.is_h3_no_error() {
815 for logging_tx in configurations
816 .find_global_configuration()
817 .as_ref()
818 .map_or(&vec![], |c| &c.observability.log_channels)
819 {
820 logging_tx
821 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
822 .await
823 .unwrap_or_default();
824 }
825 }
826 return;
827 }
828 let mut had_trailers = false;
829 while let Some(chunk) = response_body.frame().await {
830 match chunk {
831 Ok(frame) => {
832 if frame.is_data() {
833 match frame.into_data() {
834 Ok(data) => {
835 if let Err(err) = send.send_data(data).await {
836 if !err.is_h3_no_error() {
837 for logging_tx in configurations
838 .find_global_configuration()
839 .as_ref()
840 .map_or(&vec![], |c| &c.observability.log_channels)
841 {
842 logging_tx
843 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
844 .await
845 .unwrap_or_default();
846 }
847 }
848 return;
849 }
850 }
851 Err(_) => {
852 for logging_tx in configurations
853 .find_global_configuration()
854 .as_ref()
855 .map_or(&vec![], |c| &c.observability.log_channels)
856 {
857 logging_tx
858 .send(LogMessage::new(
859 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
860 true,
861 ))
862 .await
863 .unwrap_or_default();
864 }
865 return;
866 }
867 }
868 } else if frame.is_trailers() {
869 match frame.into_trailers() {
870 Ok(trailers) => {
871 had_trailers = true;
872 if let Err(err) = send.send_trailers(trailers).await {
873 if !err.is_h3_no_error() {
874 for logging_tx in configurations
875 .find_global_configuration()
876 .as_ref()
877 .map_or(&vec![], |c| &c.observability.log_channels)
878 {
879 logging_tx
880 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
881 .await
882 .unwrap_or_default();
883 }
884 }
885 return;
886 }
887 }
888 Err(_) => {
889 for logging_tx in configurations
890 .find_global_configuration()
891 .as_ref()
892 .map_or(&vec![], |c| &c.observability.log_channels)
893 {
894 logging_tx
895 .send(LogMessage::new(
896 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
897 true,
898 ))
899 .await
900 .unwrap_or_default();
901 }
902 return;
903 }
904 }
905 }
906 }
907 Err(err) => {
908 for logging_tx in configurations
909 .find_global_configuration()
910 .as_ref()
911 .map_or(&vec![], |c| &c.observability.log_channels)
912 {
913 logging_tx
914 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
915 .await
916 .unwrap_or_default();
917 }
918
919 return;
920 }
921 }
922 }
923 if !had_trailers {
924 if let Err(err) = send.finish().await {
925 if !err.is_h3_no_error() {
926 for logging_tx in configurations
927 .find_global_configuration()
928 .as_ref()
929 .map_or(&vec![], |c| &c.observability.log_channels)
930 {
931 logging_tx
932 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
933 .await
934 .unwrap_or_default();
935 }
936 }
937 }
938 }
939 });
940 }
941 Ok(None) => break,
942 Err(err) => {
943 if !err.is_h3_no_error() {
944 for logging_tx in configurations
945 .find_global_configuration()
946 .as_ref()
947 .map_or(&vec![], |c| &c.observability.log_channels)
948 {
949 logging_tx
950 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
951 .await
952 .unwrap_or_default();
953 }
954 }
955 return;
956 }
957 }
958 }
959 }
960 Err(err) => {
961 for logging_tx in configurations
962 .find_global_configuration()
963 .as_ref()
964 .map_or(&vec![], |c| &c.observability.log_channels)
965 {
966 logging_tx
967 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
968 .await
969 .unwrap_or_default();
970 }
971 }
972 }
973}