1use std::collections::HashMap;
2use std::error::Error;
3use std::net::SocketAddr;
4use std::sync::Arc;
5use std::time::{Duration, SystemTime};
6
7use async_channel::{Receiver, Sender};
8use bytes::{Buf, Bytes};
9use http_body_util::{BodyExt, StreamBody};
10use hyper::body::{Frame, Incoming};
11use hyper::service::service_fn;
12use hyper::{Request, Response};
13#[cfg(feature = "runtime-tokio")]
14use hyper_util::rt::{TokioIo, TokioTimer};
15#[cfg(feature = "runtime-monoio")]
16use monoio::io::IntoPollIo;
17#[cfg(feature = "runtime-monoio")]
18use monoio::net::tcp::stream_poll::TcpStreamPoll;
19#[cfg(feature = "runtime-monoio")]
20use monoio::net::TcpStream;
21#[cfg(feature = "runtime-monoio")]
22use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
23use rustls::server::Acceptor;
24use rustls::ServerConfig;
25#[cfg(feature = "runtime-tokio")]
26use tokio::net::TcpStream;
27use tokio_rustls::server::TlsStream;
28use tokio_rustls::LazyConfigAcceptor;
29use tokio_util::sync::CancellationToken;
30
31use crate::acme::ACME_TLS_ALPN_NAME;
32use crate::config::ServerConfigurations;
33use crate::get_value;
34use crate::listener_handler_communication::ConnectionData;
35use crate::logging::{LogMessage, Loggers};
36use crate::request_handler::request_handler;
37use crate::util::read_proxy_header;
38#[cfg(feature = "runtime-monoio")]
39use crate::util::SendAsyncIo;
40
41#[cfg(feature = "runtime-tokio")]
43#[derive(Clone, Copy, Debug)]
44pub struct TokioLocalExecutor;
45
46#[cfg(feature = "runtime-tokio")]
47impl<F> hyper::rt::Executor<F> for TokioLocalExecutor
48where
49 F: std::future::Future + 'static,
50 F::Output: 'static,
51{
52 #[inline]
53 fn execute(&self, fut: F) {
54 tokio::task::spawn_local(fut);
55 }
56}
57
58#[allow(clippy::too_many_arguments)]
60pub fn create_http_handler(
61 configurations: Arc<ServerConfigurations>,
62 rx: Receiver<ConnectionData>,
63 enable_uring: bool,
64 loggers: Loggers,
65 tls_configs: HashMap<u16, Arc<ServerConfig>>,
66 http3_enabled: bool,
67 acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
68 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
69 enable_proxy_protocol: bool,
70) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
71 let shutdown_tx = CancellationToken::new();
72 let shutdown_rx = shutdown_tx.clone();
73 let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
74 std::thread::Builder::new()
75 .name("Request handler".to_string())
76 .spawn(move || {
77 crate::runtime::new_runtime(
78 async move {
79 if let Some(error) = http_handler_fn(
80 configurations,
81 rx,
82 &handler_init_tx,
83 loggers,
84 shutdown_rx,
85 tls_configs,
86 http3_enabled,
87 acme_tls_alpn_01_configs,
88 acme_http_01_resolvers,
89 enable_proxy_protocol,
90 )
91 .await
92 .err()
93 {
94 handler_init_tx.send(Some(error)).await.unwrap_or_default();
95 }
96 },
97 enable_uring,
98 )
99 .unwrap();
100 })?;
101
102 if let Some(error) = listen_error_rx.recv_blocking()? {
103 Err(error)?;
104 }
105
106 Ok(shutdown_tx)
107}
108
109#[allow(clippy::too_many_arguments)]
111async fn http_handler_fn(
112 configurations: Arc<ServerConfigurations>,
113 rx: Receiver<ConnectionData>,
114 handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
115 loggers: Loggers,
116 shutdown_rx: CancellationToken,
117 tls_configs: HashMap<u16, Arc<ServerConfig>>,
118 http3_enabled: bool,
119 acme_tls_alpn_01_configs: HashMap<u16, Arc<ServerConfig>>,
120 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
121 enable_proxy_protocol: bool,
122) -> Result<(), Box<dyn Error + Send + Sync>> {
123 handler_init_tx.send(None).await.unwrap_or_default();
124
125 let connections_references = Arc::new(());
126
127 loop {
128 let conn_data = crate::runtime::select! {
129 result = rx.recv() => {
130 if let Ok(recv_data) = result {
131 recv_data
132 } else {
133 break;
134 }
135 }
136 _ = shutdown_rx.cancelled() => {
137 break;
138 }
139 };
140 let configurations = configurations.clone();
141 let tls_config = if matches!(
142 conn_data.connection,
143 crate::listener_handler_communication::Connection::Quic(..)
144 ) {
145 None
146 } else {
147 tls_configs.get(&conn_data.server_address.port()).cloned()
148 };
149 let acme_tls_alpn_01_config = if matches!(
150 conn_data.connection,
151 crate::listener_handler_communication::Connection::Quic(..)
152 ) {
153 None
154 } else {
155 acme_tls_alpn_01_configs.get(&conn_data.server_address.port()).cloned()
156 };
157 let acme_http_01_resolvers = acme_http_01_resolvers.clone();
158 let loggers = loggers.clone();
159 let connections_references_cloned = connections_references.clone();
160 let shutdown_rx_clone = shutdown_rx.clone();
161 crate::runtime::spawn(async move {
162 match conn_data.connection {
163 crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
164 #[cfg(feature = "runtime-monoio")]
165 let tcp_stream = match TcpStream::from_std(tcp_stream) {
166 Ok(stream) => stream,
167 Err(err) => {
168 if let Some(logging_tx) = loggers.find_global_logger() {
169 logging_tx
170 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
171 .await
172 .unwrap_or_default();
173 }
174 return;
175 }
176 };
177 let encrypted = tls_config.is_some();
178 http_tcp_handler_fn(
179 tcp_stream,
180 conn_data.client_address,
181 conn_data.server_address,
182 loggers,
183 configurations,
184 tls_config,
185 http3_enabled && encrypted,
186 connections_references_cloned,
187 acme_tls_alpn_01_config,
188 acme_http_01_resolvers,
189 enable_proxy_protocol,
190 shutdown_rx_clone,
191 )
192 .await;
193 }
194 crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
195 http_quic_handler_fn(
196 quic_incoming,
197 conn_data.client_address,
198 conn_data.server_address,
199 loggers,
200 configurations,
201 connections_references_cloned,
202 shutdown_rx_clone,
203 )
204 .await;
205 }
206 }
207 });
208 }
209
210 while Arc::weak_count(&connections_references) > 0 {
211 crate::runtime::sleep(Duration::from_millis(100)).await;
212 }
213
214 Ok(())
215}
216
217#[allow(clippy::large_enum_variant)]
219#[cfg(feature = "runtime-monoio")]
220enum MaybeTlsStream {
221 Tls(TlsStream<TcpStreamPoll>),
223
224 Plain(TcpStreamPoll),
226}
227
228#[allow(clippy::large_enum_variant)]
229#[cfg(feature = "runtime-tokio")]
230enum MaybeTlsStream {
231 Tls(TlsStream<TcpStream>),
233
234 Plain(TcpStream),
236}
237
238#[allow(clippy::too_many_arguments)]
240async fn http_tcp_handler_fn(
241 tcp_stream: TcpStream,
242 client_address: SocketAddr,
243 server_address: SocketAddr,
244 loggers: Loggers,
245 configurations: Arc<ServerConfigurations>,
246 tls_config: Option<Arc<ServerConfig>>,
247 http3_enabled: bool,
248 connection_reference: Arc<()>,
249 acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
250 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
251 enable_proxy_protocol: bool,
252 shutdown_rx: CancellationToken,
253) {
254 let _connection_reference = Arc::downgrade(&connection_reference);
255 #[cfg(feature = "runtime-monoio")]
256 let tcp_stream = match tcp_stream.into_poll_io() {
257 Ok(stream) => stream,
258 Err(err) => {
259 if let Some(logging_tx) = loggers.find_global_logger() {
260 logging_tx
261 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
262 .await
263 .unwrap_or_default();
264 }
265 return;
266 }
267 };
268
269 let (tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address) = if enable_proxy_protocol {
271 match read_proxy_header(tcp_stream).await {
273 Ok((tcp_stream, client_ip, server_ip)) => (tcp_stream, client_ip, server_ip),
274 Err(err) => {
275 if let Some(logging_tx) = loggers.find_global_logger() {
276 logging_tx
277 .send(LogMessage::new(
278 format!("Error reading PROXY protocol header: {err}"),
279 true,
280 ))
281 .await
282 .unwrap_or_default();
283 }
284 return;
285 }
286 }
287 } else {
288 (tcp_stream, None, None)
289 };
290
291 let maybe_tls_stream = if let Some(tls_config) = tls_config {
292 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
293 Ok(start_handshake) => start_handshake,
294 Err(err) => {
295 if let Some(logging_tx) = loggers.find_global_logger() {
296 logging_tx
297 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
298 .await
299 .unwrap_or_default();
300 }
301 return;
302 }
303 };
304
305 if let Some(acme_config) = acme_tls_alpn_01_config {
306 if start_handshake
307 .client_hello()
308 .alpn()
309 .into_iter()
310 .flatten()
311 .eq([ACME_TLS_ALPN_NAME])
312 {
313 match start_handshake.into_stream(acme_config).await {
314 Ok(_) => (),
315 Err(err) => {
316 if let Some(logging_tx) = loggers.find_global_logger() {
317 logging_tx
318 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
319 .await
320 .unwrap_or_default();
321 }
322 return;
323 }
324 };
325 return;
326 }
327 }
328
329 let tls_stream = match start_handshake.into_stream(tls_config).await {
330 Ok(tls_stream) => tls_stream,
331 Err(err) => {
332 if let Some(logging_tx) = loggers.find_global_logger() {
333 logging_tx
334 .send(LogMessage::new(format!("Error during TLS handshake: {err}"), true))
335 .await
336 .unwrap_or_default();
337 }
338 return;
339 }
340 };
341
342 MaybeTlsStream::Tls(tls_stream)
343 } else {
344 MaybeTlsStream::Plain(tcp_stream)
345 };
346
347 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
348 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
349 let is_http2 = alpn_protocol == Some("h2".as_bytes());
350
351 #[cfg(feature = "runtime-tokio")]
352 let io = TokioIo::new(tls_stream);
353
354 if is_http2 {
355 #[cfg(feature = "runtime-monoio")]
357 let io = MonoioIo::new(tls_stream);
358
359 #[cfg(feature = "runtime-monoio")]
360 let mut http2_builder = {
361 let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
362 http2_builder.timer(MonoioTimer);
363 http2_builder
364 };
365 #[cfg(feature = "runtime-tokio")]
366 let mut http2_builder = {
367 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
368 http2_builder.timer(TokioTimer::new());
369 http2_builder
370 };
371
372 let global_configuration = configurations.find_global_configuration();
373
374 if let Some(initial_window_size) = global_configuration
375 .as_deref()
376 .and_then(|c| get_value!("h2_initial_window_size", c))
377 .and_then(|v| v.as_i128())
378 {
379 http2_builder.initial_stream_window_size(initial_window_size as u32);
380 }
381 if let Some(max_frame_size) = global_configuration
382 .as_deref()
383 .and_then(|c| get_value!("h2_max_frame_size", c))
384 .and_then(|v| v.as_i128())
385 {
386 http2_builder.max_frame_size(max_frame_size as u32);
387 }
388 if let Some(max_concurrent_streams) = global_configuration
389 .as_deref()
390 .and_then(|c| get_value!("h2_max_concurrent_streams", c))
391 .and_then(|v| v.as_i128())
392 {
393 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
394 }
395 if let Some(max_header_list_size) = global_configuration
396 .as_deref()
397 .and_then(|c| get_value!("h2_max_header_list_size", c))
398 .and_then(|v| v.as_i128())
399 {
400 http2_builder.max_header_list_size(max_header_list_size as u32);
401 }
402 if let Some(enable_connect_protocol) = global_configuration
403 .as_deref()
404 .and_then(|c| get_value!("h2_enable_connect_protocol", c))
405 .and_then(|v| v.as_bool())
406 {
407 if enable_connect_protocol {
408 http2_builder.enable_connect_protocol();
409 }
410 }
411
412 let loggers_clone = loggers.clone();
413 let mut http_future = http2_builder.serve_connection(
414 io,
415 service_fn(move |request: Request<Incoming>| {
416 let (request_parts, request_body) = request.into_parts();
417 let request = Request::from_parts(
418 request_parts,
419 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
420 );
421 request_handler(
422 request,
423 client_address,
424 server_address,
425 true,
426 configurations.clone(),
427 loggers_clone.clone(),
428 if http3_enabled {
429 Some(server_address.port())
430 } else {
431 None
432 },
433 acme_http_01_resolvers.clone(),
434 proxy_protocol_client_address,
435 proxy_protocol_server_address,
436 )
437 }),
438 );
439 let http_future_result = crate::runtime::select! {
440 result = &mut http_future => {
441 result
442 }
443 _ = shutdown_rx.cancelled() => {
444 std::pin::Pin::new(&mut http_future).graceful_shutdown();
445 http_future.await
446 }
447 };
448 if let Err(err) = http_future_result {
449 if let Some(logging_tx) = loggers.find_global_logger() {
450 logging_tx
451 .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
452 .await
453 .unwrap_or_default();
454 }
455 }
456 } else {
457 #[cfg(feature = "runtime-monoio")]
458 let io = MonoioIo::new(SendAsyncIo::new(tls_stream));
459
460 #[cfg(feature = "runtime-monoio")]
461 let http1_builder = {
462 let mut http1_builder = hyper::server::conn::http1::Builder::new();
463
464 http1_builder.timer(MonoioTimer);
466
467 http1_builder
468 };
469 #[cfg(feature = "runtime-tokio")]
470 let http1_builder = {
471 let mut http1_builder = hyper::server::conn::http1::Builder::new();
472
473 http1_builder.timer(TokioTimer::new());
475
476 http1_builder
477 };
478
479 let loggers_clone = loggers.clone();
480 let mut http_future = http1_builder
481 .serve_connection(
482 io,
483 service_fn(move |request: Request<Incoming>| {
484 let (request_parts, request_body) = request.into_parts();
485 let request = Request::from_parts(
486 request_parts,
487 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
488 );
489 request_handler(
490 request,
491 client_address,
492 server_address,
493 true,
494 configurations.clone(),
495 loggers_clone.clone(),
496 if http3_enabled {
497 Some(server_address.port())
498 } else {
499 None
500 },
501 acme_http_01_resolvers.clone(),
502 proxy_protocol_client_address,
503 proxy_protocol_server_address,
504 )
505 }),
506 )
507 .with_upgrades();
508 let http_future_result = crate::runtime::select! {
509 result = &mut http_future => {
510 result
511 }
512 _ = shutdown_rx.cancelled() => {
513 std::pin::Pin::new(&mut http_future).graceful_shutdown();
514 http_future.await
515 }
516 };
517 if let Err(err) = http_future_result {
518 if let Some(logging_tx) = loggers.find_global_logger() {
519 logging_tx
520 .send(LogMessage::new(format!("Error serving HTTPS connection: {err}"), true))
521 .await
522 .unwrap_or_default();
523 }
524 }
525 }
526 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
527 #[cfg(feature = "runtime-monoio")]
528 let io = MonoioIo::new(SendAsyncIo::new(stream));
529 #[cfg(feature = "runtime-tokio")]
530 let io = TokioIo::new(stream);
531
532 #[cfg(feature = "runtime-monoio")]
533 let http1_builder = {
534 let mut http1_builder = hyper::server::conn::http1::Builder::new();
535
536 http1_builder.timer(MonoioTimer);
538
539 http1_builder
540 };
541 #[cfg(feature = "runtime-tokio")]
542 let http1_builder = {
543 let mut http1_builder = hyper::server::conn::http1::Builder::new();
544
545 http1_builder.timer(TokioTimer::new());
547
548 http1_builder
549 };
550
551 let loggers_clone = loggers.clone();
552 let mut http_future = http1_builder
553 .serve_connection(
554 io,
555 service_fn(move |request: Request<Incoming>| {
556 let (request_parts, request_body) = request.into_parts();
557 let request = Request::from_parts(
558 request_parts,
559 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
560 );
561 request_handler(
562 request,
563 client_address,
564 server_address,
565 false,
566 configurations.clone(),
567 loggers_clone.clone(),
568 if http3_enabled {
569 Some(server_address.port())
570 } else {
571 None
572 },
573 acme_http_01_resolvers.clone(),
574 proxy_protocol_client_address,
575 proxy_protocol_server_address,
576 )
577 }),
578 )
579 .with_upgrades();
580 let http_future_result = crate::runtime::select! {
581 result = &mut http_future => {
582 result
583 }
584 _ = shutdown_rx.cancelled() => {
585 std::pin::Pin::new(&mut http_future).graceful_shutdown();
586 http_future.await
587 }
588 };
589 if let Err(err) = http_future_result {
590 if let Some(logging_tx) = loggers.find_global_logger() {
591 logging_tx
592 .send(LogMessage::new(format!("Error serving HTTP connection: {err}"), true))
593 .await
594 .unwrap_or_default();
595 }
596 }
597 }
598}
599
600#[allow(clippy::too_many_arguments)]
602async fn http_quic_handler_fn(
603 connection_attempt: quinn::Incoming,
604 client_address: SocketAddr,
605 server_address: SocketAddr,
606 loggers: Loggers,
607 configurations: Arc<ServerConfigurations>,
608 connection_reference: Arc<()>,
609 shutdown_rx: CancellationToken,
610) {
611 match connection_attempt.await {
612 Ok(connection) => {
613 let _connection_reference = Arc::downgrade(&connection_reference);
614 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
615 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
616 Ok(h3_conn) => h3_conn,
617 Err(err) => {
618 if let Some(logging_tx) = loggers.find_global_logger() {
619 logging_tx
620 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
621 .await
622 .unwrap_or_default();
623 }
624 return;
625 }
626 };
627
628 loop {
629 match h3_conn.accept().await {
630 Ok(Some(resolver)) => {
631 let configurations = configurations.clone();
632 let loggers = loggers.clone();
633 crate::runtime::spawn(async move {
634 let (request, stream) = match resolver.resolve_request().await {
635 Ok(resolved) => resolved,
636 Err(err) => {
637 if let Some(logging_tx) = loggers.find_global_logger() {
638 logging_tx
639 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
640 .await
641 .unwrap_or_default();
642 }
643 return;
644 }
645 };
646 let (mut send, receive) = stream.split();
647 let request_body_stream =
648 futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
649 loop {
650 if !is_body_finished {
651 match receive.recv_data().await {
652 Ok(Some(mut data)) => {
653 return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)))
654 }
655 Ok(None) => is_body_finished = true,
656 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
657 }
658 } else {
659 match receive.recv_trailers().await {
660 Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
661 Ok(None) => {
662 return None;
663 }
664 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
665 }
666 }
667 }
668 });
669 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
670 let (request_parts, _) = request.into_parts();
671 let request = Request::from_parts(request_parts, request_body);
672 let mut response = match request_handler(
673 request,
674 client_address,
675 server_address,
676 true,
677 configurations.clone(),
678 loggers.clone(),
679 None,
680 Arc::new(tokio::sync::RwLock::new(vec![])),
681 None,
682 None,
683 )
684 .await
685 {
686 Ok(response) => response,
687 Err(err) => {
688 if let Some(logging_tx) = loggers.find_global_logger() {
689 logging_tx
690 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
691 .await
692 .unwrap_or_default();
693 }
694 return;
695 }
696 };
697 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
698 response.headers_mut().entry(hyper::header::DATE).or_insert(http_date);
699 }
700 let (response_parts, mut response_body) = response.into_parts();
701 if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
702 if let Some(logging_tx) = loggers.find_global_logger() {
703 logging_tx
704 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
705 .await
706 .unwrap_or_default();
707 }
708 return;
709 }
710 let mut had_trailers = false;
711 while let Some(chunk) = response_body.frame().await {
712 match chunk {
713 Ok(frame) => {
714 if frame.is_data() {
715 match frame.into_data() {
716 Ok(data) => {
717 if let Err(err) = send.send_data(data).await {
718 if let Some(logging_tx) = loggers.find_global_logger() {
719 logging_tx
720 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
721 .await
722 .unwrap_or_default();
723 }
724 return;
725 }
726 }
727 Err(_) => {
728 if let Some(logging_tx) = loggers.find_global_logger() {
729 logging_tx
730 .send(LogMessage::new(
731 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
732 true,
733 ))
734 .await
735 .unwrap_or_default();
736 }
737 return;
738 }
739 }
740 } else if frame.is_trailers() {
741 match frame.into_trailers() {
742 Ok(trailers) => {
743 had_trailers = true;
744 if let Err(err) = send.send_trailers(trailers).await {
745 if let Some(logging_tx) = loggers.find_global_logger() {
746 logging_tx
747 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
748 .await
749 .unwrap_or_default();
750 }
751 return;
752 }
753 }
754 Err(_) => {
755 if let Some(logging_tx) = loggers.find_global_logger() {
756 logging_tx
757 .send(LogMessage::new(
758 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
759 true,
760 ))
761 .await
762 .unwrap_or_default();
763 }
764 return;
765 }
766 }
767 }
768 }
769 Err(err) => {
770 if let Some(logging_tx) = loggers.find_global_logger() {
771 logging_tx
772 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
773 .await
774 .unwrap_or_default();
775 }
776 return;
777 }
778 }
779 }
780 if !had_trailers {
781 if let Err(err) = send.finish().await {
782 if let Some(logging_tx) = loggers.find_global_logger() {
783 logging_tx
784 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
785 .await
786 .unwrap_or_default();
787 }
788 }
789 }
790 });
791 }
792 Ok(None) => break,
793 Err(err) => {
794 if let Some(logging_tx) = loggers.find_global_logger() {
795 logging_tx
796 .send(LogMessage::new(format!("Error serving HTTP/3 connection: {err}"), true))
797 .await
798 .unwrap_or_default();
799 }
800 return;
801 }
802 }
803 if shutdown_rx.is_cancelled() {
804 h3_conn.shutdown(0).await.unwrap_or_default();
805 }
806 }
807 }
808 Err(err) => {
809 if let Some(logging_tx) = loggers.find_global_logger() {
810 logging_tx
811 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
812 .await
813 .unwrap_or_default();
814 }
815 }
816 }
817}