1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_channel::Sender;
8use chrono::{DateTime, Local};
9use ferron_common::logging::{ErrorLogger, LogMessage};
10use ferron_common::observability::{MetricsMultiSender, TraceSignal};
11#[cfg(feature = "runtime-vibeio")]
12use ferron_common::util::FileStream;
13use futures_util::stream::TryStreamExt;
14use http_body_util::combinators::BoxBody;
15use http_body_util::{BodyExt, Empty, Full, StreamBody};
16use hyper::body::{Body, Bytes, Frame};
17use hyper::header::{HeaderName, HeaderValue};
18use hyper::{header, HeaderMap, Method, Request, Response, StatusCode};
19#[cfg(feature = "runtime-tokio")]
20use tokio::io::BufReader;
21#[cfg(feature = "runtime-tokio")]
22use tokio_util::io::ReaderStream;
23
24use crate::config::{ServerConfiguration, ServerConfigurationValue, ServerConfigurations};
25use crate::get_value;
26use crate::runtime::timeout;
27#[cfg(feature = "runtime-monoio")]
28use crate::util::MonoioFileStreamNoSpawn;
29use crate::util::{
30 generate_access_log_message, generate_default_error_page, replace_header_placeholders, sanitize_url, SERVER_SOFTWARE,
31};
32
33use ferron_common::modules::{ModuleHandlers, RequestData, SocketData};
34use ferron_common::{get_entries, get_entry};
35
36async fn generate_error_response(
38 status_code: StatusCode,
39 config: &ServerConfiguration,
40 headers: &Option<HeaderMap>,
41) -> Response<BoxBody<Bytes, std::io::Error>> {
42 let bare_body = generate_default_error_page(
43 status_code,
44 get_value!("server_administrator_email", config).and_then(|v| v.as_str()),
45 );
46 let mut content_length: Option<u64> = bare_body.len().try_into().ok();
47 let mut response_body = Full::new(Bytes::from(bare_body)).map_err(|e| match e {}).boxed();
48
49 if let Some(error_pages) = get_entries!("error_page", config) {
50 for error_page in &error_pages.inner {
51 if let Some(page_status_code) = error_page.values.first().and_then(|v| v.as_i128()) {
52 if page_status_code
53 .try_into()
54 .ok()
55 .and_then(|s| StatusCode::from_u16(s).ok())
56 .is_none_or(|s| s != status_code)
57 {
58 continue;
59 }
60 if let Some(page_path) = error_page.values.get(1).and_then(|v| v.as_str()) {
61 #[cfg(feature = "runtime-monoio")]
62 let file = monoio::fs::File::open(page_path).await;
63 #[cfg(feature = "runtime-tokio")]
64 let file = tokio::fs::File::open(page_path).await;
65 #[cfg(feature = "runtime-vibeio")]
66 let file = vibeio::fs::File::open(page_path).await;
67
68 let Ok(file) = file else {
69 continue;
70 };
71
72 #[cfg(any(
74 feature = "runtime-tokio",
75 feature = "runtime-vibeio",
76 all(feature = "runtime-monoio", unix)
77 ))]
78 let metadata = file.metadata().await;
79 #[cfg(all(feature = "runtime-monoio", windows))]
80 let metadata = {
81 let page_path = page_path.to_owned();
82 monoio::spawn_blocking(move || std::fs::metadata(page_path))
83 .await
84 .unwrap_or(Err(std::io::Error::other(
85 "Can't spawn a blocking task to obtain the file metadata",
86 )))
87 };
88
89 content_length = metadata.ok().map(|m| m.len());
90
91 #[cfg(feature = "runtime-monoio")]
92 let file_stream = MonoioFileStreamNoSpawn::new(file, None, content_length);
93 #[cfg(feature = "runtime-vibeio")]
94 let file_stream = FileStream::new(file, None, content_length);
95 #[cfg(feature = "runtime-tokio")]
96 let file_stream = ReaderStream::new(BufReader::with_capacity(12800, file));
97
98 let stream_body = StreamBody::new(file_stream.map_ok(Frame::data));
99 let boxed_body = stream_body.boxed();
100
101 response_body = boxed_body;
102
103 break;
104 }
105 }
106 }
107 }
108
109 let mut response_builder = Response::builder().status(status_code);
110
111 if let Some(headers) = headers {
112 let headers_iter = headers.iter();
113 for (name, value) in headers_iter {
114 if name != header::CONTENT_TYPE && name != header::CONTENT_LENGTH {
115 response_builder = response_builder.header(name, value);
116 }
117 }
118 }
119
120 if let Some(content_length) = content_length {
121 response_builder = response_builder.header(header::CONTENT_LENGTH, content_length);
122 }
123 response_builder = response_builder.header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"));
124
125 response_builder.body(response_body).unwrap_or_default()
126}
127
128#[allow(clippy::too_many_arguments)]
130async fn log_access(
131 loggers: &[Sender<LogMessage>],
132 request_parts: &hyper::http::request::Parts,
133 socket_data: &SocketData,
134 auth_user: Option<&str>,
135 status_code: u16,
136 content_length: Option<u64>,
137 date_format: Option<&str>,
138 log_format: Option<&str>,
139 log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
140) {
141 let now: DateTime<Local> = Local::now();
142 let formatted_time = now.format(date_format.unwrap_or("%d/%b/%Y:%H:%M:%S %z")).to_string();
143 let log_message_string = generate_access_log_message(
144 request_parts,
145 socket_data,
146 auth_user,
147 &formatted_time,
148 status_code,
149 content_length,
150 log_format,
151 log_json_props,
152 );
153 for logger in loggers {
154 logger
155 .send(LogMessage::new(log_message_string.clone(), false))
156 .await
157 .unwrap_or_default();
158 }
159}
160
161#[inline]
163fn add_custom_headers(
164 response_parts: &mut hyper::http::response::Parts,
165 headers_to_add: &HeaderMap,
166 headers_to_replace: &HeaderMap,
167 headers_to_remove: &[HeaderName],
168) {
169 for (header_name, header_value) in headers_to_add {
170 if !response_parts.headers.contains_key(header_name) {
171 response_parts.headers.insert(header_name, header_value.to_owned());
172 }
173 }
174
175 for (header_name, header_value) in headers_to_replace {
176 response_parts.headers.insert(header_name, header_value.to_owned());
177 }
178
179 for header_to_remove in headers_to_remove.iter().rev() {
180 if response_parts.headers.contains_key(header_to_remove) {
181 while response_parts.headers.remove(header_to_remove).is_some() {}
182 }
183 }
184}
185
186#[inline]
188fn add_http3_alt_svc_header(response_parts: &mut hyper::http::response::Parts, http3_alt_port: Option<u16>) {
189 if let Some(http3_alt_port) = http3_alt_port {
190 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
191 Some(value) => {
192 let header_value_old = String::from_utf8_lossy(value.as_bytes());
193 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
194
195 if header_value_old != header_value_new {
196 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
197 } else {
198 HeaderValue::from_bytes(header_value_old.as_bytes())
199 }
200 }
201 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
202 } {
203 response_parts.headers.insert(header::ALT_SVC, header_value);
204 }
205 }
206}
207
208#[inline]
210fn add_server_header(response_parts: &mut hyper::http::response::Parts) {
211 response_parts
212 .headers
213 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
214}
215
216fn extract_content_length(response: &Response<BoxBody<Bytes, std::io::Error>>) -> Option<u64> {
218 response
219 .headers()
220 .get(header::CONTENT_LENGTH)
221 .and_then(|header_value| {
222 header_value.to_str().ok().and_then(|header_value| {
223 header_value
224 .parse::<u64>()
225 .ok()
226 .or_else(|| response.body().size_hint().exact())
227 })
228 })
229 .or_else(|| response.body().size_hint().exact())
230}
231
232fn basic_error_response(status_code: StatusCode) -> Response<BoxBody<Bytes, std::io::Error>> {
234 Response::builder()
235 .status(status_code)
236 .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
237 .body(
238 Full::new(Bytes::from(generate_default_error_page(status_code, None)))
239 .map_err(|e| match e {})
240 .boxed(),
241 )
242 .unwrap_or_default()
243}
244
245fn build_custom_headers(
247 configuration: &ServerConfiguration,
248 request_parts: &hyper::http::request::Parts,
249) -> (HeaderMap, HeaderMap, Vec<HeaderName>) {
250 let mut headers_to_add = HeaderMap::new();
251 let mut headers_to_replace = HeaderMap::new();
252 let mut headers_to_remove = Vec::new();
253
254 if let Some(custom_headers) = get_entries!("header", configuration) {
255 for custom_header in custom_headers.inner.iter().rev() {
256 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
257 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
258 if !headers_to_add.contains_key(header_name) {
259 if let Ok(header_name) = HeaderName::from_str(header_name) {
260 if let Ok(header_value) =
261 HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
262 {
263 headers_to_add.insert(header_name, header_value);
264 }
265 }
266 }
267 }
268 }
269 }
270 }
271
272 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
273 for custom_header in custom_headers.inner.iter().rev() {
274 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
275 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
276 if let Ok(header_name) = HeaderName::from_str(header_name) {
277 if let Ok(header_value) =
278 HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
279 {
280 headers_to_replace.insert(header_name, header_value);
281 }
282 }
283 }
284 }
285 }
286 }
287
288 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
289 for custom_header in custom_headers_to_remove.inner.iter().rev() {
290 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
291 if let Ok(header_name) = HeaderName::from_str(header_name) {
292 headers_to_remove.push(header_name);
293 }
294 }
295 }
296 }
297
298 (headers_to_add, headers_to_replace, headers_to_remove)
299}
300
301#[allow(clippy::too_many_arguments)]
303async fn finalize_response_and_log(
304 response: Response<BoxBody<Bytes, std::io::Error>>,
305 http3_alt_port: Option<u16>,
306 headers_to_add: HeaderMap,
307 headers_to_replace: HeaderMap,
308 headers_to_remove: Vec<HeaderName>,
309 loggers: &[Sender<LogMessage>],
310 request_parts: &Option<hyper::http::request::Parts>,
311 socket_data: &SocketData,
312 latest_auth_data: Option<&str>,
313 date_format: Option<&str>,
314 log_format: Option<&str>,
315 log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
316) -> Response<BoxBody<Bytes, std::io::Error>> {
317 let (mut response_parts, response_body) = response.into_parts();
318
319 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
320 add_server_header(&mut response_parts);
321 add_custom_headers(
322 &mut response_parts,
323 &headers_to_add,
324 &headers_to_replace,
325 &headers_to_remove,
326 );
327
328 let response = Response::from_parts(response_parts, response_body);
329
330 if let Some(request_parts) = request_parts {
331 if !loggers.is_empty() {
332 log_access(
333 loggers,
334 request_parts,
335 socket_data,
336 latest_auth_data,
337 response.status().as_u16(),
338 extract_content_length(&response),
339 date_format,
340 log_format,
341 log_json_props,
342 )
343 .await;
344 }
345 }
346
347 response
348}
349
350#[allow(clippy::too_many_arguments)]
352async fn finalize_basic_error_response(
353 response: Response<BoxBody<Bytes, std::io::Error>>,
354 request_parts: hyper::http::request::Parts,
355 http3_alt_port: Option<u16>,
356 loggers: &[Sender<LogMessage>],
357 socket_data: &SocketData,
358 date_format: Option<&str>,
359 log_format: Option<&str>,
360 log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
361) -> Response<BoxBody<Bytes, std::io::Error>> {
362 let request_parts = Some(request_parts);
363 finalize_response_and_log(
364 response,
365 http3_alt_port,
366 HeaderMap::new(),
367 HeaderMap::new(),
368 Vec::new(),
369 loggers,
370 &request_parts,
371 socket_data,
372 None,
373 date_format,
374 log_format,
375 log_json_props,
376 )
377 .await
378}
379
/// Runs the response-modifying handler of every previously executed module handler.
///
/// Handlers are popped from the end of `executed_handlers`, so the handler pushed last
/// is run first. Each handler receives the response produced by the previous one.
///
/// When `timeout_duration` is set, each handler only gets the time that remains of it,
/// measured from `timeout_instant`.
///
/// # Returns
///
/// * `Ok(Ok(response))` - every handler succeeded; `response` is the final response.
///   It has NOT been access-logged by this function.
/// * `Ok(Err(response))` - a handler failed; the error was sent to `loggers`, and
///   `response` is a 500 error response that has already been finalized and
///   access-logged via `finalize_response_and_log`.
/// * `Err(_)` - the timeout elapsed before or while a handler was running.
#[allow(clippy::too_many_arguments)]
#[inline]
async fn execute_response_modifying_handlers(
  mut response: Response<BoxBody<Bytes, std::io::Error>>,
  mut executed_handlers: Vec<Box<dyn ModuleHandlers>>,
  configuration: &ServerConfiguration,
  http3_alt_port: Option<u16>,
  headers_to_add: HeaderMap,
  headers_to_replace: HeaderMap,
  headers_to_remove: Vec<HeaderName>,
  loggers: &[Sender<LogMessage>],
  request_parts: &Option<hyper::http::request::Parts>,
  socket_data: &SocketData,
  latest_auth_data: Option<&str>,
  date_format: Option<&str>,
  log_format: Option<&str>,
  log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
  metrics_sender: MetricsMultiSender,
  metrics_enabled: bool,
  traces_senders: Vec<Sender<TraceSignal>>,
  traces_enabled: bool,
  timeout_instant: std::time::Instant,
  timeout_duration: Option<std::time::Duration>,
) -> Result<Result<Response<BoxBody<Bytes, std::io::Error>>, Response<BoxBody<Bytes, std::io::Error>>>, anyhow::Error> {
  while let Some(mut executed_handler) = executed_handlers.pop() {
    // Open a trace span named after the handler on every trace channel.
    if traces_enabled {
      for trace_sender in &traces_senders {
        trace_sender
          .send(TraceSignal::StartSpan(format!(
            "{}::response_modifying_handler",
            executed_handler.get_name()
          )))
          .await
          .unwrap_or_default();
      }
    }
    // Run the handler, bounded by whatever is left of the overall timeout (if any).
    let (response_status, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
      let elapsed = timeout_instant.elapsed();
      if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
        match timeout(
          timeout_cur_duration,
          executed_handler.response_modifying_handler(response),
        )
        .await
        {
          Ok(result) => (result, false),
          Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
        }
      } else {
        // The time budget was already used up before this handler could start.
        (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
      }
    } else {
      (executed_handler.response_modifying_handler(response).await, false)
    };
    // Close the trace span, attaching the error text when the handler failed or timed out.
    if traces_enabled {
      for trace_sender in &traces_senders {
        trace_sender
          .send(TraceSignal::EndSpan(
            format!("{}::response_modifying_handler", executed_handler.get_name()),
            response_status.as_ref().err().map(|e| e.to_string()),
          ))
          .await
          .unwrap_or_default();
      }
    }
    if is_timeout {
      // Let the handlers that have not been run yet report their metrics, then
      // propagate the timeout as an error (the `?` on an `Err` always returns).
      if metrics_enabled {
        while let Some(mut executed_handler) = executed_handlers.pop() {
          executed_handler.metric_data_after_handler(&metrics_sender).await;
        }
      }
      Err(anyhow::anyhow!("The client or server has timed out"))?;
    }
    response = match response_status {
      Ok(response) => response,
      Err(err) => {
        // Report the handler failure to the error log.
        for logger in loggers {
          logger
            .send(LogMessage::new(
              format!("Unexpected error while serving a request: {err}"),
              true,
            ))
            .await
            .unwrap_or_default();
        }

        // Replace the response with a 500 error page, then decorate and access-log it.
        let error_response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, configuration, &None).await;

        let final_response = finalize_response_and_log(
          error_response,
          http3_alt_port,
          headers_to_add,
          headers_to_replace,
          headers_to_remove,
          loggers,
          request_parts,
          socket_data,
          latest_auth_data,
          date_format,
          log_format,
          log_json_props,
        )
        .await;

        // The remaining (not yet run) handlers still get to report their metrics.
        if metrics_enabled {
          while let Some(mut executed_handler) = executed_handlers.pop() {
            executed_handler.metric_data_after_handler(&metrics_sender).await;
          }
        }

        return Ok(Err(final_response));
      }
    };
    // The handler succeeded; let it report its metrics before moving to the next one.
    if metrics_enabled {
      executed_handler.metric_data_after_handler(&metrics_sender).await;
    }
  }
  Ok(Ok(response))
}
500
501#[allow(clippy::too_many_arguments)]
503async fn finalize_with_modifying_handlers(
504 response: Response<BoxBody<Bytes, std::io::Error>>,
505 executed_handlers: Vec<Box<dyn ModuleHandlers>>,
506 configuration: &ServerConfiguration,
507 http3_alt_port: Option<u16>,
508 headers_to_add: HeaderMap,
509 headers_to_replace: HeaderMap,
510 headers_to_remove: Vec<HeaderName>,
511 log_request_parts: &Option<hyper::http::request::Parts>,
512 socket_data: &SocketData,
513 latest_auth_data: Option<&str>,
514 log_date_format: Option<&str>,
515 log_format: Option<&str>,
516 log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
517 metrics_sender: MetricsMultiSender,
518 metrics_enabled: bool,
519 traces_senders: Vec<Sender<TraceSignal>>,
520 traces_enabled: bool,
521 timeout_instant: std::time::Instant,
522 timeout_duration: Option<std::time::Duration>,
523) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
524 let (mut response_parts, response_body) = response.into_parts();
525
526 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
527 add_server_header(&mut response_parts);
528 add_custom_headers(
529 &mut response_parts,
530 &headers_to_add,
531 &headers_to_replace,
532 &headers_to_remove,
533 );
534
535 let response = Response::from_parts(response_parts, response_body);
536
537 match execute_response_modifying_handlers(
538 response,
539 executed_handlers,
540 configuration,
541 http3_alt_port,
542 headers_to_add,
543 headers_to_replace,
544 headers_to_remove,
545 &configuration.observability.log_channels,
546 log_request_parts,
547 socket_data,
548 latest_auth_data,
549 log_date_format,
550 log_format,
551 log_json_props,
552 metrics_sender,
553 metrics_enabled,
554 traces_senders,
555 traces_enabled,
556 timeout_instant,
557 timeout_duration,
558 )
559 .await?
560 {
561 Ok(response) => {
562 if let Some(request_parts) = log_request_parts.as_ref() {
563 log_access(
564 &configuration.observability.log_channels,
565 request_parts,
566 socket_data,
567 latest_auth_data,
568 response.status().as_u16(),
569 extract_content_length(&response),
570 log_date_format,
571 log_format,
572 log_json_props,
573 )
574 .await;
575 }
576 Ok(response)
577 }
578 Err(error_response) => Ok(error_response),
579 }
580}
581
582#[allow(clippy::type_complexity)]
583#[allow(clippy::result_large_err)]
584fn sanitize_url_request(
585 configuration: &ServerConfiguration,
586 mut request: Request<BoxBody<Bytes, std::io::Error>>,
587) -> Result<
588 (Request<BoxBody<Bytes, std::io::Error>>, bool),
589 (
590 Request<BoxBody<Bytes, std::io::Error>>,
591 Box<dyn std::error::Error + Send + Sync>,
592 ),
593> {
594 let url_pathname = request.uri().path();
595 let sanitized_url_pathname = match sanitize_url(
596 url_pathname,
597 get_value!("allow_double_slashes", configuration)
598 .and_then(|v| v.as_bool())
599 .unwrap_or(false),
600 ) {
601 Ok(sanitized_url_pathname) => sanitized_url_pathname,
602 Err(err) => {
603 return Err((request, err.into()));
604 }
605 };
606
607 if sanitized_url_pathname != url_pathname {
608 let (mut parts, body) = request.into_parts();
609 let orig_uri = parts.uri.clone();
610 let mut url_parts = parts.uri.into_parts();
611 url_parts.path_and_query = Some(
612 match format!(
613 "{}{}",
614 sanitized_url_pathname,
615 url_parts
616 .path_and_query
617 .as_ref()
618 .and_then(|pq| pq.query())
619 .map_or("".to_string(), |q| format!("?{q}"))
620 )
621 .parse()
622 {
623 Ok(path_and_query) => path_and_query,
624 Err(e) => {
625 parts.uri = orig_uri;
626 request = Request::from_parts(parts, body);
627 return Err((request, e.into()));
628 }
629 },
630 );
631 parts.uri = match hyper::Uri::from_parts(url_parts) {
632 Ok(uri) => uri,
633 Err(e) => {
634 parts.uri = orig_uri;
635 request = Request::from_parts(parts, body);
636 return Err((request, e.into()));
637 }
638 };
639 request = Request::from_parts(parts, body);
640 return Ok((request, true));
641 }
642 Ok((request, false))
643}
644
645#[allow(clippy::too_many_arguments)]
647pub async fn request_handler(
648 mut request: Request<BoxBody<Bytes, std::io::Error>>,
649 client_address: SocketAddr,
650 server_address: SocketAddr,
651 encrypted: bool,
652 configurations: Arc<ServerConfigurations>,
653 http3_alt_port: Option<u16>,
654 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
655 proxy_protocol_client_address: Option<SocketAddr>,
656 proxy_protocol_server_address: Option<SocketAddr>,
657) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
658 let global_configuration = configurations.find_global_configuration();
660 let global_loggers: &[Sender<LogMessage>] = global_configuration
661 .as_ref()
662 .map_or(&[], |c| &*c.observability.log_channels);
663 let global_log_date_format = global_configuration
664 .as_deref()
665 .and_then(|c| get_value!("log_date_format", c))
666 .and_then(|v| v.as_str());
667 let global_log_format = global_configuration
668 .as_deref()
669 .and_then(|c| get_value!("log_format", c))
670 .and_then(|v| v.as_str());
671 let global_log_json = global_configuration
672 .as_deref()
673 .and_then(|c| c.entries.get("log_json"))
674 .and_then(|entries| entries.get_entry())
675 .map(|entry| entry.props.clone());
676
677 let timeout_from_config = global_configuration
679 .as_deref()
680 .and_then(|c| get_entry!("timeout", c))
681 .and_then(|e| e.values.last());
682 let timeout_duration = if timeout_from_config.is_some_and(|v| v.is_null()) {
683 None
684 } else {
685 let timeout_millis = timeout_from_config.and_then(|v| v.as_i128()).unwrap_or(300000) as u64;
686 Some(Duration::from_millis(timeout_millis))
687 };
688 let timeout_instant = std::time::Instant::now();
689
690 if matches!(request.version(), hyper::Version::HTTP_2 | hyper::Version::HTTP_3) {
692 if let Some(authority) = request.uri().authority() {
694 let authority = authority.to_owned();
695 let headers = request.headers_mut();
696 if !headers.contains_key(header::HOST) {
697 if let Ok(authority_value) = HeaderValue::from_bytes(authority.as_str().as_bytes()) {
698 headers.append(header::HOST, authority_value);
699 }
700 }
701 }
702
703 let mut cookie_normalized = String::new();
705 let mut cookie_set = false;
706 let headers = request.headers_mut();
707 for cookie in headers.get_all(header::COOKIE) {
708 if let Ok(cookie) = cookie.to_str() {
709 if cookie_set {
710 cookie_normalized.push_str("; ");
711 }
712 cookie_set = true;
713 cookie_normalized.push_str(cookie);
714 }
715 }
716 if cookie_set {
717 if let Ok(cookie_value) = HeaderValue::from_bytes(cookie_normalized.as_bytes()) {
718 headers.insert(header::COOKIE, cookie_value);
719 }
720 }
721 }
722
723 let mut socket_data = SocketData {
725 remote_addr: proxy_protocol_client_address.unwrap_or(client_address),
726 local_addr: proxy_protocol_server_address.unwrap_or(server_address),
727 encrypted,
728 };
729
730 let host_header_option = request.headers().get(header::HOST);
732 if let Some(header_data) = host_header_option {
733 match header_data.to_str() {
734 Ok(host_header) => {
735 let host_header_lower_case = host_header.to_lowercase();
736 let host_header_without_dot = host_header_lower_case
737 .strip_suffix('.')
738 .unwrap_or(host_header_lower_case.as_str());
739 if host_header_without_dot != host_header {
740 let host_header_value = match HeaderValue::from_str(host_header_without_dot) {
741 Ok(host_header_value) => host_header_value,
742 Err(err) => {
743 for logger in global_loggers {
744 logger
745 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
746 .await
747 .unwrap_or_default();
748 }
749 let response = basic_error_response(StatusCode::BAD_REQUEST);
750 let (request_parts, _) = request.into_parts();
751 return Ok(
752 finalize_basic_error_response(
753 response,
754 request_parts,
755 http3_alt_port,
756 global_loggers,
757 &socket_data,
758 global_log_date_format,
759 global_log_format,
760 global_log_json.as_ref(),
761 )
762 .await,
763 );
764 }
765 };
766
767 request.headers_mut().insert(header::HOST, host_header_value);
768 }
769 }
770 Err(err) => {
771 for logger in global_loggers {
772 logger
773 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
774 .await
775 .unwrap_or_default();
776 }
777 let response = basic_error_response(StatusCode::BAD_REQUEST);
778 let (request_parts, _) = request.into_parts();
779 return Ok(
780 finalize_basic_error_response(
781 response,
782 request_parts,
783 http3_alt_port,
784 global_loggers,
785 &socket_data,
786 global_log_date_format,
787 global_log_format,
788 global_log_json.as_ref(),
789 )
790 .await,
791 );
792 }
793 }
794 };
795
796 let hostname_determinant = request.headers().get(header::HOST).and_then(|value| {
797 value.to_str().ok().map(|h| {
798 h.rsplit_once(':')
799 .and_then(|(left, right)| {
800 if right.parse::<u16>().is_ok() {
801 Some(left.to_string())
802 } else {
803 None
804 }
805 })
806 .unwrap_or_else(|| h.to_string())
807 })
808 });
809
810 let (request_parts, request_body) = request.into_parts();
811 let mut log_request_parts = if global_configuration
812 .as_ref()
813 .is_some_and(|c| !c.observability.log_channels.is_empty())
814 {
815 Some(request_parts.clone())
816 } else {
817 None
818 };
819 let request = Request::from_parts(request_parts, request_body);
820
821 let (request_parts, request_body) = request.into_parts();
823 let configuration_option =
824 configurations.find_configuration(&request_parts, hostname_determinant.as_deref(), &socket_data);
825 let mut request = Request::from_parts(request_parts, request_body);
826 let mut configuration_error_handler_lookup = match configuration_option
827 .and_then(|c| c.ok_or_else(|| anyhow::anyhow!("No matching configuration found").into_boxed_dyn_error()))
828 {
829 Ok(configuration) => configuration,
830 Err(err) => {
831 for logger in global_loggers {
832 logger
833 .send(LogMessage::new(
834 format!("Cannot determine server configuration: {err}"),
835 true,
836 ))
837 .await
838 .unwrap_or_default()
839 }
840 let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
841 let (request_parts, _) = request.into_parts();
842 return Ok(
843 finalize_basic_error_response(
844 response,
845 request_parts,
846 http3_alt_port,
847 global_loggers,
848 &socket_data,
849 global_log_date_format,
850 global_log_format,
851 global_log_json.as_ref(),
852 )
853 .await,
854 );
855 }
856 };
857 let mut configuration = match configuration_error_handler_lookup.get_default().cloned() {
858 Some(configuration) => configuration,
859 None => {
860 for logger in global_loggers {
861 logger
862 .send(LogMessage::new(
863 "Cannot determine server configuration: No matching configuration found".to_string(),
864 true,
865 ))
866 .await
867 .unwrap_or_default()
868 }
869 let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
870 let (request_parts, _) = request.into_parts();
871 return Ok(
872 finalize_basic_error_response(
873 response,
874 request_parts,
875 http3_alt_port,
876 global_loggers,
877 &socket_data,
878 global_log_date_format,
879 global_log_format,
880 global_log_json.as_ref(),
881 )
882 .await,
883 );
884 }
885 };
886
887 let mut log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
889 let mut log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
890 let mut log_json_props = configuration
891 .entries
892 .get("log_json")
893 .and_then(|entries| entries.get_entry())
894 .map(|entry| entry.props.clone());
895
896 if !configuration.observability.log_channels.is_empty() && log_request_parts.is_none() {
898 let (request_parts, request_body) = request.into_parts();
899 log_request_parts = Some(request_parts.clone());
900 request = Request::from_parts(request_parts, request_body);
901 }
902
903 if !get_value!("disable_url_sanitizer", configuration)
905 .and_then(|v| v.as_bool())
906 .unwrap_or(false)
907 {
908 request = match sanitize_url_request(&configuration, request) {
909 Ok((mut request, was_dirty)) => {
910 if was_dirty {
911 let (parts, body) = request.into_parts();
912 let configuration_option =
913 configurations.find_configuration(&parts, hostname_determinant.as_deref(), &socket_data);
914 request = Request::from_parts(parts, body);
915 match configuration_option {
916 Ok(Some(new_configuration)) => {
917 if let Some(new_config2) = new_configuration.get_default().cloned() {
918 configuration_error_handler_lookup = new_configuration;
919 configuration = new_config2;
920 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
921 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
922 log_json_props = configuration
923 .entries
924 .get("log_json")
925 .and_then(|entries| entries.get_entry())
926 .map(|entry| entry.props.clone());
927 }
928 }
929 Ok(None) => {}
930 Err(err) => {
931 for logger in &configuration.observability.log_channels {
932 logger
933 .send(LogMessage::new(
934 format!("Cannot determine server configuration: {err}"),
935 true,
936 ))
937 .await
938 .unwrap_or_default();
939 }
940 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
941
942 let (request_parts, _) = request.into_parts();
943 let (headers_to_add, headers_to_replace, headers_to_remove) =
944 build_custom_headers(&configuration, &request_parts);
945
946 return Ok(
947 finalize_response_and_log(
948 response,
949 http3_alt_port,
950 headers_to_add,
951 headers_to_replace,
952 headers_to_remove,
953 &configuration.observability.log_channels,
954 &log_request_parts,
955 &socket_data,
956 None,
957 log_date_format,
958 log_format,
959 log_json_props.as_ref(),
960 )
961 .await,
962 );
963 }
964 }
965 }
966 request
967 }
968 Err((request, error)) => {
969 for logger in &configuration.observability.log_channels {
970 logger
971 .send(LogMessage::new(format!("URL sanitation error: {error}"), true))
972 .await
973 .unwrap_or_default();
974 }
975
976 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
977
978 let (parts, _) = request.into_parts();
979 let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &parts);
980
981 return Ok(
982 finalize_response_and_log(
983 response,
984 http3_alt_port,
985 headers_to_add,
986 headers_to_replace,
987 headers_to_remove,
988 &configuration.observability.log_channels,
989 &log_request_parts,
990 &socket_data,
991 None,
992 log_date_format,
993 log_format,
994 log_json_props.as_ref(),
995 )
996 .await,
997 );
998 }
999 };
1000 }
1001
1002 let (request_parts, request_body) = request.into_parts();
1003 let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &request_parts);
1004 let mut request = Request::from_parts(request_parts, request_body);
1005
1006 if request.uri().path() == "*" {
1007 let response = match request.method() {
1008 &Method::OPTIONS => Response::builder()
1009 .status(StatusCode::NO_CONTENT)
1010 .header(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"))
1011 .body(Empty::new().map_err(|e| match e {}).boxed())
1012 .unwrap_or_default(),
1013 _ => {
1014 let mut header_map = HeaderMap::new();
1015 header_map.insert(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"));
1016 generate_error_response(StatusCode::BAD_REQUEST, &configuration, &Some(header_map)).await
1017 }
1018 };
1019 return Ok(
1020 finalize_response_and_log(
1021 response,
1022 http3_alt_port,
1023 headers_to_add,
1024 headers_to_replace,
1025 headers_to_remove,
1026 &configuration.observability.log_channels,
1027 &log_request_parts,
1028 &socket_data,
1029 None,
1030 log_date_format,
1031 log_format,
1032 log_json_props.as_ref(),
1033 )
1034 .await,
1035 );
1036 }
1037
1038 let acme_http_01_resolvers_inner = acme_http_01_resolvers.read().await;
1040 if !acme_http_01_resolvers_inner.is_empty() {
1041 if let Some(challenge_token) = request.uri().path().strip_prefix("/.well-known/acme-challenge/") {
1042 for acme_http01_resolver in &*acme_http_01_resolvers_inner {
1043 if let Some(http01_acme_data) = &*acme_http01_resolver.read().await {
1044 let acme_response = http01_acme_data.1.clone();
1045 if challenge_token == http01_acme_data.0 {
1046 let response = Response::builder()
1047 .status(StatusCode::OK)
1048 .header(
1049 header::CONTENT_TYPE,
1050 HeaderValue::from_static("application/octet-stream"),
1051 )
1052 .body(Full::new(Bytes::from(acme_response)).map_err(|e| match e {}).boxed())
1053 .unwrap_or_default();
1054
1055 return Ok(
1056 finalize_response_and_log(
1057 response,
1058 http3_alt_port,
1059 headers_to_add,
1060 headers_to_replace,
1061 headers_to_remove,
1062 &configuration.observability.log_channels,
1063 &log_request_parts,
1064 &socket_data,
1065 None,
1066 log_date_format,
1067 log_format,
1068 log_json_props.as_ref(),
1069 )
1070 .await,
1071 );
1072 }
1073 }
1074 }
1075 }
1076 };
1077 drop(acme_http_01_resolvers_inner);
1078
1079 let mut error_logger = if !configuration.observability.log_channels.is_empty() {
1080 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1081 } else {
1082 ErrorLogger::without_logger()
1083 };
1084 let mut metrics_enabled = !configuration.observability.metric_channels.is_empty();
1085 let mut metrics_sender = if metrics_enabled {
1086 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1087 } else {
1088 MetricsMultiSender::without_sender()
1089 };
1090 let mut traces_enabled = !configuration.observability.trace_channels.is_empty();
1091 let mut traces_senders = if traces_enabled {
1092 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1093 for channel in &configuration.observability.trace_channels {
1094 channel.0.send(()).await.unwrap_or_default();
1095 if let Ok(channel2) = channel.1.recv().await {
1096 traces_senders.push(channel2);
1097 }
1098 }
1099 traces_senders
1100 } else {
1101 vec![]
1102 };
1103
1104 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1106 for module in &configuration.modules {
1107 module_handlers.push(module.get_module_handlers());
1108 }
1109
1110 request.extensions_mut().insert(RequestData {
1112 auth_user: None,
1113 original_url: None,
1114 error_status_code: None,
1115 });
1116 let mut executed_handlers = Vec::new();
1117 let (request_parts, request_body) = request.into_parts();
1118 let request_parts_cloned = if configuration_error_handler_lookup.has_status_codes() {
1119 let mut request_parts_cloned = request_parts.clone();
1120 request_parts_cloned
1121 .headers
1122 .insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"));
1123 Some(request_parts_cloned)
1124 } else {
1125 None
1127 };
1128 let mut request = Request::from_parts(request_parts, request_body);
1129 let mut latest_auth_data = None;
1130 let mut is_error_handler = false;
1131 let mut handlers_iter: Box<dyn Iterator<Item = Box<dyn ModuleHandlers>>> = Box::new(module_handlers.into_iter());
1132 while let Some(mut handlers) = handlers_iter.next() {
1133 if metrics_enabled {
1134 handlers
1135 .metric_data_before_handler(&request, &socket_data, &metrics_sender)
1136 .await;
1137 }
1138
1139 if traces_enabled {
1140 for trace_sender in &traces_senders {
1141 trace_sender
1142 .send(TraceSignal::StartSpan(format!(
1143 "{}::request_handler",
1144 handlers.get_name()
1145 )))
1146 .await
1147 .unwrap_or_default();
1148 }
1149 }
1150
1151 let (response_result, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
1152 let elapsed = timeout_instant.elapsed();
1153 if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
1154 match timeout(
1155 timeout_cur_duration,
1156 handlers.request_handler(request, &configuration, &socket_data, &error_logger),
1157 )
1158 .await
1159 {
1160 Ok(result) => (result, false),
1161 Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
1162 }
1163 } else {
1164 (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
1165 }
1166 } else {
1167 (
1168 handlers
1169 .request_handler(request, &configuration, &socket_data, &error_logger)
1170 .await,
1171 false,
1172 )
1173 };
1174
1175 if traces_enabled {
1176 for trace_sender in &traces_senders {
1177 trace_sender
1178 .send(TraceSignal::EndSpan(
1179 format!("{}::request_handler", handlers.get_name()),
1180 response_result.as_ref().err().map(|e| e.to_string()),
1181 ))
1182 .await
1183 .unwrap_or_default();
1184 }
1185 }
1186
1187 executed_handlers.push(handlers);
1188
1189 if is_timeout {
1190 if metrics_enabled {
1191 while let Some(mut executed_handler) = executed_handlers.pop() {
1192 executed_handler.metric_data_after_handler(&metrics_sender).await;
1193 }
1194 }
1195 Err(anyhow::anyhow!("The client or server has timed out"))?;
1196 }
1197
1198 match response_result {
1199 Ok(response) => {
1200 let status = response.response_status;
1201 let headers = response.response_headers;
1202 let new_remote_address = response.new_remote_address;
1203 let request_option = response.request;
1204 let response = response.response;
1205 let request_extensions = request_option
1206 .as_ref()
1207 .and_then(|r| r.extensions().get::<RequestData>());
1208 if let Some(request_extensions) = request_extensions {
1209 latest_auth_data = request_extensions.auth_user.clone();
1210 }
1211 if let Some(new_remote_address) = new_remote_address {
1212 socket_data.remote_addr = new_remote_address;
1213 };
1214
1215 match response {
1216 Some(response) => {
1217 return finalize_with_modifying_handlers(
1218 response,
1219 executed_handlers,
1220 &configuration,
1221 http3_alt_port,
1222 headers_to_add,
1223 headers_to_replace,
1224 headers_to_remove,
1225 &log_request_parts,
1226 &socket_data,
1227 latest_auth_data.as_deref(),
1228 log_date_format,
1229 log_format,
1230 log_json_props.as_ref(),
1231 metrics_sender,
1232 metrics_enabled,
1233 traces_senders,
1234 traces_enabled,
1235 timeout_instant,
1236 timeout_duration,
1237 )
1238 .await;
1239 }
1240 None => match status {
1241 Some(status) => {
1242 if !is_error_handler {
1243 if let Some(error_configuration) = configuration_error_handler_lookup.get(status.as_u16()).cloned() {
1244 let request_option = if let Some(request) = request_option {
1245 Some(request)
1246 } else {
1247 request_parts_cloned.clone().map(|request_parts_cloned| {
1248 Request::from_parts(request_parts_cloned, Empty::new().map_err(|e| match e {}).boxed())
1249 })
1250 };
1251 if let Some(request_cloned) = request_option {
1252 configuration = error_configuration;
1253 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1254 for module in &configuration.modules {
1255 module_handlers.push(module.get_module_handlers());
1256 }
1257 handlers_iter = Box::new(module_handlers.into_iter());
1258 if metrics_enabled {
1259 while let Some(mut executed_handler) = executed_handlers.pop() {
1260 executed_handler.metric_data_after_handler(&metrics_sender).await;
1261 }
1262 }
1263 executed_handlers = Vec::new();
1264 request = request_cloned;
1265 if let Some(request_data) = request.extensions_mut().get_mut::<RequestData>() {
1266 request_data.error_status_code = Some(status);
1267 }
1268 is_error_handler = true;
1269 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1270 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1271 log_json_props = configuration
1272 .entries
1273 .get("log_json")
1274 .and_then(|entries| entries.get_entry())
1275 .map(|entry| entry.props.clone());
1276 error_logger = if !configuration.observability.log_channels.is_empty() {
1277 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1278 } else {
1279 ErrorLogger::without_logger()
1280 };
1281 metrics_enabled = !configuration.observability.metric_channels.is_empty();
1282 metrics_sender = if metrics_enabled {
1283 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1284 } else {
1285 MetricsMultiSender::without_sender()
1286 };
1287 traces_enabled = !configuration.observability.trace_channels.is_empty();
1288 traces_senders = if traces_enabled {
1289 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1290 for channel in &configuration.observability.trace_channels {
1291 channel.0.send(()).await.unwrap_or_default();
1292 if let Ok(channel2) = channel.1.recv().await {
1293 traces_senders.push(channel2);
1294 }
1295 }
1296 traces_senders
1297 } else {
1298 vec![]
1299 };
1300 continue;
1301 }
1302 }
1303 }
1304 let response = generate_error_response(status, &configuration, &headers).await;
1305 return finalize_with_modifying_handlers(
1306 response,
1307 executed_handlers,
1308 &configuration,
1309 http3_alt_port,
1310 headers_to_add,
1311 headers_to_replace,
1312 headers_to_remove,
1313 &log_request_parts,
1314 &socket_data,
1315 latest_auth_data.as_deref(),
1316 log_date_format,
1317 log_format,
1318 log_json_props.as_ref(),
1319 metrics_sender,
1320 metrics_enabled,
1321 traces_senders,
1322 traces_enabled,
1323 timeout_instant,
1324 timeout_duration,
1325 )
1326 .await;
1327 }
1328 None => match request_option {
1329 Some(request_obtained) => {
1330 request = request_obtained;
1331 continue;
1332 }
1333 None => {
1334 break;
1335 }
1336 },
1337 },
1338 }
1339 }
1340 Err(err) => {
1341 let response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, &configuration, &None).await;
1342 let err_string = err.to_string();
1343
1344 if !err_string.is_empty() {
1345 for logger in &configuration.observability.log_channels {
1346 logger
1347 .send(LogMessage::new(
1348 format!("Unexpected error while serving a request: {err}"),
1349 true,
1350 ))
1351 .await
1352 .unwrap_or_default();
1353 }
1354 }
1355
1356 let response_result = finalize_with_modifying_handlers(
1357 response,
1358 executed_handlers,
1359 &configuration,
1360 http3_alt_port,
1361 headers_to_add,
1362 headers_to_replace,
1363 headers_to_remove,
1364 &log_request_parts,
1365 &socket_data,
1366 latest_auth_data.as_deref(),
1367 log_date_format,
1368 log_format,
1369 log_json_props.as_ref(),
1370 metrics_sender,
1371 metrics_enabled,
1372 traces_senders,
1373 traces_enabled,
1374 timeout_instant,
1375 timeout_duration,
1376 )
1377 .await;
1378
1379 if err_string.is_empty() {
1380 return Err(anyhow::anyhow!("HTTP request aborted"));
1382 }
1383
1384 return response_result;
1385 }
1386 }
1387 }
1388
1389 let response = generate_error_response(StatusCode::NOT_FOUND, &configuration, &None).await;
1390
1391 finalize_with_modifying_handlers(
1392 response,
1393 executed_handlers,
1394 &configuration,
1395 http3_alt_port,
1396 headers_to_add,
1397 headers_to_replace,
1398 headers_to_remove,
1399 &log_request_parts,
1400 &socket_data,
1401 latest_auth_data.as_deref(),
1402 log_date_format,
1403 log_format,
1404 log_json_props.as_ref(),
1405 metrics_sender,
1406 metrics_enabled,
1407 traces_senders,
1408 traces_enabled,
1409 timeout_instant,
1410 timeout_duration,
1411 )
1412 .await
1413}