1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Display;
4use std::net::{IpAddr, SocketAddr};
5use std::sync::Arc;
6use std::time::Duration;
7#[cfg(not(feature = "runtime-vibeio"))]
8use std::time::SystemTime;
9
10use crate::acme::ACME_TLS_ALPN_NAME;
11use crate::config::ServerConfigurations;
12use crate::get_value;
13use crate::listener_handler_communication::ConnectionData;
14use crate::request_handler::request_handler;
15#[cfg(feature = "runtime-monoio")]
16use crate::util::SendAsyncIo;
17use crate::util::{read_proxy_header, MultiCancel};
18use arc_swap::ArcSwap;
19use async_channel::{Receiver, Sender};
20#[cfg(not(feature = "runtime-vibeio"))]
21use bytes::{Buf, Bytes};
22#[cfg(feature = "runtime-vibeio")]
23use core_affinity::CoreId;
24use ferron_common::logging::LogMessage;
25use http_body_util::BodyExt;
26#[cfg(not(feature = "runtime-vibeio"))]
27use http_body_util::StreamBody;
28#[cfg(not(feature = "runtime-vibeio"))]
29use hyper::body::{Frame, Incoming};
30#[cfg(not(feature = "runtime-vibeio"))]
31use hyper::service::service_fn;
32use hyper::Request;
33#[cfg(not(feature = "runtime-vibeio"))]
34use hyper::Response;
35#[cfg(feature = "runtime-tokio")]
36use hyper_util::rt::{TokioIo, TokioTimer};
37#[cfg(feature = "runtime-monoio")]
38use monoio::io::IntoPollIo;
39#[cfg(feature = "runtime-monoio")]
40use monoio::net::tcp::stream_poll::TcpStreamPoll;
41#[cfg(feature = "runtime-monoio")]
42use monoio::net::TcpStream;
43#[cfg(feature = "runtime-monoio")]
44use monoio_compat::hyper::{MonoioExecutor, MonoioIo, MonoioTimer};
45use rustls::server::Acceptor;
46use rustls::ServerConfig;
47#[cfg(feature = "runtime-tokio")]
48use tokio::net::TcpStream;
49use tokio_rustls::server::TlsStream;
50use tokio_rustls::LazyConfigAcceptor;
51use tokio_util::sync::CancellationToken;
52#[cfg(feature = "runtime-vibeio")]
53use vibeio::net::PollTcpStream;
54#[cfg(feature = "runtime-vibeio")]
55use vibeio::net::TcpStream;
56
57#[cfg(not(feature = "runtime-vibeio"))]
58static HTTP3_INVALID_HEADERS: [hyper::header::HeaderName; 5] = [
59 hyper::header::HeaderName::from_static("keep-alive"),
60 hyper::header::HeaderName::from_static("proxy-connection"),
61 hyper::header::TRANSFER_ENCODING,
62 hyper::header::TE,
63 hyper::header::UPGRADE,
64];
65
66#[allow(clippy::type_complexity)]
68pub struct ReloadableHandlerData {
69 pub acme_tls_alpn_01_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<ServerConfig>>>,
71 pub acme_http_01_resolvers: AcmeHttp01Resolvers,
73 pub configurations: Arc<ServerConfigurations>,
75 pub tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<ServerConfig>>>,
77 pub http3_enabled: bool,
79 pub enable_proxy_protocol: bool,
81 pub quic_tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<quinn::ServerConfig>>>,
83}
84
85type AcmeHttp01Resolvers = Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>;
86
87#[cfg(feature = "runtime-tokio")]
89#[derive(Clone, Copy, Debug)]
90pub struct TokioLocalExecutor;
91
92#[cfg(feature = "runtime-tokio")]
93impl<F> hyper::rt::Executor<F> for TokioLocalExecutor
94where
95 F: std::future::Future + 'static,
96 F::Output: 'static,
97{
98 #[inline]
99 fn execute(&self, fut: F) {
100 tokio::task::spawn_local(fut);
101 }
102}
103
104pub fn create_http_handler(
106 reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
107 rx: Receiver<ConnectionData>,
108 enable_uring: Option<bool>,
109 io_uring_disabled: Sender<Option<std::io::Error>>,
110 multi_cancel: Arc<MultiCancel>,
111 #[cfg(feature = "runtime-vibeio")] core_affinity: Option<CoreId>,
112) -> Result<(CancellationToken, Sender<()>), Box<dyn Error + Send + Sync>> {
113 let shutdown_tx = CancellationToken::new();
114 let shutdown_rx = shutdown_tx.clone();
115 let (handler_init_tx, listen_error_rx) = async_channel::unbounded();
116 let (graceful_tx, graceful_rx) = async_channel::unbounded();
117 std::thread::Builder::new()
118 .name("Request handler".to_string())
119 .spawn(move || {
120 #[cfg(feature = "runtime-vibeio")]
121 if let Some(affinity) = core_affinity {
122 core_affinity::set_for_current(affinity);
123 }
124 let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
125 Ok(rt) => rt,
126 Err(error) => {
127 handler_init_tx
128 .send_blocking(Some(
129 anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
130 ))
131 .unwrap_or_default();
132 return;
133 }
134 };
135 io_uring_disabled
136 .send_blocking(rt.return_io_uring_error())
137 .unwrap_or_default();
138 rt.run(async move {
139 if let Some(error) = http_handler_fn(
140 reloadable_data,
141 rx,
142 &handler_init_tx,
143 shutdown_rx,
144 graceful_rx,
145 multi_cancel,
146 )
147 .await
148 .err()
149 {
150 handler_init_tx.send(Some(error)).await.unwrap_or_default();
151 }
152 });
153 })?;
154
155 if let Some(error) = listen_error_rx.recv_blocking()? {
156 Err(error)?;
157 }
158
159 Ok((shutdown_tx, graceful_tx))
160}
161
162#[inline]
164async fn http_handler_fn(
165 reloadable_data: Arc<ArcSwap<ReloadableHandlerData>>,
166 rx: Receiver<ConnectionData>,
167 handler_init_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
168 shutdown_rx: CancellationToken,
169 graceful_rx: Receiver<()>,
170 multi_cancel: Arc<MultiCancel>,
171) -> Result<(), Box<dyn Error + Send + Sync>> {
172 handler_init_tx.send(None).await.unwrap_or_default();
173
174 let connections_references = Arc::new(());
175 let graceful_shutdown_token = Arc::new(ArcSwap::from_pointee(CancellationToken::new()));
176 let graceful_shutdown_token_clone = graceful_shutdown_token.clone();
177
178 let mut graceful_rx_recv_future = Box::pin(async move {
179 while graceful_rx.recv().await.is_ok() {
180 graceful_shutdown_token_clone
181 .swap(Arc::new(CancellationToken::new()))
182 .cancel();
183 }
184
185 futures_util::future::pending::<()>().await;
186 });
187
188 loop {
189 let conn_data = crate::runtime::select! {
190 biased;
191
192 _ = &mut graceful_rx_recv_future => {
193 break;
195 }
196 _ = shutdown_rx.cancelled() => {
197 break;
198 }
199 result = rx.recv() => {
200 if let Ok(recv_data) = result {
201 recv_data
202 } else {
203 break;
204 }
205 }
206 };
207 let ReloadableHandlerData {
208 configurations,
209 tls_configs,
210 http3_enabled,
211 acme_tls_alpn_01_configs,
212 acme_http_01_resolvers,
213 enable_proxy_protocol,
214 quic_tls_configs,
215 } = &**reloadable_data.load();
216 let quic_tls_configs = quic_tls_configs.clone();
217 let configurations = configurations.clone();
218 let tls_config = if matches!(
219 conn_data.connection,
220 crate::listener_handler_communication::Connection::Quic(..)
221 ) {
222 None
223 } else {
224 tls_configs
225 .get(&(
226 Some(conn_data.server_address.ip().to_canonical()),
227 conn_data.server_address.port(),
228 ))
229 .cloned()
230 .or_else(|| tls_configs.get(&(None, conn_data.server_address.port())).cloned())
231 };
232 let acme_tls_alpn_01_config = if matches!(
233 conn_data.connection,
234 crate::listener_handler_communication::Connection::Quic(..)
235 ) {
236 None
237 } else {
238 acme_tls_alpn_01_configs
239 .get(&(
240 Some(conn_data.server_address.ip().to_canonical()),
241 conn_data.server_address.port(),
242 ))
243 .cloned()
244 .or_else(|| {
245 acme_tls_alpn_01_configs
246 .get(&(None, conn_data.server_address.port()))
247 .cloned()
248 })
249 };
250 let acme_http_01_resolvers = acme_http_01_resolvers.clone();
251 let connections_references_cloned = connections_references.clone();
252 let shutdown_rx_clone = shutdown_rx.clone();
253 let http3_enabled = *http3_enabled;
254 let enable_proxy_protocol = *enable_proxy_protocol;
255 let graceful_shutdown_token = graceful_shutdown_token.load().clone();
256 crate::runtime::spawn(async move {
257 match conn_data.connection {
258 crate::listener_handler_communication::Connection::Tcp(tcp_stream) => {
259 #[cfg(feature = "runtime-monoio")]
262 let _ = tcp_stream.set_nonblocking(monoio::utils::is_legacy());
263 #[cfg(feature = "runtime-vibeio")]
264 let _ = tcp_stream.set_nonblocking(vibeio::util::supports_completion());
265
266 #[cfg(any(feature = "runtime-vibeio", feature = "runtime-monoio"))]
267 let tcp_stream = match TcpStream::from_std(tcp_stream) {
268 Ok(stream) => stream,
269 Err(err) => {
270 log_connection_accept_error(&configurations, err).await;
271 return;
272 }
273 };
274 let encrypted = tls_config.is_some();
275 http_tcp_handler_fn(
276 tcp_stream,
277 conn_data.client_address,
278 conn_data.server_address,
279 configurations,
280 tls_config,
281 http3_enabled && encrypted,
282 connections_references_cloned,
283 acme_tls_alpn_01_config,
284 acme_http_01_resolvers,
285 enable_proxy_protocol,
286 shutdown_rx_clone,
287 graceful_shutdown_token,
288 )
289 .await;
290 }
291 crate::listener_handler_communication::Connection::Quic(quic_incoming) => {
292 http_quic_handler_fn(
293 quic_incoming,
294 conn_data.client_address,
295 conn_data.server_address,
296 configurations,
297 quic_tls_configs,
298 connections_references_cloned,
299 shutdown_rx_clone,
300 graceful_shutdown_token,
301 )
302 .await;
303 }
304 }
305 });
306 }
307
308 while Arc::weak_count(&connections_references) > 0 {
309 crate::runtime::sleep(Duration::from_millis(100)).await;
310 }
311
312 multi_cancel.cancel().await;
314
315 Ok(())
316}
317
318#[cfg(feature = "runtime-monoio")]
320type HttpTcpStream = SendAsyncIo<TcpStreamPoll>;
321
322#[cfg(feature = "runtime-vibeio")]
323type HttpTcpStream = PollTcpStream;
324
325#[cfg(feature = "runtime-tokio")]
326type HttpTcpStream = TcpStream;
327
328#[allow(clippy::large_enum_variant)]
330enum MaybeTlsStream {
331 Tls(TlsStream<HttpTcpStream>),
333
334 Plain(HttpTcpStream),
336}
337
338#[derive(Clone, Copy, Default)]
339struct Http2Settings {
340 initial_window_size: Option<u32>,
341 max_frame_size: Option<u32>,
342 max_concurrent_streams: Option<u32>,
343 max_header_list_size: Option<u32>,
344 enable_connect_protocol: bool,
345}
346
347#[inline]
348fn get_http2_settings(configurations: &ServerConfigurations) -> Http2Settings {
349 let global_configuration = configurations.find_global_configuration();
350
351 Http2Settings {
352 initial_window_size: global_configuration
353 .as_deref()
354 .and_then(|c| get_value!("h2_initial_window_size", c))
355 .and_then(|v| v.as_i128())
356 .map(|v| v as u32),
357 max_frame_size: global_configuration
358 .as_deref()
359 .and_then(|c| get_value!("h2_max_frame_size", c))
360 .and_then(|v| v.as_i128())
361 .map(|v| v as u32),
362 max_concurrent_streams: global_configuration
363 .as_deref()
364 .and_then(|c| get_value!("h2_max_concurrent_streams", c))
365 .and_then(|v| v.as_i128())
366 .map(|v| v as u32),
367 max_header_list_size: global_configuration
368 .as_deref()
369 .and_then(|c| get_value!("h2_max_header_list_size", c))
370 .and_then(|v| v.as_i128())
371 .map(|v| v as u32),
372 enable_connect_protocol: global_configuration
373 .as_deref()
374 .and_then(|c| get_value!("h2_enable_connect_protocol", c))
375 .and_then(|v| v.as_bool())
376 .unwrap_or(false),
377 }
378}
379
380#[inline]
381fn get_http3_port(http3_enabled: bool, server_address: SocketAddr) -> Option<u16> {
382 if http3_enabled {
383 Some(server_address.port())
384 } else {
385 None
386 }
387}
388
389#[inline]
390fn empty_acme_http_01_resolvers() -> AcmeHttp01Resolvers {
391 Arc::new(tokio::sync::RwLock::new(Vec::new()))
392}
393
394#[inline]
395async fn log_handler_error(configurations: &ServerConfigurations, message: impl Into<String>) {
396 let message = message.into();
397 let global_configuration = configurations.find_global_configuration();
398 let log_channels = global_configuration
399 .as_deref()
400 .map_or(&[][..], |c| c.observability.log_channels.as_slice());
401 for logging_tx in log_channels {
402 logging_tx
403 .send(LogMessage::new(message.clone(), true))
404 .await
405 .unwrap_or_default();
406 }
407}
408
409#[inline]
410async fn log_connection_accept_error(configurations: &ServerConfigurations, err: impl Display) {
411 log_handler_error(configurations, format!("Cannot accept a connection: {err}")).await;
412}
413
414#[inline]
415async fn log_http_connection_error(configurations: &ServerConfigurations, protocol: &str, err: impl Display) {
416 log_handler_error(configurations, format!("Error serving {protocol} connection: {err}")).await;
417}
418
419#[cfg(feature = "runtime-monoio")]
420#[inline]
421async fn convert_tcp_stream_for_runtime(
422 tcp_stream: TcpStream,
423 configurations: &Arc<ServerConfigurations>,
424) -> Option<HttpTcpStream> {
425 match tcp_stream.into_poll_io() {
426 Ok(stream) => Some(SendAsyncIo::new(stream)),
427 Err(err) => {
428 log_connection_accept_error(configurations, err).await;
429 None
430 }
431 }
432}
433
434#[cfg(feature = "runtime-vibeio")]
435#[inline]
436async fn convert_tcp_stream_for_runtime(
437 tcp_stream: TcpStream,
438 configurations: &Arc<ServerConfigurations>,
439) -> Option<HttpTcpStream> {
440 match tcp_stream.into_poll() {
441 Ok(stream) => Some(stream),
442 Err(err) => {
443 log_connection_accept_error(configurations, err).await;
444 None
445 }
446 }
447}
448
449#[cfg(feature = "runtime-tokio")]
450#[inline]
451async fn convert_tcp_stream_for_runtime(
452 tcp_stream: TcpStream,
453 _configurations: &Arc<ServerConfigurations>,
454) -> Option<HttpTcpStream> {
455 Some(tcp_stream)
456}
457
458#[inline]
459async fn maybe_read_proxy_protocol_header(
460 tcp_stream: HttpTcpStream,
461 enable_proxy_protocol: bool,
462 configurations: &Arc<ServerConfigurations>,
463) -> Option<(HttpTcpStream, Option<SocketAddr>, Option<SocketAddr>)> {
464 if !enable_proxy_protocol {
465 return Some((tcp_stream, None, None));
466 }
467
468 match read_proxy_header(tcp_stream).await {
469 Ok((stream, client_address, server_address)) => Some((stream, client_address, server_address)),
470 Err(err) => {
471 log_handler_error(configurations, format!("Error reading PROXY protocol header: {err}")).await;
472 None
473 }
474 }
475}
476
477#[inline]
478async fn maybe_accept_tls_stream(
479 tcp_stream: HttpTcpStream,
480 tls_config: Option<Arc<ServerConfig>>,
481 acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
482 configurations: &Arc<ServerConfigurations>,
483) -> Option<MaybeTlsStream> {
484 let Some(tls_config) = tls_config else {
485 return Some(MaybeTlsStream::Plain(tcp_stream));
486 };
487
488 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), tcp_stream).await {
489 Ok(start_handshake) => start_handshake,
490 Err(err) => {
491 log_handler_error(configurations, format!("Error during TLS handshake: {err}")).await;
492 return None;
493 }
494 };
495
496 if let Some(acme_config) = acme_tls_alpn_01_config {
497 if start_handshake
498 .client_hello()
499 .alpn()
500 .into_iter()
501 .flatten()
502 .eq([ACME_TLS_ALPN_NAME])
503 {
504 if let Err(err) = start_handshake.into_stream(acme_config).await {
505 log_handler_error(configurations, format!("Error during TLS handshake: {err}")).await;
506 }
507 return None;
508 }
509 }
510
511 match start_handshake.into_stream(tls_config).await {
512 Ok(tls_stream) => Some(MaybeTlsStream::Tls(tls_stream)),
513 Err(err) => {
514 log_handler_error(configurations, format!("Error during TLS handshake: {err}")).await;
515 None
516 }
517 }
518}
519
520#[cfg(not(feature = "runtime-vibeio"))]
521#[inline]
522fn sanitize_http3_response_headers(response_headers: &mut hyper::HeaderMap) {
523 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
524 response_headers.entry(hyper::header::DATE).or_insert(http_date);
525 }
526 for header in &HTTP3_INVALID_HEADERS {
527 response_headers.remove(header);
528 }
529 if let Some(connection_header) = response_headers
530 .remove(hyper::header::CONNECTION)
531 .as_ref()
532 .and_then(|v| v.to_str().ok())
533 {
534 for name in connection_header.split(',') {
535 response_headers.remove(name.trim());
536 }
537 }
538}
539
540#[allow(clippy::too_many_arguments)]
542#[inline]
543async fn http_tcp_handler_fn(
544 tcp_stream: TcpStream,
545 client_address: SocketAddr,
546 server_address: SocketAddr,
547 configurations: Arc<ServerConfigurations>,
548 tls_config: Option<Arc<ServerConfig>>,
549 http3_enabled: bool,
550 connection_reference: Arc<()>,
551 acme_tls_alpn_01_config: Option<Arc<ServerConfig>>,
552 acme_http_01_resolvers: AcmeHttp01Resolvers,
553 enable_proxy_protocol: bool,
554 shutdown_rx: CancellationToken,
555 graceful_shutdown_token: Arc<CancellationToken>,
556) {
557 let _connection_reference = Arc::downgrade(&connection_reference);
558 let Some(tcp_stream) = convert_tcp_stream_for_runtime(tcp_stream, &configurations).await else {
559 return;
560 };
561 let Some((tcp_stream, proxy_protocol_client_address, proxy_protocol_server_address)) =
562 maybe_read_proxy_protocol_header(tcp_stream, enable_proxy_protocol, &configurations).await
563 else {
564 return;
565 };
566 let Some(maybe_tls_stream) =
567 maybe_accept_tls_stream(tcp_stream, tls_config, acme_tls_alpn_01_config, &configurations).await
568 else {
569 return;
570 };
571
572 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
573 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
574 let is_http2 = alpn_protocol == Some("h2".as_bytes());
575
576 #[cfg(feature = "runtime-tokio")]
577 let io = TokioIo::new(tls_stream);
578
579 #[cfg(feature = "runtime-vibeio")]
581 if is_http2 {
582 use vibeio_http::{Http2Options, HttpProtocol};
583
584 let mut h2_options = Http2Options::default();
585 let http2_builder = h2_options.h2_builder();
586 let http2_settings = get_http2_settings(&configurations);
587 if let Some(initial_window_size) = http2_settings.initial_window_size {
588 http2_builder.initial_window_size(initial_window_size);
589 }
590 if let Some(max_frame_size) = http2_settings.max_frame_size {
591 http2_builder.max_frame_size(max_frame_size);
592 }
593 if let Some(max_concurrent_streams) = http2_settings.max_concurrent_streams {
594 http2_builder.max_concurrent_streams(max_concurrent_streams);
595 }
596 if let Some(max_header_list_size) = http2_settings.max_header_list_size {
597 http2_builder.max_header_list_size(max_header_list_size);
598 }
599 if http2_settings.enable_connect_protocol {
600 http2_builder.enable_connect_protocol();
601 }
602
603 let configurations_clone = configurations.clone();
604 let graceful_shutdown_token2 = CancellationToken::new();
605 let connection_reference = _connection_reference.clone();
606 let http_future = vibeio_http::Http2::new(tls_stream, h2_options)
607 .graceful_shutdown_token(graceful_shutdown_token2.clone())
608 .handle(move |request: Request<vibeio_http::Incoming>| {
609 let (request_parts, request_body) = request.into_parts();
610 let request = Request::from_parts(
611 request_parts,
612 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
613 );
614 let fut = request_handler(
615 request,
616 client_address,
617 server_address,
618 true,
619 configurations_clone.clone(),
620 get_http3_port(http3_enabled, server_address),
621 acme_http_01_resolvers.clone(),
622 proxy_protocol_client_address,
623 proxy_protocol_server_address,
624 );
625 let connection_reference = connection_reference.clone();
626 async move {
627 let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
628 drop(connection_reference);
629 r
630 }
631 });
632 let mut http_future_pin = std::pin::pin!(http_future);
633 let http_future_result = crate::runtime::select! {
634 result = &mut http_future_pin => {
635 result
636 }
637 _ = shutdown_rx.cancelled() => {
638 graceful_shutdown_token2.cancel();
639 http_future_pin.await
640 }
641 _ = graceful_shutdown_token.cancelled() => {
642 graceful_shutdown_token2.cancel();
643 http_future_pin.await
644 }
645 };
646 if let Err(err) = http_future_result {
647 log_http_connection_error(&configurations, "HTTPS", err).await;
648 }
649 } else {
650 use vibeio_http::{Http1Options, HttpProtocol};
651
652 let configurations_clone = configurations.clone();
653 let graceful_shutdown_token2 = CancellationToken::new();
654 let connection_reference = _connection_reference.clone();
655 let mut http_future = Box::pin(
656 vibeio_http::Http1::new(tls_stream, Http1Options::default())
657 .graceful_shutdown_token(graceful_shutdown_token2.clone())
658 .handle(move |request: Request<vibeio_http::Incoming>| {
659 let (request_parts, request_body) = request.into_parts();
660 let request = Request::from_parts(
661 request_parts,
662 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
663 );
664 let fut = request_handler(
665 request,
666 client_address,
667 server_address,
668 true,
669 configurations_clone.clone(),
670 get_http3_port(http3_enabled, server_address),
671 acme_http_01_resolvers.clone(),
672 proxy_protocol_client_address,
673 proxy_protocol_server_address,
674 );
675 let connection_reference = connection_reference.clone();
676 async move {
677 let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
678 drop(connection_reference);
679 r
680 }
681 }),
682 );
683 let http_future_result = crate::runtime::select! {
684 result = &mut http_future => {
685 result
686 }
687 _ = shutdown_rx.cancelled() => {
688 graceful_shutdown_token2.cancel();
689 http_future.await
690 }
691 _ = graceful_shutdown_token.cancelled() => {
692 graceful_shutdown_token2.cancel();
693 http_future.await
694 }
695 };
696 if let Err(err) = http_future_result {
697 log_http_connection_error(&configurations, "HTTPS", err).await;
698 }
699 }
700
701 #[cfg(not(feature = "runtime-vibeio"))]
702 if is_http2 {
703 #[cfg(feature = "runtime-monoio")]
705 let io = MonoioIo::new(tls_stream);
706
707 #[cfg(feature = "runtime-monoio")]
708 let mut http2_builder = {
709 let mut http2_builder = hyper::server::conn::http2::Builder::new(MonoioExecutor);
710 http2_builder.timer(MonoioTimer);
711 http2_builder
712 };
713 #[cfg(feature = "runtime-tokio")]
714 let mut http2_builder = {
715 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioLocalExecutor);
716 http2_builder.timer(TokioTimer::new());
717 http2_builder
718 };
719
720 let http2_settings = get_http2_settings(&configurations);
721 if let Some(initial_window_size) = http2_settings.initial_window_size {
722 http2_builder.initial_stream_window_size(initial_window_size);
723 }
724 if let Some(max_frame_size) = http2_settings.max_frame_size {
725 http2_builder.max_frame_size(max_frame_size);
726 }
727 if let Some(max_concurrent_streams) = http2_settings.max_concurrent_streams {
728 http2_builder.max_concurrent_streams(max_concurrent_streams);
729 }
730 if let Some(max_header_list_size) = http2_settings.max_header_list_size {
731 http2_builder.max_header_list_size(max_header_list_size);
732 }
733 if http2_settings.enable_connect_protocol {
734 http2_builder.enable_connect_protocol();
735 }
736
737 let configurations_clone = configurations.clone();
738 let mut http_future = http2_builder.serve_connection(
739 io,
740 service_fn(move |request: Request<Incoming>| {
741 let (request_parts, request_body) = request.into_parts();
742 let request = Request::from_parts(
743 request_parts,
744 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
745 );
746 request_handler(
747 request,
748 client_address,
749 server_address,
750 true,
751 configurations_clone.clone(),
752 get_http3_port(http3_enabled, server_address),
753 acme_http_01_resolvers.clone(),
754 proxy_protocol_client_address,
755 proxy_protocol_server_address,
756 )
757 }),
758 );
759 let http_future_result = crate::runtime::select! {
760 result = &mut http_future => {
761 result
762 }
763 _ = shutdown_rx.cancelled() => {
764 std::pin::Pin::new(&mut http_future).graceful_shutdown();
765 http_future.await
766 }
767 _ = graceful_shutdown_token.cancelled() => {
768 std::pin::Pin::new(&mut http_future).graceful_shutdown();
769 http_future.await
770 }
771 };
772 if let Err(err) = http_future_result {
773 let error_to_log = if err.is_user() {
774 err.source().unwrap_or(&err)
775 } else {
776 &err
777 };
778 log_http_connection_error(&configurations, "HTTPS", error_to_log).await;
779 }
780 } else {
781 #[cfg(feature = "runtime-monoio")]
782 let io = MonoioIo::new(tls_stream);
783
784 #[cfg(feature = "runtime-monoio")]
785 let http1_builder = {
786 let mut http1_builder = hyper::server::conn::http1::Builder::new();
787
788 http1_builder.timer(MonoioTimer);
790
791 http1_builder
792 };
793 #[cfg(feature = "runtime-tokio")]
794 let http1_builder = {
795 let mut http1_builder = hyper::server::conn::http1::Builder::new();
796
797 http1_builder.timer(TokioTimer::new());
799
800 http1_builder
801 };
802
803 let configurations_clone = configurations.clone();
804 let mut http_future = http1_builder
805 .serve_connection(
806 io,
807 service_fn(move |request: Request<Incoming>| {
808 let (request_parts, request_body) = request.into_parts();
809 let request = Request::from_parts(
810 request_parts,
811 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
812 );
813 request_handler(
814 request,
815 client_address,
816 server_address,
817 true,
818 configurations_clone.clone(),
819 get_http3_port(http3_enabled, server_address),
820 acme_http_01_resolvers.clone(),
821 proxy_protocol_client_address,
822 proxy_protocol_server_address,
823 )
824 }),
825 )
826 .with_upgrades();
827 let http_future_result = crate::runtime::select! {
828 result = &mut http_future => {
829 result
830 }
831 _ = shutdown_rx.cancelled() => {
832 std::pin::Pin::new(&mut http_future).graceful_shutdown();
833 http_future.await
834 }
835 _ = graceful_shutdown_token.cancelled() => {
836 std::pin::Pin::new(&mut http_future).graceful_shutdown();
837 http_future.await
838 }
839 };
840 if let Err(err) = http_future_result {
841 let error_to_log = if err.is_user() {
842 err.source().unwrap_or(&err)
843 } else {
844 &err
845 };
846 log_http_connection_error(&configurations, "HTTPS", error_to_log).await;
847 }
848 }
849 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
850 #[cfg(feature = "runtime-vibeio")]
851 {
852 use vibeio_http::{Http1Options, HttpProtocol};
853
854 let configurations_clone = configurations.clone();
855 let connection_reference = _connection_reference.clone();
856 let graceful_shutdown_token2 = CancellationToken::new();
857 let http1 = vibeio_http::Http1::new(stream, Http1Options::default())
858 .graceful_shutdown_token(graceful_shutdown_token2.clone());
859
860 #[cfg(target_os = "linux")]
861 let mut http_future = Box::pin(http1.zerocopy().handle(move |request: Request<vibeio_http::Incoming>| {
862 let (request_parts, request_body) = request.into_parts();
863 let request = Request::from_parts(
864 request_parts,
865 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
866 );
867 let fut = request_handler(
868 request,
869 client_address,
870 server_address,
871 false,
872 configurations_clone.clone(),
873 get_http3_port(http3_enabled, server_address),
874 acme_http_01_resolvers.clone(),
875 proxy_protocol_client_address,
876 proxy_protocol_server_address,
877 );
878 let connection_reference = connection_reference.clone();
879 async move {
880 let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
881 drop(connection_reference);
882 r
883 }
884 }));
885 #[cfg(not(target_os = "linux"))]
886 let mut http_future = Box::pin(http1.handle(move |request: Request<vibeio_http::Incoming>| {
887 let (request_parts, request_body) = request.into_parts();
888 let request = Request::from_parts(
889 request_parts,
890 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
891 );
892 let fut = request_handler(
893 request,
894 client_address,
895 server_address,
896 true,
897 configurations_clone.clone(),
898 get_http3_port(http3_enabled, server_address),
899 acme_http_01_resolvers.clone(),
900 proxy_protocol_client_address,
901 proxy_protocol_server_address,
902 );
903 let connection_reference = connection_reference.clone();
904 async move {
905 let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
906 drop(connection_reference);
907 r
908 }
909 }));
910 let http_future_result = crate::runtime::select! {
911 result = &mut http_future => {
912 result
913 }
914 _ = shutdown_rx.cancelled() => {
915 graceful_shutdown_token2.cancel();
916 http_future.await
917 }
918 _ = graceful_shutdown_token.cancelled() => {
919 graceful_shutdown_token2.cancel();
920 http_future.await
921 }
922 };
923 if let Err(err) = http_future_result {
924 log_http_connection_error(&configurations, "HTTP", err).await;
925 }
926 }
927 #[cfg(not(feature = "runtime-vibeio"))]
928 {
929 #[cfg(feature = "runtime-monoio")]
930 let io = MonoioIo::new(stream);
931 #[cfg(feature = "runtime-tokio")]
932 let io = TokioIo::new(stream);
933
934 #[cfg(feature = "runtime-monoio")]
935 let http1_builder = {
936 let mut http1_builder = hyper::server::conn::http1::Builder::new();
937
938 http1_builder.timer(MonoioTimer);
940
941 http1_builder
942 };
943 #[cfg(feature = "runtime-tokio")]
944 let http1_builder = {
945 let mut http1_builder = hyper::server::conn::http1::Builder::new();
946
947 http1_builder.timer(TokioTimer::new());
949
950 http1_builder
951 };
952
953 let configurations_clone = configurations.clone();
954 let mut http_future = http1_builder
955 .serve_connection(
956 io,
957 service_fn(move |request: Request<Incoming>| {
958 let (request_parts, request_body) = request.into_parts();
959 let request = Request::from_parts(
960 request_parts,
961 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
962 );
963 request_handler(
964 request,
965 client_address,
966 server_address,
967 false,
968 configurations_clone.clone(),
969 get_http3_port(http3_enabled, server_address),
970 acme_http_01_resolvers.clone(),
971 proxy_protocol_client_address,
972 proxy_protocol_server_address,
973 )
974 }),
975 )
976 .with_upgrades();
977 let http_future_result = crate::runtime::select! {
978 result = &mut http_future => {
979 result
980 }
981 _ = shutdown_rx.cancelled() => {
982 std::pin::Pin::new(&mut http_future).graceful_shutdown();
983 http_future.await
984 }
985 _ = graceful_shutdown_token.cancelled() => {
986 std::pin::Pin::new(&mut http_future).graceful_shutdown();
987 http_future.await
988 }
989 };
990 if let Err(err) = http_future_result {
991 let error_to_log = if err.is_user() {
992 err.source().unwrap_or(&err)
993 } else {
994 &err
995 };
996 log_http_connection_error(&configurations, "HTTP", error_to_log).await;
997 }
998 }
999 }
1000}
1001
1002#[inline]
1004#[cfg(feature = "runtime-vibeio")]
1005#[allow(clippy::too_many_arguments)]
1006#[allow(clippy::type_complexity)]
1007async fn http_quic_handler_fn(
1008 connection_attempt: quinn::Incoming,
1009 client_address: SocketAddr,
1010 server_address: SocketAddr,
1011 configurations: Arc<ServerConfigurations>,
1012 quic_tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<quinn::ServerConfig>>>,
1013 connection_reference: Arc<()>,
1014 shutdown_rx: CancellationToken,
1015 graceful_shutdown_token: Arc<CancellationToken>,
1016) {
1017 use vibeio_http::{Http3Options, HttpProtocol};
1018
1019 let connection = if let Some(tls_config) = quic_tls_configs
1020 .get(&(Some(server_address.ip().to_canonical()), server_address.port()))
1021 .cloned()
1022 .or_else(|| quic_tls_configs.get(&(None, server_address.port())).cloned())
1023 {
1024 match connection_attempt.accept_with(tls_config) {
1025 Ok(connecting) => match connecting.await {
1026 Ok(connection) => connection,
1027 Err(err) => {
1028 log_connection_accept_error(&configurations, err).await;
1029 return;
1030 }
1031 },
1032 Err(err) => {
1033 log_connection_accept_error(&configurations, err).await;
1034 return;
1035 }
1036 }
1037 } else {
1038 match connection_attempt.await {
1039 Ok(connection) => connection,
1040 Err(err) => {
1041 log_connection_accept_error(&configurations, err).await;
1042 return;
1043 }
1044 }
1045 };
1046
1047 let _connection_reference = Arc::downgrade(&connection_reference);
1048 let configurations_clone = configurations.clone();
1049 let graceful_shutdown_token2 = CancellationToken::new();
1050 let mut http_future = Box::pin(
1051 vibeio_http::Http3::new(h3_quinn::Connection::new(connection), Http3Options::default())
1052 .graceful_shutdown_token(graceful_shutdown_token2.clone())
1053 .handle(move |request: Request<vibeio_http::Incoming>| {
1054 let (request_parts, request_body) = request.into_parts();
1055 let request = Request::from_parts(
1056 request_parts,
1057 request_body.map_err(|e| std::io::Error::other(e.to_string())).boxed(),
1058 );
1059 let fut = request_handler(
1060 request,
1061 client_address,
1062 server_address,
1063 true,
1064 configurations_clone.clone(),
1065 None,
1066 empty_acme_http_01_resolvers(),
1067 None,
1068 None,
1069 );
1070 let connection_reference = connection_reference.clone();
1071 async move {
1072 let r = fut.await.map_err(|e| std::io::Error::other(e.to_string()));
1073 drop(connection_reference);
1074 r
1075 }
1076 }),
1077 );
1078 let http_future_result = crate::runtime::select! {
1079 result = &mut http_future => {
1080 result
1081 }
1082 _ = shutdown_rx.cancelled() => {
1083 graceful_shutdown_token2.cancel();
1084 http_future.await
1085 }
1086 _ = graceful_shutdown_token.cancelled() => {
1087 graceful_shutdown_token2.cancel();
1088 http_future.await
1089 }
1090 };
1091 if let Err(err) = http_future_result {
1092 log_http_connection_error(&configurations, "HTTP/3", err).await;
1093 }
1094}
1095
1096#[cfg(not(feature = "runtime-vibeio"))]
1098#[inline]
1099#[allow(clippy::too_many_arguments)]
1100#[allow(clippy::type_complexity)]
1101async fn http_quic_handler_fn(
1102 connection_attempt: quinn::Incoming,
1103 client_address: SocketAddr,
1104 server_address: SocketAddr,
1105 configurations: Arc<ServerConfigurations>,
1106 quic_tls_configs: Arc<HashMap<(Option<IpAddr>, u16), Arc<quinn::ServerConfig>>>,
1107 connection_reference: Arc<()>,
1108 shutdown_rx: CancellationToken,
1109 graceful_shutdown_token: Arc<CancellationToken>,
1110) {
1111 let connection = if let Some(tls_config) = quic_tls_configs
1112 .get(&(Some(server_address.ip().to_canonical()), server_address.port()))
1113 .cloned()
1114 .or_else(|| quic_tls_configs.get(&(None, server_address.port())).cloned())
1115 {
1116 match connection_attempt.accept_with(tls_config) {
1117 Ok(connecting) => match connecting.await {
1118 Ok(connection) => connection,
1119 Err(err) => {
1120 log_connection_accept_error(&configurations, err).await;
1121 return;
1122 }
1123 },
1124 Err(err) => {
1125 log_connection_accept_error(&configurations, err).await;
1126 return;
1127 }
1128 }
1129 } else {
1130 match connection_attempt.await {
1131 Ok(connection) => connection,
1132 Err(err) => {
1133 log_connection_accept_error(&configurations, err).await;
1134 return;
1135 }
1136 }
1137 };
1138
1139 let connection_reference = Arc::downgrade(&connection_reference);
1140 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
1141 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
1142 Ok(h3_conn) => h3_conn,
1143 Err(err) => {
1144 log_http_connection_error(&configurations, "HTTP/3", err).await;
1145 return;
1146 }
1147 };
1148
1149 loop {
1150 match crate::runtime::select! {
1151 biased;
1152
1153 _ = shutdown_rx.cancelled() => {
1154 h3_conn.shutdown(0).await.unwrap_or_default();
1155 return;
1156 }
1157 _ = graceful_shutdown_token.cancelled() => {
1158 h3_conn.shutdown(0).await.unwrap_or_default();
1159 return;
1160 }
1161 result = h3_conn.accept() => {
1162 result
1163 }
1164 } {
1165 Ok(Some(resolver)) => {
1166 let configurations = configurations.clone();
1167 let connection_reference = connection_reference.clone();
1168 crate::runtime::spawn(async move {
1169 let _connection_reference = connection_reference;
1170 let (request, stream) = match resolver.resolve_request().await {
1171 Ok(resolved) => resolved,
1172 Err(err) => {
1173 if !err.is_h3_no_error() {
1174 log_http_connection_error(&configurations, "HTTP/3", err).await;
1175 }
1176 return;
1177 }
1178 };
1179
1180 let (mut send, receive) = stream.split();
1181 let request_body_stream =
1182 futures_util::stream::unfold((receive, false), |(mut receive, mut is_body_finished)| async move {
1183 loop {
1184 if !is_body_finished {
1185 match receive.recv_data().await {
1186 Ok(Some(mut data)) => {
1187 return Some((Ok(Frame::data(data.copy_to_bytes(data.remaining()))), (receive, false)));
1188 }
1189 Ok(None) => is_body_finished = true,
1190 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, false))),
1191 }
1192 } else {
1193 match receive.recv_trailers().await {
1194 Ok(Some(trailers)) => return Some((Ok(Frame::trailers(trailers)), (receive, true))),
1195 Ok(None) => return None,
1196 Err(err) => return Some((Err(std::io::Error::other(err.to_string())), (receive, true))),
1197 }
1198 }
1199 }
1200 });
1201 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
1202 let (request_parts, _) = request.into_parts();
1203 let request = Request::from_parts(request_parts, request_body);
1204 let mut response = match request_handler(
1205 request,
1206 client_address,
1207 server_address,
1208 true,
1209 configurations.clone(),
1210 None,
1211 empty_acme_http_01_resolvers(),
1212 None,
1213 None,
1214 )
1215 .await
1216 {
1217 Ok(response) => response,
1218 Err(err) => {
1219 log_http_connection_error(&configurations, "HTTP/3", err).await;
1220 return;
1221 }
1222 };
1223
1224 sanitize_http3_response_headers(response.headers_mut());
1225
1226 let (response_parts, mut response_body) = response.into_parts();
1227 if let Err(err) = send.send_response(Response::from_parts(response_parts, ())).await {
1228 if !err.is_h3_no_error() {
1229 log_http_connection_error(&configurations, "HTTP/3", err).await;
1230 }
1231 return;
1232 }
1233
1234 let mut had_trailers = false;
1235 while let Some(chunk) = response_body.frame().await {
1236 match chunk {
1237 Ok(frame) if frame.is_data() => match frame.into_data() {
1238 Ok(data) => {
1239 if let Err(err) = send.send_data(data).await {
1240 if !err.is_h3_no_error() {
1241 log_http_connection_error(&configurations, "HTTP/3", err).await;
1242 }
1243 return;
1244 }
1245 }
1246 Err(_) => {
1247 log_handler_error(
1248 &configurations,
1249 "Error serving HTTP/3 connection: the frame isn't really a data frame",
1250 )
1251 .await;
1252 return;
1253 }
1254 },
1255 Ok(frame) if frame.is_trailers() => match frame.into_trailers() {
1256 Ok(trailers) => {
1257 had_trailers = true;
1258 if let Err(err) = send.send_trailers(trailers).await {
1259 if !err.is_h3_no_error() {
1260 log_http_connection_error(&configurations, "HTTP/3", err).await;
1261 }
1262 return;
1263 }
1264 }
1265 Err(_) => {
1266 log_handler_error(
1267 &configurations,
1268 "Error serving HTTP/3 connection: the frame isn't really a trailers frame",
1269 )
1270 .await;
1271 return;
1272 }
1273 },
1274 Ok(_) => {}
1275 Err(err) => {
1276 log_http_connection_error(&configurations, "HTTP/3", err).await;
1277 return;
1278 }
1279 }
1280 }
1281
1282 if !had_trailers {
1283 if let Err(err) = send.finish().await {
1284 if !err.is_h3_no_error() {
1285 log_http_connection_error(&configurations, "HTTP/3", err).await;
1286 }
1287 }
1288 }
1289 });
1290 }
1291 Ok(None) => break,
1292 Err(err) => {
1293 if !err.is_h3_no_error() {
1294 log_http_connection_error(&configurations, "HTTP/3", err).await;
1295 }
1296 return;
1297 }
1298 }
1299 }
1300}