1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use arc_swap::ArcSwap;
8use async_channel::{Receiver, Sender};
9use bytes::{Buf, Bytes};
10use ferron_common::logging::LogMessage;
11use http_body_util::{BodyExt, StreamBody};
12use hyper::body::{Frame, Incoming};
13use hyper::service::service_fn;
14use hyper::{Request, Response};
15#[cfg(feature = "runtime-tokio")]
16use hyper_util::rt::{TokioIo, TokioTimer};
17#[cfg(feature = "runtime-monoio")]
18use monoio::io::IntoPollIo;
19#[cfg(feature = "runtime-monoio")]
20use monoio::net::tcp::stream_poll::TcpStreamPoll;
21#[cfg(feature = "runtime-monoio")]
22use monoio::net::TcpStream;
23#[cfg(feature = "runtime-monoio")]
24use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
25use rustls::server::Acceptor;
26use rustls::ServerConfig;
27#[cfg(feature = "runtime-tokio")]
28use tokio::net::TcpStream;
29use tokio_rustls::server::TlsStream;
30use tokio_rustls::LazyConfigAcceptor;
31use tokio_util::sync::CancellationToken;
32
33use crate::acme::ACME_TLS_ALPN_NAME;
34use crate::config::ServerConfigurations;
35use crate::get_value;
36use crate::listener_handler_communication::ConnectionData;
37use crate::request_handler::request_handler;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40use crate::util::{read_proxy_header, MultiCancel};
41
/// Response headers that are removed from every response before it is sent over HTTP/3
/// (see the removal loop in `http_quic_handler_fn`).
static HTTP3_INVALID_HEADERS: [hyper::header::HeaderName; 5] = [
  hyper::header::HeaderName::from_static("keep-alive"),
  hyper::header::HeaderName::from_static("proxy-connection"),
  hyper::header::TRANSFER_ENCODING,
  hyper::header::TE,
  hyper::header::UPGRADE,
];
49
/// Handler state that is stored behind an `ArcSwap` and re-read by `http_handler_fn`
/// for every accepted connection.
pub struct ReloadableHandlerData {
  /// TLS configurations used for ACME TLS-ALPN-01 handshakes, looked up by the listener port.
  pub acme_tls_alpn_01_configs: Arc<HashMap<u16, Arc<ServerConfig>>>,
  /// ACME HTTP-01 data locks; handed to the request handler for TCP connections.
  pub acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
  /// The server configurations passed to the request handler and used to find the log channels.
  pub configurations: Arc<ServerConfigurations>,
  /// TLS configurations, looked up by the listener port; a TCP connection on a port
  /// without an entry is served without TLS.
  pub tls_configs: Arc<HashMap<u16, Arc<ServerConfig>>>,
  /// Whether HTTP/3 is enabled.
  pub http3_enabled: bool,
  /// Whether a PROXY protocol header is read from each TCP connection before anything else.
  pub enable_proxy_protocol: bool,
}
65
#[cfg(feature = "runtime-tokio")]
/// A Hyper executor for the Tokio runtime that spawns futures with `tokio::task::spawn_local`,
/// so the futures do not have to be `Send`.
#[derive(Clone, Copy, Debug)]
pub struct TokioLocalExecutor;
70
#[cfg(feature = "runtime-tokio")]
impl<Fut> hyper::rt::Executor<Fut> for TokioLocalExecutor
where
  Fut: std::future::Future + 'static,
  Fut::Output: 'static,
{
  /// Spawns `future` as a local (non-`Send`) Tokio task and detaches it.
  #[inline]
  fn execute(&self, future: Fut) {
    // Dropping the join handle detaches the task; it keeps running on its own.
    drop(tokio::task::spawn_local(future));
  }
}
82
83pub fn create_http_handler(
85 reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
86 rx: Receiver<ConnectionData>,
87 enable_uring: Option<bool>,
88 io_uring_disabled: Sender<Option<std::io::Error>>,
89 multi_cancel: Arc<MultiCancel>,
90) -> Result<(CancellationToken, Sender<()>), Box<dyn Error + Send + Sync>> {
91 let shutdown_tx = CancellationToken::new();
92 let shutdown_rx = shutdown_tx.clone();
93 let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
94 let (graceful_tx, graceful_rx) = async_channel::unbounded();
95 std::thread::Builder::new()
96 .name("Request handler".to_string())
97 .spawn(move || {
98 let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
99 Ok(rt) => rt,
100 Err(error) => {
101 handler_init_tx
102 .send_blocking(Some(
103 anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
104 ))
105 .unwrap_or_default();
106 return;
107 }
108 };
109 io_uring_disabled
110 .send_blocking(rt.return_io_uring_error())
111 .unwrap_or_default();
112 rt.run(async move {
113 if let Some(error) = http_handler_fn(
114 reloadable_data,
115 rx,
116 &handler_init_tx,
117 shutdown_rx,
118 graceful_rx,
119 multi_cancel,
120 )
121 .await
122 .err()
123 {
124 handler_init_tx.send(Some(error)).await.unwrap_or_default();
125 }
126 });
127 })?;
128
129 if let Some(error) = listen_error_rx.recv_blocking()? {
130 Err(error)?;
131 }
132
133 Ok((shutdown_tx, graceful_tx))
134}
135
136#[inline]
138async fn http_handler_fn(
139 reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
140 rx: Receiver<ConnectionData>,
141 handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
142 shutdown_rx: CancellationToken,
143 graceful_rx: Receiver<()>,
144 multi_cancel: Arc<MultiCancel>,
145) -> Result<(), Box<dyn Error + Send + Sync>> {
146 handler_init_tx.send(None).await.unwrap_or_default();
147
148 let connections_references = Arc::new(());
149 let graceful_shutdown_token = Arc::new(ArcSwap::from_pointee(CancellationToken::new()));
150 let graceful_shutdown_token_clone = graceful_shutdown_token.clone();
151
152 let mut graceful_rx_recv_future = Box::pin(async move {
153 while graceful_rx.recv().await.is_ok() {
154 graceful_shutdown_token_clone
155 .swap(Arc::new(CancellationToken::new()))
156 .cancel();
157 }
158
159 futures_util::future::pending::<()>().await;
160 });
161
162 loop {
163 let conn_data = crate::runtime::select! {
164 biased;
165
166 _ = &mut graceful_rx_recv_future => {
167 break;
169 }
170 _ = shutdown_rx.cancelled() => {
171 break;
172 }
173 result = rx.recv() => {
174 if let Ok(recv_data) = result {
175 recv_data
176 } else {
177 break;
178 }
179 }
180 };
181 let ReloadableHandlerData {
182 configurations,
183 tls_configs,
184 http3_enabled,
185 acme_tls_alpn_01_configs,
186 acme_http_01_resolvers,
187 enable_proxy_protocol,
188 } = &**reloadable_data.load();
189 let configurations = configurations.clone();
190 let tls_config = if matches!(
191 conn_data.connection,
192 crate::listener_handler_communication::Connection::Quic(..)
193 ) {
194 None
195 } else {
196 tls_configs.get(&conn_data.server_address.port()).cloned()
197 };
198 let acme_tls_alpn_01_config = if matches!(
199 conn_data.connection,
200 crate::listener_handler_communication::Connection::Quic(..)
201 ) {
202 None
203 } else {
204 acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
205 };
206 let acme_http_01_resolvers = acme_http_01_resolvers.clone();
207 let connections_references_cloned = connections_references.clone();
208 let shutdown_rx_clone = shutdown_rx.clone();
209 let http3_enabled = *http3_enabled;
210 let enable_proxy_protocol = *enable_proxy_protocol;
211 let graceful_shutdown_token = graceful_shutdown_token.load().clone();
212 crate::runtime::spawn(async move {
213 match conn_data.connection {
214 crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
215 #[cfg(feature = "runtime-monoio")]
218 let _ = tcp_stream.set_nonblocking(monoio::utils::is_legacy());
219
220 #[cfg(feature = "runtime-monoio")]
221 let tcp_stream = match TcpStream::from_std(tcp_stream) {
222 Ok(stream) => stream,
223 Err(err) => {
224 for logging_tx in configurations
225 .find_global_configuration()
226 .as_ref()
227 .map_or(&vec![], |c| &c.observability.log_channels)
228 {
229 logging_tx
230 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
231 .await
232 .unwrap_or_default();
233 }
234 return;
235 }
236 };
237 let encrypted = tls_config.is_some();
238 http_tcp_handler_fn(
239 tcp_stream,
240 conn_data.client_address,
241 conn_data.server_address,
242 configurations,
243 tls_config,
244 http3_enabled && encrypted,
245 connections_references_cloned,
246 acme_tls_alpn_01_config,
247 acme_http_01_resolvers,
248 enable_proxy_protocol,
249 shutdown_rx_clone,
250 graceful_shutdown_token,
251 )
252 .await;
253 }
254 crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
255 http_quic_handler_fn(
256 quic_incoming,
257 conn_data.client_address,
258 conn_data.server_address,
259 configurations,
260 connections_references_cloned,
261 shutdown_rx_clone,
262 graceful_shutdown_token,
263 )
264 .await;
265 }
266 }
267 });
268 }
269
270 while Arc::weak_count(&connections_references) > 0 {
271 crate::runtime::sleep(Duration::from_millis(100)).await;
272 }
273
274 multi_cancel.cancel().await;
276
277 Ok(())
278}
279
/// A client stream that is either wrapped in TLS or plain text (Monoio runtime variant).
#[allow(clippy::large_enum_variant)]
#[cfg(feature = "runtime-monoio")]
enum MaybeTlsStream {
  /// A stream on which the TLS handshake has been completed.
  Tls(TlsStream<SendAsyncIo<TcpStreamPoll>>),

  /// A stream without TLS.
  Plain(SendAsyncIo<TcpStreamPoll>),
}
290
/// A client stream that is either wrapped in TLS or plain text (Tokio runtime variant).
#[allow(clippy::large_enum_variant)]
#[cfg(feature = "runtime-tokio")]
enum MaybeTlsStream {
  /// A stream on which the TLS handshake has been completed.
  Tls(TlsStream<TcpStream>),

  /// A stream without TLS.
  Plain(TcpStream),
}
300
301#[allow(clippy::too_many_arguments)]
303#[inline]
304async fn http_tcp_handler_fn(
305 tcp_stream: TcpStream,
306 client_address: SocketAddr,
307 server_address: SocketAddr,
308 configurations: Arc<ServerConfigurations>,
309 tls_config: Option<Arc<ServerConfig>>,
310 http3_enabled: bool,
311 connection_reference: Arc<()>,
312 acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
313 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
314 enable_proxy_protocol: bool,
315 shutdown_rx: CancellationToken,
316 graceful_shutdown_token: Arc<CancellationToken>,
317) {
318 let _connection_reference = Arc::downgrade(&connection_reference);
319 #[cfg(feature = "runtime-monoio")]
320 let tcp_stream = match tcp_stream.into_poll_io() {
321 Ok(stream) => SendAsyncIo::new(stream),
322 Err(err) => {
323 for logging_tx in configurations
324 .find_global_configuration()
325 .as_ref()
326 .map_or(&vec![], |c| &c.observability.log_channels)
327 {
328 logging_tx
329 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
330 .await
331 .unwrap_or_default();
332 }
333 return;
334 }
335 };
336
337 let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
339 match read_proxy_header(tcp_stream).await {
341 Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
342 Err(err) => {
343 for logging_tx in configurations
344 .find_global_configuration()
345 .as_ref()
346 .map_or(&vec![], |c| &c.observability.log_channels)
347 {
348 logging_tx
349 .send(LogMessage::new(
350 format!("Error reading PROXY protocol header: {err}"),
351 true,
352 ))
353 .await
354 .unwrap_or_default();
355 }
356 return;
357 }
358 }
359 } else {
360 (tcp_stream, None, None)
361 };
362
363 let maybe_tls_stream = if let Some(tls_config) = tls_config {
364 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
365 Ok(start_handshake) => start_handshake,
366 Err(err) => {
367 for logging_tx in configurations
368 .find_global_configuration()
369 .as_ref()
370 .map_or(&vec![], |c| &c.observability.log_channels)
371 {
372 logging_tx
373 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
374 .await
375 .unwrap_or_default();
376 }
377 return;
378 }
379 };
380
381 if let Some(acme_config) = acme_tls_alpn_01_config {
382 if start_handshake
383 .client_hello()
384 .alpn()
385 .into_iter()
386 .flatten()
387 .eq([ACME_TLS_ALPN_NAME])
388 {
389 if let Err(err) = start_handshake.into_stream(acme_config).await {
390 for logging_tx in configurations
391 .find_global_configuration()
392 .as_ref()
393 .map_or(&vec![], |c| &c.observability.log_channels)
394 {
395 logging_tx
396 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
397 .await
398 .unwrap_or_default();
399 }
400 return;
401 };
402 return;
403 }
404 }
405
406 let tls_stream = match start_handshake.into_stream(tls_config).await {
407 Ok(tls_stream) => tls_stream,
408 Err(err) => {
409 for logging_tx in configurations
410 .find_global_configuration()
411 .as_ref()
412 .map_or(&vec![], |c| &c.observability.log_channels)
413 {
414 logging_tx
415 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
416 .await
417 .unwrap_or_default();
418 }
419 return;
420 }
421 };
422
423 MaybeTlsStream::Tls(tls_stream)
424 } else {
425 MaybeTlsStream::Plain(tcp_stream)
426 };
427
428 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
429 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
430 let is_http2 = alpn_protocol == Some("h2".as_bytes());
431
432 #[cfg(feature = "runtime-tokio")]
433 let io = TokioIo::new(tls_stream);
434
435 if is_http2 {
436 #[cfg(feature = "runtime-monoio")]
438 let io = MonoioIo::new(tls_stream);
439
440 #[cfg(feature = "runtime-monoio")]
441 let mut http2_builder = {
442 let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
443 http2_builder.timer(MonoioTimer);
444 http2_builder
445 };
446 #[cfg(feature = "runtime-tokio")]
447 let mut http2_builder = {
448 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
449 http2_builder.timer(TokioTimer::new());
450 http2_builder
451 };
452
453 let global_configuration = configurations.find_global_configuration();
454
455 if let Some(initial_window_size) = global_configuration
456 .as_deref()
457 .and_then(|c| get_value!("h2_initial_window_size", c))
458 .and_then(|v| v.as_i128())
459 {
460 http2_builder.initial_stream_window_size(initial_window_size as u32);
461 }
462 if let Some(max_frame_size) = global_configuration
463 .as_deref()
464 .and_then(|c| get_value!("h2_max_frame_size", c))
465 .and_then(|v| v.as_i128())
466 {
467 http2_builder.max_frame_size(max_frame_size as u32);
468 }
469 if let Some(max_concurrent_streams) = global_configuration
470 .as_deref()
471 .and_then(|c| get_value!("h2_max_concurrent_streams", c))
472 .and_then(|v| v.as_i128())
473 {
474 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
475 }
476 if let Some(max_header_list_size) = global_configuration
477 .as_deref()
478 .and_then(|c| get_value!("h2_max_header_list_size", c))
479 .and_then(|v| v.as_i128())
480 {
481 http2_builder.max_header_list_size(max_header_list_size as u32);
482 }
483 if let Some(enable_connect_protocol) = global_configuration
484 .as_deref()
485 .and_then(|c| get_value!("h2_enable_connect_protocol", c))
486 .and_then(|v| v.as_bool())
487 {
488 if enable_connect_protocol {
489 http2_builder.enable_connect_protocol();
490 }
491 }
492
493 let configurations_clone = configurations.clone();
494 let mut http_future = http2_builder.serve_connection(
495 io,
496 service_fn(move |request: Request<Incoming>| {
497 let (request_parts, request_body) = request.into_parts();
498 let request = Request::from_parts(
499 request_parts,
500 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
501 );
502 request_handler(
503 request,
504 client_address,
505 server_address,
506 true,
507 configurations_clone.clone(),
508 if http3_enabled {
509 Some(server_address.port())
510 } else {
511 None
512 },
513 acme_http_01_resolvers.clone(),
514 proxy_protocol_client_address,
515 proxy_protocol_server_address,
516 )
517 }),
518 );
519 let http_future_result = crate::runtime::select! {
520 result = &mut http_future => {
521 result
522 }
523 _ = shutdown_rx.cancelled() => {
524 std::pin::Pin::new(&mut http_future).graceful_shutdown();
525 http_future.await
526 }
527 _ = graceful_shutdown_token.cancelled() => {
528 std::pin::Pin::new(&mut http_future).graceful_shutdown();
529 http_future.await
530 }
531 };
532 if let Err(err) = http_future_result {
533 let error_to_log = if err.is_user() {
534 err.source().unwrap_or(&err)
535 } else {
536 &err
537 };
538 for logging_tx in configurations
539 .find_global_configuration()
540 .as_ref()
541 .map_or(&vec![], |c| &c.observability.log_channels)
542 {
543 logging_tx
544 .send(LogMessage::new(
545 format!("Error serving HTTPS connection: {error_to_log}"),
546 true,
547 ))
548 .await
549 .unwrap_or_default();
550 }
551 }
552 } else {
553 #[cfg(feature = "runtime-monoio")]
554 let io = MonoioIo::new(tls_stream);
555
556 #[cfg(feature = "runtime-monoio")]
557 let http1_builder = {
558 let mut http1_builder = hyper::server::conn::http1::Builder::new();
559
560 http1_builder.timer(MonoioTimer);
562
563 http1_builder
564 };
565 #[cfg(feature = "runtime-tokio")]
566 let http1_builder = {
567 let mut http1_builder = hyper::server::conn::http1::Builder::new();
568
569 http1_builder.timer(TokioTimer::new());
571
572 http1_builder
573 };
574
575 let configurations_clone = configurations.clone();
576 let mut http_future = http1_builder
577 .serve_connection(
578 io,
579 service_fn(move |request: Request<Incoming>| {
580 let (request_parts, request_body) = request.into_parts();
581 let request = Request::from_parts(
582 request_parts,
583 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
584 );
585 request_handler(
586 request,
587 client_address,
588 server_address,
589 true,
590 configurations_clone.clone(),
591 if http3_enabled {
592 Some(server_address.port())
593 } else {
594 None
595 },
596 acme_http_01_resolvers.clone(),
597 proxy_protocol_client_address,
598 proxy_protocol_server_address,
599 )
600 }),
601 )
602 .with_upgrades();
603 let http_future_result = crate::runtime::select! {
604 result = &mut http_future => {
605 result
606 }
607 _ = shutdown_rx.cancelled() => {
608 std::pin::Pin::new(&mut http_future).graceful_shutdown();
609 http_future.await
610 }
611 _ = graceful_shutdown_token.cancelled() => {
612 std::pin::Pin::new(&mut http_future).graceful_shutdown();
613 http_future.await
614 }
615 };
616 if let Err(err) = http_future_result {
617 let error_to_log = if err.is_user() {
618 err.source().unwrap_or(&err)
619 } else {
620 &err
621 };
622 for logging_tx in configurations
623 .find_global_configuration()
624 .as_ref()
625 .map_or(&vec![], |c| &c.observability.log_channels)
626 {
627 logging_tx
628 .send(LogMessage::new(
629 format!("Error serving HTTPS connection: {error_to_log}"),
630 true,
631 ))
632 .await
633 .unwrap_or_default();
634 }
635 }
636 }
637 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
638 #[cfg(feature = "runtime-monoio")]
639 let io = MonoioIo::new(stream);
640 #[cfg(feature = "runtime-tokio")]
641 let io = TokioIo::new(stream);
642
643 #[cfg(feature = "runtime-monoio")]
644 let http1_builder = {
645 let mut http1_builder = hyper::server::conn::http1::Builder::new();
646
647 http1_builder.timer(MonoioTimer);
649
650 http1_builder
651 };
652 #[cfg(feature = "runtime-tokio")]
653 let http1_builder = {
654 let mut http1_builder = hyper::server::conn::http1::Builder::new();
655
656 http1_builder.timer(TokioTimer::new());
658
659 http1_builder
660 };
661
662 let configurations_clone = configurations.clone();
663 let mut http_future = http1_builder
664 .serve_connection(
665 io,
666 service_fn(move |request: Request<Incoming>| {
667 let (request_parts, request_body) = request.into_parts();
668 let request = Request::from_parts(
669 request_parts,
670 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
671 );
672 request_handler(
673 request,
674 client_address,
675 server_address,
676 false,
677 configurations_clone.clone(),
678 if http3_enabled {
679 Some(server_address.port())
680 } else {
681 None
682 },
683 acme_http_01_resolvers.clone(),
684 proxy_protocol_client_address,
685 proxy_protocol_server_address,
686 )
687 }),
688 )
689 .with_upgrades();
690 let http_future_result = crate::runtime::select! {
691 result = &mut http_future => {
692 result
693 }
694 _ = shutdown_rx.cancelled() => {
695 std::pin::Pin::new(&mut http_future).graceful_shutdown();
696 http_future.await
697 }
698 _ = graceful_shutdown_token.cancelled() => {
699 std::pin::Pin::new(&mut http_future).graceful_shutdown();
700 http_future.await
701 }
702 };
703 if let Err(err) = http_future_result {
704 let error_to_log = if err.is_user() {
705 err.source().unwrap_or(&err)
706 } else {
707 &err
708 };
709 for logging_tx in configurations
710 .find_global_configuration()
711 .as_ref()
712 .map_or(&vec![], |c| &c.observability.log_channels)
713 {
714 logging_tx
715 .send(LogMessage::new(
716 format!("Error serving HTTP connection: {error_to_log}"),
717 true,
718 ))
719 .await
720 .unwrap_or_default();
721 }
722 }
723 }
724}
725
726#[inline]
728#[allow(clippy::too_many_arguments)]
729async fn http_quic_handler_fn(
730 connection_attempt: quinn::Incoming,
731 client_address: SocketAddr,
732 server_address: SocketAddr,
733 configurations: Arc<ServerConfigurations>,
734 connection_reference: Arc<()>,
735 shutdown_rx: CancellationToken,
736 graceful_shutdown_token: Arc<CancellationToken>,
737) {
738 match connection_attempt.await {
739 Ok(connection) => {
740 let connection_reference = Arc::downgrade(&connection_reference);
741 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
742 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
743 Ok(h3_conn) => h3_conn,
744 Err(err) => {
745 for logging_tx in configurations
746 .find_global_configuration()
747 .as_ref()
748 .map_or(&vec![], |c| &c.observability.log_channels)
749 {
750 logging_tx
751 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
752 .await
753 .unwrap_or_default();
754 }
755 return;
756 }
757 };
758
759 loop {
760 match crate::runtime::select! {
761 biased;
762
763 _ = shutdown_rx.cancelled() => {
764 h3_conn.shutdown(0).await.unwrap_or_default();
765 return;
766 }
767 _ = graceful_shutdown_token.cancelled() => {
768 h3_conn.shutdown(0).await.unwrap_or_default();
769 return;
770 }
771 result = h3_conn.accept() => {
772 result
773 }
774 } {
775 Ok(Some(resolver)) => {
776 let configurations = configurations.clone();
777 let connection_reference = connection_reference.clone();
778 crate::runtime::spawn(async move {
779 let _connection_reference = connection_reference;
780 let (request, stream) = match resolver.resolve_request().await {
781 Ok(resolved) => resolved,
782 Err(err) => {
783 if !err.is_h3_no_error() {
784 for logging_tx in configurations
785 .find_global_configuration()
786 .as_ref()
787 .map_or(&vec![], |c| &c.observability.log_channels)
788 {
789 logging_tx
790 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
791 .await
792 .unwrap_or_default();
793 }
794 }
795 return;
796 }
797 };
798 let (mut send, receive) = stream.split();
799 let request_body_stream =
800 futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
801 loop {
802 if !is_body_finished {
803 match receive.recv_data().await {
804 Ok(Some(mut data)) => {
805 return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
806 }
807 Ok(None) => is_body_finished = true,
808 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
809 }
810 } else {
811 match receive.recv_trailers().await {
812 Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
813 Ok(None) => {
814 return None;
815 }
816 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
817 }
818 }
819 }
820 });
821 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
822 let (request_parts, _) = request.into_parts();
823 let request = Request::from_parts(request_parts, request_body);
824 let mut response = match request_handler(
825 request,
826 client_address,
827 server_address,
828 true,
829 configurations.clone(),
830 None,
831 Arc::new(tokio::sync::RwLock::new(vec![])),
832 None,
833 None,
834 )
835 .await
836 {
837 Ok(response) => response,
838 Err(err) => {
839 for logging_tx in configurations
840 .find_global_configuration()
841 .as_ref()
842 .map_or(&vec![], |c| &c.observability.log_channels)
843 {
844 logging_tx
845 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
846 .await
847 .unwrap_or_default();
848 }
849 return;
850 }
851 };
852 let response_headers = response.headers_mut();
853 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
854 response_headers.entry(hyper::header::DATE).or_insert(http_date);
855 }
856 for header in &HTTP3_INVALID_HEADERS {
857 response_headers.remove(header);
858 }
859 if let Some(connection_header) = response_headers
860 .remove(hyper::header::CONNECTION)
861 .as_ref()
862 .and_then(|v| v.to_str().ok())
863 {
864 for name in connection_header.split(',') {
865 response_headers.remove(name.trim());
866 }
867 }
868 let (response_parts, mut response_body) = response.into_parts();
869 if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
870 if !err.is_h3_no_error() {
871 for logging_tx in configurations
872 .find_global_configuration()
873 .as_ref()
874 .map_or(&vec![], |c| &c.observability.log_channels)
875 {
876 logging_tx
877 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
878 .await
879 .unwrap_or_default();
880 }
881 }
882 return;
883 }
884 let mut had_trailers = false;
885 while let Some(chunk) = response_body.frame().await {
886 match chunk {
887 Ok(frame) => {
888 if frame.is_data() {
889 match frame.into_data() {
890 Ok(data) => {
891 if let Err(err) = send.send_data(data).await {
892 if !err.is_h3_no_error() {
893 for logging_tx in configurations
894 .find_global_configuration()
895 .as_ref()
896 .map_or(&vec![], |c| &c.observability.log_channels)
897 {
898 logging_tx
899 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
900 .await
901 .unwrap_or_default();
902 }
903 }
904 return;
905 }
906 }
907 Err(_) => {
908 for logging_tx in configurations
909 .find_global_configuration()
910 .as_ref()
911 .map_or(&vec![], |c| &c.observability.log_channels)
912 {
913 logging_tx
914 .send(LogMessage::new(
915 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
916 true,
917 ))
918 .await
919 .unwrap_or_default();
920 }
921 return;
922 }
923 }
924 } else if frame.is_trailers() {
925 match frame.into_trailers() {
926 Ok(trailers) => {
927 had_trailers = true;
928 if let Err(err) = send.send_trailers(trailers).await {
929 if !err.is_h3_no_error() {
930 for logging_tx in configurations
931 .find_global_configuration()
932 .as_ref()
933 .map_or(&vec![], |c| &c.observability.log_channels)
934 {
935 logging_tx
936 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
937 .await
938 .unwrap_or_default();
939 }
940 }
941 return;
942 }
943 }
944 Err(_) => {
945 for logging_tx in configurations
946 .find_global_configuration()
947 .as_ref()
948 .map_or(&vec![], |c| &c.observability.log_channels)
949 {
950 logging_tx
951 .send(LogMessage::new(
952 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
953 true,
954 ))
955 .await
956 .unwrap_or_default();
957 }
958 return;
959 }
960 }
961 }
962 }
963 Err(err) => {
964 for logging_tx in configurations
965 .find_global_configuration()
966 .as_ref()
967 .map_or(&vec![], |c| &c.observability.log_channels)
968 {
969 logging_tx
970 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
971 .await
972 .unwrap_or_default();
973 }
974
975 return;
976 }
977 }
978 }
979 if !had_trailers {
980 if let Err(err) = send.finish().await {
981 if !err.is_h3_no_error() {
982 for logging_tx in configurations
983 .find_global_configuration()
984 .as_ref()
985 .map_or(&vec![], |c| &c.observability.log_channels)
986 {
987 logging_tx
988 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
989 .await
990 .unwrap_or_default();
991 }
992 }
993 }
994 }
995 });
996 }
997 Ok(None) => break,
998 Err(err) => {
999 if !err.is_h3_no_error() {
1000 for logging_tx in configurations
1001 .find_global_configuration()
1002 .as_ref()
1003 .map_or(&vec![], |c| &c.observability.log_channels)
1004 {
1005 logging_tx
1006 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
1007 .await
1008 .unwrap_or_default();
1009 }
1010 }
1011 return;
1012 }
1013 }
1014 }
1015 }
1016 Err(err) => {
1017 for logging_tx in configurations
1018 .find_global_configuration()
1019 .as_ref()
1020 .map_or(&vec![], |c| &c.observability.log_channels)
1021 {
1022 logging_tx
1023 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
1024 .await
1025 .unwrap_or_default();
1026 }
1027 }
1028 }
1029}