1use std::net::SocketAddr;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_channel::Sender;
7use chrono::{DateTime, Local};
8use ferron_common::logging::{ErrorLogger, LogMessage};
9use ferron_common::observability::{MetricsMultiSender, TraceSignal};
10use futures_util::stream::TryStreamExt;
11use http_body_util::combinators::BoxBody;
12use http_body_util::{BodyExt, Empty, Full, StreamBody};
13use hyper::body::{Body, Bytes, Frame};
14use hyper::header::{HeaderName, HeaderValue};
15use hyper::{header, HeaderMap, Method, Request, Response, StatusCode};
16#[cfg(feature = "runtime-tokio")]
17use tokio::io::BufReader;
18#[cfg(feature = "runtime-tokio")]
19use tokio_util::io::ReaderStream;
20
21use crate::config::{ServerConfiguration, ServerConfigurations};
22use crate::get_value;
23use crate::runtime::timeout;
24#[cfg(feature = "runtime-monoio")]
25use crate::util::MonoioFileStreamNoSpawn;
26use crate::util::{
27 generate_default_error_page, replace_header_placeholders, replace_log_placeholders, sanitize_url, SERVER_SOFTWARE,
28};
29
30use ferron_common::modules::{ModuleHandlers, RequestData, SocketData};
31use ferron_common::{get_entries, get_entry};
32
33async fn generate_error_response(
35 status_code: StatusCode,
36 config: &ServerConfiguration,
37 headers: &Option<HeaderMap>,
38) -> Response<BoxBody<Bytes, std::io::Error>> {
39 let bare_body = generate_default_error_page(
40 status_code,
41 get_value!("server_administrator_email", config).and_then(|v| v.as_str()),
42 );
43 let mut content_length: Option<u64> = bare_body.len().try_into().ok();
44 let mut response_body = Full::new(Bytes::from(bare_body)).map_err(|e| match e {}).boxed();
45
46 if let Some(error_pages) = get_entries!("error_page", config) {
47 for error_page in &error_pages.inner {
48 if let Some(page_status_code) = error_page.values.first().and_then(|v| v.as_i128()) {
49 if page_status_code
50 .try_into()
51 .ok()
52 .and_then(|s| StatusCode::from_u16(s).ok())
53 .is_none_or(|s| s != status_code)
54 {
55 continue;
56 }
57 if let Some(page_path) = error_page.values.get(1).and_then(|v| v.as_str()) {
58 #[cfg(feature = "runtime-monoio")]
59 let file = monoio::fs::File::open(page_path).await;
60 #[cfg(feature = "runtime-tokio")]
61 let file = tokio::fs::File::open(page_path).await;
62
63 let Ok(file) = file else {
64 continue;
65 };
66
67 #[cfg(any(feature = "runtime-tokio", all(feature = "runtime-monoio", unix)))]
69 let metadata = file.metadata().await;
70 #[cfg(all(feature = "runtime-monoio", windows))]
71 let metadata = {
72 let page_path = page_path.to_owned();
73 monoio::spawn_blocking(move || std::fs::metadata(page_path))
74 .await
75 .unwrap_or(Err(std::io::Error::other(
76 "Can't spawn a blocking task to obtain the file metadata",
77 )))
78 };
79
80 content_length = metadata.ok().map(|m| m.len());
81
82 #[cfg(feature = "runtime-monoio")]
83 let file_stream = MonoioFileStreamNoSpawn::new(file, None, content_length);
84 #[cfg(feature = "runtime-tokio")]
85 let file_stream = ReaderStream::new(BufReader::with_capacity(12800, file));
86
87 let stream_body = StreamBody::new(file_stream.map_ok(Frame::data));
88 let boxed_body = stream_body.boxed();
89
90 response_body = boxed_body;
91
92 break;
93 }
94 }
95 }
96 }
97
98 let mut response_builder = Response::builder().status(status_code);
99
100 if let Some(headers) = headers {
101 let headers_iter = headers.iter();
102 for (name, value) in headers_iter {
103 if name != header::CONTENT_TYPE && name != header::CONTENT_LENGTH {
104 response_builder = response_builder.header(name, value);
105 }
106 }
107 }
108
109 if let Some(content_length) = content_length {
110 response_builder = response_builder.header(header::CONTENT_LENGTH, content_length);
111 }
112 response_builder = response_builder.header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"));
113
114 response_builder.body(response_body).unwrap_or_default()
115}
116
117#[allow(clippy::too_many_arguments)]
119async fn log_access(
120 loggers: &[Sender<LogMessage>],
121 request_parts: &hyper::http::request::Parts,
122 socket_data: &SocketData,
123 auth_user: Option<&str>,
124 status_code: u16,
125 content_length: Option<u64>,
126 date_format: Option<&str>,
127 log_format: Option<&str>,
128) {
129 let now: DateTime<Local> = Local::now();
130 let formatted_time = now.format(date_format.unwrap_or("%d/%b/%Y:%H:%M:%S %z")).to_string();
131 let log_message_string = replace_log_placeholders(
132 log_format.unwrap_or(
133 "{client_ip} - {auth_user} [{timestamp}] \"{method} {path_and_query} {version}\" \
134 {status_code} {content_length} \"{header:Referer}\" \"{header:User-Agent}\"",
135 ),
136 request_parts,
137 socket_data,
138 auth_user,
139 &formatted_time,
140 status_code,
141 content_length,
142 );
143 for logger in loggers {
144 logger
145 .send(LogMessage::new(log_message_string.clone(), false))
146 .await
147 .unwrap_or_default();
148 }
149}
150
151#[inline]
153fn add_custom_headers(
154 response_parts: &mut hyper::http::response::Parts,
155 headers_to_add: &HeaderMap,
156 headers_to_replace: &HeaderMap,
157 headers_to_remove: &[HeaderName],
158) {
159 for (header_name, header_value) in headers_to_add {
160 if !response_parts.headers.contains_key(header_name) {
161 response_parts.headers.insert(header_name, header_value.to_owned());
162 }
163 }
164
165 for (header_name, header_value) in headers_to_replace {
166 response_parts.headers.insert(header_name, header_value.to_owned());
167 }
168
169 for header_to_remove in headers_to_remove.iter().rev() {
170 if response_parts.headers.contains_key(header_to_remove) {
171 while response_parts.headers.remove(header_to_remove).is_some() {}
172 }
173 }
174}
175
176#[inline]
178fn add_http3_alt_svc_header(response_parts: &mut hyper::http::response::Parts, http3_alt_port: Option<u16>) {
179 if let Some(http3_alt_port) = http3_alt_port {
180 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
181 Some(value) => {
182 let header_value_old = String::from_utf8_lossy(value.as_bytes());
183 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
184
185 if header_value_old != header_value_new {
186 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
187 } else {
188 HeaderValue::from_bytes(header_value_old.as_bytes())
189 }
190 }
191 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
192 } {
193 response_parts.headers.insert(header::ALT_SVC, header_value);
194 }
195 }
196}
197
198#[inline]
200fn add_server_header(response_parts: &mut hyper::http::response::Parts) {
201 response_parts
202 .headers
203 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
204}
205
206fn extract_content_length(response: &Response<BoxBody<Bytes, std::io::Error>>) -> Option<u64> {
208 response
209 .headers()
210 .get(header::CONTENT_LENGTH)
211 .and_then(|header_value| {
212 header_value.to_str().ok().and_then(|header_value| {
213 header_value
214 .parse::<u64>()
215 .ok()
216 .or_else(|| response.body().size_hint().exact())
217 })
218 })
219 .or_else(|| response.body().size_hint().exact())
220}
221
222fn basic_error_response(status_code: StatusCode) -> Response<BoxBody<Bytes, std::io::Error>> {
224 Response::builder()
225 .status(status_code)
226 .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
227 .body(
228 Full::new(Bytes::from(generate_default_error_page(status_code, None)))
229 .map_err(|e| match e {})
230 .boxed(),
231 )
232 .unwrap_or_default()
233}
234
235fn build_custom_headers(
237 configuration: &ServerConfiguration,
238 request_parts: &hyper::http::request::Parts,
239) -> (HeaderMap, HeaderMap, Vec<HeaderName>) {
240 let mut headers_to_add = HeaderMap::new();
241 let mut headers_to_replace = HeaderMap::new();
242 let mut headers_to_remove = Vec::new();
243
244 if let Some(custom_headers) = get_entries!("header", configuration) {
245 for custom_header in custom_headers.inner.iter().rev() {
246 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
247 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
248 if !headers_to_add.contains_key(header_name) {
249 if let Ok(header_name) = HeaderName::from_str(header_name) {
250 if let Ok(header_value) =
251 HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
252 {
253 headers_to_add.insert(header_name, header_value);
254 }
255 }
256 }
257 }
258 }
259 }
260 }
261
262 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
263 for custom_header in custom_headers.inner.iter().rev() {
264 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
265 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
266 if let Ok(header_name) = HeaderName::from_str(header_name) {
267 if let Ok(header_value) =
268 HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
269 {
270 headers_to_replace.insert(header_name, header_value);
271 }
272 }
273 }
274 }
275 }
276 }
277
278 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
279 for custom_header in custom_headers_to_remove.inner.iter().rev() {
280 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
281 if let Ok(header_name) = HeaderName::from_str(header_name) {
282 headers_to_remove.push(header_name);
283 }
284 }
285 }
286 }
287
288 (headers_to_add, headers_to_replace, headers_to_remove)
289}
290
291#[allow(clippy::too_many_arguments)]
293async fn finalize_response_and_log(
294 response: Response<BoxBody<Bytes, std::io::Error>>,
295 http3_alt_port: Option<u16>,
296 headers_to_add: HeaderMap,
297 headers_to_replace: HeaderMap,
298 headers_to_remove: Vec<HeaderName>,
299 loggers: &[Sender<LogMessage>],
300 request_parts: &Option<hyper::http::request::Parts>,
301 socket_data: &SocketData,
302 latest_auth_data: Option<&str>,
303 date_format: Option<&str>,
304 log_format: Option<&str>,
305) -> Response<BoxBody<Bytes, std::io::Error>> {
306 let (mut response_parts, response_body) = response.into_parts();
307
308 add_custom_headers(
309 &mut response_parts,
310 &headers_to_add,
311 &headers_to_replace,
312 &headers_to_remove,
313 );
314 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
315 add_server_header(&mut response_parts);
316
317 let response = Response::from_parts(response_parts, response_body);
318
319 if let Some(request_parts) = request_parts {
320 if !loggers.is_empty() {
321 log_access(
322 loggers,
323 request_parts,
324 socket_data,
325 latest_auth_data,
326 response.status().as_u16(),
327 extract_content_length(&response),
328 date_format,
329 log_format,
330 )
331 .await;
332 }
333 }
334
335 response
336}
337
338#[allow(clippy::too_many_arguments)]
340async fn finalize_basic_error_response(
341 response: Response<BoxBody<Bytes, std::io::Error>>,
342 request_parts: hyper::http::request::Parts,
343 http3_alt_port: Option<u16>,
344 loggers: &[Sender<LogMessage>],
345 socket_data: &SocketData,
346 date_format: Option<&str>,
347 log_format: Option<&str>,
348) -> Response<BoxBody<Bytes, std::io::Error>> {
349 let request_parts = Some(request_parts);
350 finalize_response_and_log(
351 response,
352 http3_alt_port,
353 HeaderMap::new(),
354 HeaderMap::new(),
355 Vec::new(),
356 loggers,
357 &request_parts,
358 socket_data,
359 None,
360 date_format,
361 log_format,
362 )
363 .await
364}
365
366#[allow(clippy::too_many_arguments)]
368#[inline]
369async fn execute_response_modifying_handlers(
370 mut response: Response<BoxBody<Bytes, std::io::Error>>,
371 mut executed_handlers: Vec<Box<dyn ModuleHandlers>>,
372 configuration: &ServerConfiguration,
373 http3_alt_port: Option<u16>,
374 headers_to_add: HeaderMap,
375 headers_to_replace: HeaderMap,
376 headers_to_remove: Vec<HeaderName>,
377 loggers: &[Sender<LogMessage>],
378 request_parts: &Option<hyper::http::request::Parts>,
379 socket_data: &SocketData,
380 latest_auth_data: Option<&str>,
381 date_format: Option<&str>,
382 log_format: Option<&str>,
383 metrics_sender: MetricsMultiSender,
384 metrics_enabled: bool,
385 traces_senders: Vec<Sender<TraceSignal>>,
386 traces_enabled: bool,
387 timeout_instant: std::time::Instant,
388 timeout_duration: Option<std::time::Duration>,
389) -> Result<Result<Response<BoxBody<Bytes, std::io::Error>>, Response<BoxBody<Bytes, std::io::Error>>>, anyhow::Error> {
390 while let Some(mut executed_handler) = executed_handlers.pop() {
391 if traces_enabled {
392 for trace_sender in &traces_senders {
393 trace_sender
394 .send(TraceSignal::StartSpan(format!(
395 "{}::response_modifying_handler",
396 executed_handler.get_name()
397 )))
398 .await
399 .unwrap_or_default();
400 }
401 }
402 let (response_status, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
403 let elapsed = timeout_instant.elapsed();
404 if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
405 match timeout(
406 timeout_cur_duration,
407 executed_handler.response_modifying_handler(response),
408 )
409 .await
410 {
411 Ok(result) => (result, false),
412 Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
413 }
414 } else {
415 (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
416 }
417 } else {
418 (executed_handler.response_modifying_handler(response).await, false)
419 };
420 if traces_enabled {
421 for trace_sender in &traces_senders {
422 trace_sender
423 .send(TraceSignal::EndSpan(
424 format!("{}::response_modifying_handler", executed_handler.get_name()),
425 response_status.as_ref().err().map(|e| e.to_string()),
426 ))
427 .await
428 .unwrap_or_default();
429 }
430 }
431 if is_timeout {
432 if metrics_enabled {
433 while let Some(mut executed_handler) = executed_handlers.pop() {
434 executed_handler.metric_data_after_handler(&metrics_sender).await;
435 }
436 }
437 Err(anyhow::anyhow!("The client or server has timed out"))?;
438 }
439 response = match response_status {
440 Ok(response) => response,
441 Err(err) => {
442 for logger in loggers {
443 logger
444 .send(LogMessage::new(
445 format!("Unexpected error while serving a request: {err}"),
446 true,
447 ))
448 .await
449 .unwrap_or_default();
450 }
451
452 let error_response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, configuration, &None).await;
453
454 let final_response = finalize_response_and_log(
455 error_response,
456 http3_alt_port,
457 headers_to_add,
458 headers_to_replace,
459 headers_to_remove,
460 loggers,
461 request_parts,
462 socket_data,
463 latest_auth_data,
464 date_format,
465 log_format,
466 )
467 .await;
468
469 if metrics_enabled {
470 while let Some(mut executed_handler) = executed_handlers.pop() {
471 executed_handler.metric_data_after_handler(&metrics_sender).await;
472 }
473 }
474
475 return Ok(Err(final_response));
476 }
477 };
478 if metrics_enabled {
479 executed_handler.metric_data_after_handler(&metrics_sender).await;
480 }
481 }
482 Ok(Ok(response))
483}
484
485#[allow(clippy::too_many_arguments)]
487async fn finalize_with_modifying_handlers(
488 response: Response<BoxBody<Bytes, std::io::Error>>,
489 executed_handlers: Vec<Box<dyn ModuleHandlers>>,
490 configuration: &ServerConfiguration,
491 http3_alt_port: Option<u16>,
492 headers_to_add: HeaderMap,
493 headers_to_replace: HeaderMap,
494 headers_to_remove: Vec<HeaderName>,
495 log_request_parts: &Option<hyper::http::request::Parts>,
496 socket_data: &SocketData,
497 latest_auth_data: Option<&str>,
498 log_date_format: Option<&str>,
499 log_format: Option<&str>,
500 metrics_sender: MetricsMultiSender,
501 metrics_enabled: bool,
502 traces_senders: Vec<Sender<TraceSignal>>,
503 traces_enabled: bool,
504 timeout_instant: std::time::Instant,
505 timeout_duration: Option<std::time::Duration>,
506) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
507 let (mut response_parts, response_body) = response.into_parts();
508 add_custom_headers(
509 &mut response_parts,
510 &headers_to_add,
511 &headers_to_replace,
512 &headers_to_remove,
513 );
514 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
515 add_server_header(&mut response_parts);
516
517 let response = Response::from_parts(response_parts, response_body);
518
519 match execute_response_modifying_handlers(
520 response,
521 executed_handlers,
522 configuration,
523 http3_alt_port,
524 headers_to_add,
525 headers_to_replace,
526 headers_to_remove,
527 &configuration.observability.log_channels,
528 log_request_parts,
529 socket_data,
530 latest_auth_data,
531 log_date_format,
532 log_format,
533 metrics_sender,
534 metrics_enabled,
535 traces_senders,
536 traces_enabled,
537 timeout_instant,
538 timeout_duration,
539 )
540 .await?
541 {
542 Ok(response) => {
543 if let Some(request_parts) = log_request_parts.as_ref() {
544 log_access(
545 &configuration.observability.log_channels,
546 request_parts,
547 socket_data,
548 latest_auth_data,
549 response.status().as_u16(),
550 extract_content_length(&response),
551 log_date_format,
552 log_format,
553 )
554 .await;
555 }
556 Ok(response)
557 }
558 Err(error_response) => Ok(error_response),
559 }
560}
561
562#[allow(clippy::type_complexity)]
563#[allow(clippy::result_large_err)]
564fn sanitize_url_request(
565 configuration: &ServerConfiguration,
566 mut request: Request<BoxBody<Bytes, std::io::Error>>,
567) -> Result<
568 (Request<BoxBody<Bytes, std::io::Error>>, bool),
569 (
570 Request<BoxBody<Bytes, std::io::Error>>,
571 Box<dyn std::error::Error + Send + Sync>,
572 ),
573> {
574 let url_pathname = request.uri().path();
575 let sanitized_url_pathname = match sanitize_url(
576 url_pathname,
577 get_value!("allow_double_slashes", configuration)
578 .and_then(|v| v.as_bool())
579 .unwrap_or(false),
580 ) {
581 Ok(sanitized_url_pathname) => sanitized_url_pathname,
582 Err(err) => {
583 return Err((request, err.into()));
584 }
585 };
586
587 if sanitized_url_pathname != url_pathname {
588 let (mut parts, body) = request.into_parts();
589 let orig_uri = parts.uri.clone();
590 let mut url_parts = parts.uri.into_parts();
591 url_parts.path_and_query = Some(
592 match format!(
593 "{}{}",
594 sanitized_url_pathname,
595 url_parts
596 .path_and_query
597 .as_ref()
598 .and_then(|pq| pq.query())
599 .map_or("".to_string(), |q| format!("?{q}"))
600 )
601 .parse()
602 {
603 Ok(path_and_query) => path_and_query,
604 Err(e) => {
605 parts.uri = orig_uri;
606 request = Request::from_parts(parts, body);
607 return Err((request, e.into()));
608 }
609 },
610 );
611 parts.uri = match hyper::Uri::from_parts(url_parts) {
612 Ok(uri) => uri,
613 Err(e) => {
614 parts.uri = orig_uri;
615 request = Request::from_parts(parts, body);
616 return Err((request, e.into()));
617 }
618 };
619 request = Request::from_parts(parts, body);
620 return Ok((request, true));
621 }
622 Ok((request, false))
623}
624
625#[allow(clippy::too_many_arguments)]
627pub async fn request_handler(
628 mut request: Request<BoxBody<Bytes, std::io::Error>>,
629 client_address: SocketAddr,
630 server_address: SocketAddr,
631 encrypted: bool,
632 configurations: Arc<ServerConfigurations>,
633 http3_alt_port: Option<u16>,
634 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
635 proxy_protocol_client_address: Option<SocketAddr>,
636 proxy_protocol_server_address: Option<SocketAddr>,
637) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
638 let global_configuration = configurations.find_global_configuration();
640 let global_loggers: &[Sender<LogMessage>] = global_configuration
641 .as_ref()
642 .map_or(&[], |c| &*c.observability.log_channels);
643 let global_log_date_format = global_configuration
644 .as_deref()
645 .and_then(|c| get_value!("log_date_format", c))
646 .and_then(|v| v.as_str());
647 let global_log_format = global_configuration
648 .as_deref()
649 .and_then(|c| get_value!("log_format", c))
650 .and_then(|v| v.as_str());
651
652 let timeout_from_config = global_configuration
654 .as_deref()
655 .and_then(|c| get_entry!("timeout", c))
656 .and_then(|e| e.values.last());
657 let timeout_duration = if timeout_from_config.is_some_and(|v| v.is_null()) {
658 None
659 } else {
660 let timeout_millis = timeout_from_config.and_then(|v| v.as_i128()).unwrap_or(300000) as u64;
661 Some(Duration::from_millis(timeout_millis))
662 };
663 let timeout_instant = std::time::Instant::now();
664
665 if matches!(request.version(), hyper::Version::HTTP_2 | hyper::Version::HTTP_3) {
667 if let Some(authority) = request.uri().authority() {
669 let authority = authority.to_owned();
670 let headers = request.headers_mut();
671 if !headers.contains_key(header::HOST) {
672 if let Ok(authority_value) = HeaderValue::from_bytes(authority.as_str().as_bytes()) {
673 headers.append(header::HOST, authority_value);
674 }
675 }
676 }
677
678 let mut cookie_normalized = String::new();
680 let mut cookie_set = false;
681 let headers = request.headers_mut();
682 for cookie in headers.get_all(header::COOKIE) {
683 if let Ok(cookie) = cookie.to_str() {
684 if cookie_set {
685 cookie_normalized.push_str("; ");
686 }
687 cookie_set = true;
688 cookie_normalized.push_str(cookie);
689 }
690 }
691 if cookie_set {
692 if let Ok(cookie_value) = HeaderValue::from_bytes(cookie_normalized.as_bytes()) {
693 headers.insert(header::COOKIE, cookie_value);
694 }
695 }
696 }
697
698 let mut socket_data = SocketData {
700 remote_addr: proxy_protocol_client_address.unwrap_or(client_address),
701 local_addr: proxy_protocol_server_address.unwrap_or(server_address),
702 encrypted,
703 };
704
705 let host_header_option = request.headers().get(header::HOST);
707 if let Some(header_data) = host_header_option {
708 match header_data.to_str() {
709 Ok(host_header) => {
710 let host_header_lower_case = host_header.to_lowercase();
711 let host_header_without_dot = host_header_lower_case
712 .strip_suffix('.')
713 .unwrap_or(host_header_lower_case.as_str());
714 if host_header_without_dot != host_header {
715 let host_header_value = match HeaderValue::from_str(host_header_without_dot) {
716 Ok(host_header_value) => host_header_value,
717 Err(err) => {
718 for logger in global_loggers {
719 logger
720 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
721 .await
722 .unwrap_or_default();
723 }
724 let response = basic_error_response(StatusCode::BAD_REQUEST);
725 let (request_parts, _) = request.into_parts();
726 return Ok(
727 finalize_basic_error_response(
728 response,
729 request_parts,
730 http3_alt_port,
731 global_loggers,
732 &socket_data,
733 global_log_date_format,
734 global_log_format,
735 )
736 .await,
737 );
738 }
739 };
740
741 request.headers_mut().insert(header::HOST, host_header_value);
742 }
743 }
744 Err(err) => {
745 for logger in global_loggers {
746 logger
747 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
748 .await
749 .unwrap_or_default();
750 }
751 let response = basic_error_response(StatusCode::BAD_REQUEST);
752 let (request_parts, _) = request.into_parts();
753 return Ok(
754 finalize_basic_error_response(
755 response,
756 request_parts,
757 http3_alt_port,
758 global_loggers,
759 &socket_data,
760 global_log_date_format,
761 global_log_format,
762 )
763 .await,
764 );
765 }
766 }
767 };
768
769 let hostname_determinant = request.headers().get(header::HOST).and_then(|value| {
770 value.to_str().ok().map(|h| {
771 h.rsplit_once(':')
772 .and_then(|(left, right)| {
773 if right.parse::<u16>().is_ok() {
774 Some(left.to_string())
775 } else {
776 None
777 }
778 })
779 .unwrap_or_else(|| h.to_string())
780 })
781 });
782
783 let (request_parts, request_body) = request.into_parts();
784 let mut log_request_parts = if global_configuration
785 .as_ref()
786 .is_some_and(|c| !c.observability.log_channels.is_empty())
787 {
788 Some(request_parts.clone())
789 } else {
790 None
791 };
792 let request = Request::from_parts(request_parts, request_body);
793
794 let (request_parts, request_body) = request.into_parts();
796 let configuration_option =
797 configurations.find_configuration(&request_parts, hostname_determinant.as_deref(), &socket_data);
798 let mut request = Request::from_parts(request_parts, request_body);
799 let mut configuration_error_handler_lookup = match configuration_option
800 .and_then(|c| c.ok_or_else(|| anyhow::anyhow!("No matching configuration found").into_boxed_dyn_error()))
801 {
802 Ok(configuration) => configuration,
803 Err(err) => {
804 for logger in global_loggers {
805 logger
806 .send(LogMessage::new(
807 format!("Cannot determine server configuration: {err}"),
808 true,
809 ))
810 .await
811 .unwrap_or_default()
812 }
813 let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
814 let (request_parts, _) = request.into_parts();
815 return Ok(
816 finalize_basic_error_response(
817 response,
818 request_parts,
819 http3_alt_port,
820 global_loggers,
821 &socket_data,
822 global_log_date_format,
823 global_log_format,
824 )
825 .await,
826 );
827 }
828 };
829 let mut configuration = match configuration_error_handler_lookup.get_default().cloned() {
830 Some(configuration) => configuration,
831 None => {
832 for logger in global_loggers {
833 logger
834 .send(LogMessage::new(
835 "Cannot determine server configuration: No matching configuration found".to_string(),
836 true,
837 ))
838 .await
839 .unwrap_or_default()
840 }
841 let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
842 let (request_parts, _) = request.into_parts();
843 return Ok(
844 finalize_basic_error_response(
845 response,
846 request_parts,
847 http3_alt_port,
848 global_loggers,
849 &socket_data,
850 global_log_date_format,
851 global_log_format,
852 )
853 .await,
854 );
855 }
856 };
857
858 let mut log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
860 let mut log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
861
862 if !configuration.observability.log_channels.is_empty() && log_request_parts.is_none() {
864 let (request_parts, request_body) = request.into_parts();
865 log_request_parts = Some(request_parts.clone());
866 request = Request::from_parts(request_parts, request_body);
867 }
868
869 if !get_value!("disable_url_sanitizer", configuration)
871 .and_then(|v| v.as_bool())
872 .unwrap_or(false)
873 {
874 request = match sanitize_url_request(&configuration, request) {
875 Ok((mut request, was_dirty)) => {
876 if was_dirty {
877 let (parts, body) = request.into_parts();
878 let configuration_option =
879 configurations.find_configuration(&parts, hostname_determinant.as_deref(), &socket_data);
880 request = Request::from_parts(parts, body);
881 match configuration_option {
882 Ok(Some(new_configuration)) => {
883 if let Some(new_config2) = new_configuration.get_default().cloned() {
884 configuration_error_handler_lookup = new_configuration;
885 configuration = new_config2;
886 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
887 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
888 }
889 }
890 Ok(None) => {}
891 Err(err) => {
892 for logger in &configuration.observability.log_channels {
893 logger
894 .send(LogMessage::new(
895 format!("Cannot determine server configuration: {err}"),
896 true,
897 ))
898 .await
899 .unwrap_or_default();
900 }
901 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
902
903 let (request_parts, _) = request.into_parts();
904 let (headers_to_add, headers_to_replace, headers_to_remove) =
905 build_custom_headers(&configuration, &request_parts);
906
907 return Ok(
908 finalize_response_and_log(
909 response,
910 http3_alt_port,
911 headers_to_add,
912 headers_to_replace,
913 headers_to_remove,
914 &configuration.observability.log_channels,
915 &log_request_parts,
916 &socket_data,
917 None,
918 log_date_format,
919 log_format,
920 )
921 .await,
922 );
923 }
924 }
925 }
926 request
927 }
928 Err((request, error)) => {
929 for logger in &configuration.observability.log_channels {
930 logger
931 .send(LogMessage::new(format!("URL sanitation error: {error}"), true))
932 .await
933 .unwrap_or_default();
934 }
935
936 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
937
938 let (parts, _) = request.into_parts();
939 let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &parts);
940
941 return Ok(
942 finalize_response_and_log(
943 response,
944 http3_alt_port,
945 headers_to_add,
946 headers_to_replace,
947 headers_to_remove,
948 &configuration.observability.log_channels,
949 &log_request_parts,
950 &socket_data,
951 None,
952 log_date_format,
953 log_format,
954 )
955 .await,
956 );
957 }
958 };
959 }
960
961 let (request_parts, request_body) = request.into_parts();
962 let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &request_parts);
963 let mut request = Request::from_parts(request_parts, request_body);
964
965 if request.uri().path() == "*" {
966 let response = match request.method() {
967 &Method::OPTIONS => Response::builder()
968 .status(StatusCode::NO_CONTENT)
969 .header(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"))
970 .body(Empty::new().map_err(|e| match e {}).boxed())
971 .unwrap_or_default(),
972 _ => {
973 let mut header_map = HeaderMap::new();
974 header_map.insert(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"));
975 generate_error_response(StatusCode::BAD_REQUEST, &configuration, &Some(header_map)).await
976 }
977 };
978 return Ok(
979 finalize_response_and_log(
980 response,
981 http3_alt_port,
982 headers_to_add,
983 headers_to_replace,
984 headers_to_remove,
985 &configuration.observability.log_channels,
986 &log_request_parts,
987 &socket_data,
988 None,
989 log_date_format,
990 log_format,
991 )
992 .await,
993 );
994 }
995
996 let acme_http_01_resolvers_inner = acme_http_01_resolvers.read().await;
998 if !acme_http_01_resolvers_inner.is_empty() {
999 if let Some(challenge_token) = request.uri().path().strip_prefix("/.well-known/acme-challenge/") {
1000 for acme_http01_resolver in &*acme_http_01_resolvers_inner {
1001 if let Some(http01_acme_data) = &*acme_http01_resolver.read().await {
1002 let acme_response = http01_acme_data.1.clone();
1003 if challenge_token == http01_acme_data.0 {
1004 let response = Response::builder()
1005 .status(StatusCode::OK)
1006 .header(
1007 header::CONTENT_TYPE,
1008 HeaderValue::from_static("application/octet-stream"),
1009 )
1010 .body(Full::new(Bytes::from(acme_response)).map_err(|e| match e {}).boxed())
1011 .unwrap_or_default();
1012
1013 return Ok(
1014 finalize_response_and_log(
1015 response,
1016 http3_alt_port,
1017 headers_to_add,
1018 headers_to_replace,
1019 headers_to_remove,
1020 &configuration.observability.log_channels,
1021 &log_request_parts,
1022 &socket_data,
1023 None,
1024 log_date_format,
1025 log_format,
1026 )
1027 .await,
1028 );
1029 }
1030 }
1031 }
1032 }
1033 };
1034 drop(acme_http_01_resolvers_inner);
1035
1036 let mut error_logger = if !configuration.observability.log_channels.is_empty() {
1037 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1038 } else {
1039 ErrorLogger::without_logger()
1040 };
1041 let mut metrics_enabled = !configuration.observability.metric_channels.is_empty();
1042 let mut metrics_sender = if metrics_enabled {
1043 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1044 } else {
1045 MetricsMultiSender::without_sender()
1046 };
1047 let mut traces_enabled = !configuration.observability.trace_channels.is_empty();
1048 let mut traces_senders = if traces_enabled {
1049 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1050 for channel in &configuration.observability.trace_channels {
1051 channel.0.send(()).await.unwrap_or_default();
1052 if let Ok(channel2) = channel.1.recv().await {
1053 traces_senders.push(channel2);
1054 }
1055 }
1056 traces_senders
1057 } else {
1058 vec![]
1059 };
1060
1061 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1063 for module in &configuration.modules {
1064 module_handlers.push(module.get_module_handlers());
1065 }
1066
1067 request.extensions_mut().insert(RequestData {
1069 auth_user: None,
1070 original_url: None,
1071 error_status_code: None,
1072 });
1073 let mut executed_handlers = Vec::new();
1074 let (request_parts, request_body) = request.into_parts();
1075 let request_parts_cloned = if configuration_error_handler_lookup.has_status_codes() {
1076 let mut request_parts_cloned = request_parts.clone();
1077 request_parts_cloned
1078 .headers
1079 .insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"));
1080 Some(request_parts_cloned)
1081 } else {
1082 None
1084 };
1085 let mut request = Request::from_parts(request_parts, request_body);
1086 let mut latest_auth_data = None;
1087 let mut is_error_handler = false;
1088 let mut handlers_iter: Box<dyn Iterator<Item = Box<dyn ModuleHandlers>>> = Box::new(module_handlers.into_iter());
1089 while let Some(mut handlers) = handlers_iter.next() {
1090 if metrics_enabled {
1091 handlers
1092 .metric_data_before_handler(&request, &socket_data, &metrics_sender)
1093 .await;
1094 }
1095
1096 if traces_enabled {
1097 for trace_sender in &traces_senders {
1098 trace_sender
1099 .send(TraceSignal::StartSpan(format!(
1100 "{}::request_handler",
1101 handlers.get_name()
1102 )))
1103 .await
1104 .unwrap_or_default();
1105 }
1106 }
1107
1108 let (response_result, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
1109 let elapsed = timeout_instant.elapsed();
1110 if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
1111 match timeout(
1112 timeout_cur_duration,
1113 handlers.request_handler(request, &configuration, &socket_data, &error_logger),
1114 )
1115 .await
1116 {
1117 Ok(result) => (result, false),
1118 Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
1119 }
1120 } else {
1121 (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
1122 }
1123 } else {
1124 (
1125 handlers
1126 .request_handler(request, &configuration, &socket_data, &error_logger)
1127 .await,
1128 false,
1129 )
1130 };
1131
1132 if traces_enabled {
1133 for trace_sender in &traces_senders {
1134 trace_sender
1135 .send(TraceSignal::EndSpan(
1136 format!("{}::request_handler", handlers.get_name()),
1137 response_result.as_ref().err().map(|e| e.to_string()),
1138 ))
1139 .await
1140 .unwrap_or_default();
1141 }
1142 }
1143
1144 executed_handlers.push(handlers);
1145
1146 if is_timeout {
1147 if metrics_enabled {
1148 while let Some(mut executed_handler) = executed_handlers.pop() {
1149 executed_handler.metric_data_after_handler(&metrics_sender).await;
1150 }
1151 }
1152 Err(anyhow::anyhow!("The client or server has timed out"))?;
1153 }
1154
1155 match response_result {
1156 Ok(response) => {
1157 let status = response.response_status;
1158 let headers = response.response_headers;
1159 let new_remote_address = response.new_remote_address;
1160 let request_option = response.request;
1161 let response = response.response;
1162 let request_extensions = request_option
1163 .as_ref()
1164 .and_then(|r| r.extensions().get::<RequestData>());
1165 if let Some(request_extensions) = request_extensions {
1166 latest_auth_data = request_extensions.auth_user.clone();
1167 }
1168 if let Some(new_remote_address) = new_remote_address {
1169 socket_data.remote_addr = new_remote_address;
1170 };
1171
1172 match response {
1173 Some(response) => {
1174 return finalize_with_modifying_handlers(
1175 response,
1176 executed_handlers,
1177 &configuration,
1178 http3_alt_port,
1179 headers_to_add,
1180 headers_to_replace,
1181 headers_to_remove,
1182 &log_request_parts,
1183 &socket_data,
1184 latest_auth_data.as_deref(),
1185 log_date_format,
1186 log_format,
1187 metrics_sender,
1188 metrics_enabled,
1189 traces_senders,
1190 traces_enabled,
1191 timeout_instant,
1192 timeout_duration,
1193 )
1194 .await;
1195 }
1196 None => match status {
1197 Some(status) => {
1198 if !is_error_handler {
1199 if let Some(error_configuration) = configuration_error_handler_lookup.get(status.as_u16()).cloned() {
1200 let request_option = if let Some(request) = request_option {
1201 Some(request)
1202 } else {
1203 request_parts_cloned.clone().map(|request_parts_cloned| {
1204 Request::from_parts(request_parts_cloned, Empty::new().map_err(|e| match e {}).boxed())
1205 })
1206 };
1207 if let Some(request_cloned) = request_option {
1208 configuration = error_configuration;
1209 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1210 for module in &configuration.modules {
1211 module_handlers.push(module.get_module_handlers());
1212 }
1213 handlers_iter = Box::new(module_handlers.into_iter());
1214 if metrics_enabled {
1215 while let Some(mut executed_handler) = executed_handlers.pop() {
1216 executed_handler.metric_data_after_handler(&metrics_sender).await;
1217 }
1218 }
1219 executed_handlers = Vec::new();
1220 request = request_cloned;
1221 if let Some(request_data) = request.extensions_mut().get_mut::<RequestData>() {
1222 request_data.error_status_code = Some(status);
1223 }
1224 is_error_handler = true;
1225 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1226 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1227 error_logger = if !configuration.observability.log_channels.is_empty() {
1228 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1229 } else {
1230 ErrorLogger::without_logger()
1231 };
1232 metrics_enabled = !configuration.observability.metric_channels.is_empty();
1233 metrics_sender = if metrics_enabled {
1234 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1235 } else {
1236 MetricsMultiSender::without_sender()
1237 };
1238 traces_enabled = !configuration.observability.trace_channels.is_empty();
1239 traces_senders = if traces_enabled {
1240 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1241 for channel in &configuration.observability.trace_channels {
1242 channel.0.send(()).await.unwrap_or_default();
1243 if let Ok(channel2) = channel.1.recv().await {
1244 traces_senders.push(channel2);
1245 }
1246 }
1247 traces_senders
1248 } else {
1249 vec![]
1250 };
1251 continue;
1252 }
1253 }
1254 }
1255 let response = generate_error_response(status, &configuration, &headers).await;
1256 return finalize_with_modifying_handlers(
1257 response,
1258 executed_handlers,
1259 &configuration,
1260 http3_alt_port,
1261 headers_to_add,
1262 headers_to_replace,
1263 headers_to_remove,
1264 &log_request_parts,
1265 &socket_data,
1266 latest_auth_data.as_deref(),
1267 log_date_format,
1268 log_format,
1269 metrics_sender,
1270 metrics_enabled,
1271 traces_senders,
1272 traces_enabled,
1273 timeout_instant,
1274 timeout_duration,
1275 )
1276 .await;
1277 }
1278 None => match request_option {
1279 Some(request_obtained) => {
1280 request = request_obtained;
1281 continue;
1282 }
1283 None => {
1284 break;
1285 }
1286 },
1287 },
1288 }
1289 }
1290 Err(err) => {
1291 let response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, &configuration, &None).await;
1292
1293 for logger in &configuration.observability.log_channels {
1294 logger
1295 .send(LogMessage::new(
1296 format!("Unexpected error while serving a request: {err}"),
1297 true,
1298 ))
1299 .await
1300 .unwrap_or_default();
1301 }
1302
1303 return finalize_with_modifying_handlers(
1304 response,
1305 executed_handlers,
1306 &configuration,
1307 http3_alt_port,
1308 headers_to_add,
1309 headers_to_replace,
1310 headers_to_remove,
1311 &log_request_parts,
1312 &socket_data,
1313 latest_auth_data.as_deref(),
1314 log_date_format,
1315 log_format,
1316 metrics_sender,
1317 metrics_enabled,
1318 traces_senders,
1319 traces_enabled,
1320 timeout_instant,
1321 timeout_duration,
1322 )
1323 .await;
1324 }
1325 }
1326 }
1327
1328 let response = generate_error_response(StatusCode::NOT_FOUND, &configuration, &None).await;
1329
1330 finalize_with_modifying_handlers(
1331 response,
1332 executed_handlers,
1333 &configuration,
1334 http3_alt_port,
1335 headers_to_add,
1336 headers_to_replace,
1337 headers_to_remove,
1338 &log_request_parts,
1339 &socket_data,
1340 latest_auth_data.as_deref(),
1341 log_date_format,
1342 log_format,
1343 metrics_sender,
1344 metrics_enabled,
1345 traces_senders,
1346 traces_enabled,
1347 timeout_instant,
1348 timeout_duration,
1349 )
1350 .await
1351}