ferron/
request_handler.rs

1use std::net::SocketAddr;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_channel::Sender;
7use chrono::{DateTime, Local};
8use ferron_common::logging::{ErrorLogger, LogMessage};
9use ferron_common::observability::{MetricsMultiSender, TraceSignal};
10use futures_util::stream::TryStreamExt;
11use http_body_util::combinators::BoxBody;
12use http_body_util::{BodyExt, Empty, Full, StreamBody};
13use hyper::body::{Body, Bytes, Frame};
14use hyper::header::{HeaderName, HeaderValue};
15use hyper::{header, HeaderMap, Method, Request, Response, StatusCode};
16#[cfg(feature = "runtime-tokio")]
17use tokio::io::BufReader;
18#[cfg(feature = "runtime-tokio")]
19use tokio_util::io::ReaderStream;
20
21use crate::config::{ServerConfiguration, ServerConfigurations};
22use crate::get_value;
23use crate::runtime::timeout;
24#[cfg(feature = "runtime-monoio")]
25use crate::util::MonoioFileStreamNoSpawn;
26use crate::util::{
27  generate_default_error_page, replace_header_placeholders, replace_log_placeholders, sanitize_url, SERVER_SOFTWARE,
28};
29
30use ferron_common::modules::{ModuleHandlers, RequestData, SocketData};
31use ferron_common::{get_entries, get_entry};
32
33/// Generates an error response
34async fn generate_error_response(
35  status_code: StatusCode,
36  config: &ServerConfiguration,
37  headers: &Option<HeaderMap>,
38) -> Response<BoxBody<Bytes, std::io::Error>> {
39  let bare_body = generate_default_error_page(
40    status_code,
41    get_value!("server_administrator_email", config).and_then(|v| v.as_str()),
42  );
43  let mut content_length: Option<u64> = bare_body.len().try_into().ok();
44  let mut response_body = Full::new(Bytes::from(bare_body)).map_err(|e| match e {}).boxed();
45
46  if let Some(error_pages) = get_entries!("error_page", config) {
47    for error_page in &error_pages.inner {
48      if let Some(page_status_code) = error_page.values.first().and_then(|v| v.as_i128()) {
49        if page_status_code
50          .try_into()
51          .ok()
52          .and_then(|s| StatusCode::from_u16(s).ok())
53          .is_none_or(|s| s != status_code)
54        {
55          continue;
56        }
57        if let Some(page_path) = error_page.values.get(1).and_then(|v| v.as_str()) {
58          #[cfg(feature = "runtime-monoio")]
59          let file = monoio::fs::File::open(page_path).await;
60          #[cfg(feature = "runtime-tokio")]
61          let file = tokio::fs::File::open(page_path).await;
62
63          let Ok(file) = file else {
64            continue;
65          };
66
67          // Monoio's `File` doesn't expose `metadata()` on Windows, so we have to spawn a blocking task to obtain the metadata on this platform
68          #[cfg(any(feature = "runtime-tokio", all(feature = "runtime-monoio", unix)))]
69          let metadata = file.metadata().await;
70          #[cfg(all(feature = "runtime-monoio", windows))]
71          let metadata = {
72            let page_path = page_path.to_owned();
73            monoio::spawn_blocking(move || std::fs::metadata(page_path))
74              .await
75              .unwrap_or(Err(std::io::Error::other(
76                "Can't spawn a blocking task to obtain the file metadata",
77              )))
78          };
79
80          content_length = metadata.ok().map(|m| m.len());
81
82          #[cfg(feature = "runtime-monoio")]
83          let file_stream = MonoioFileStreamNoSpawn::new(file, None, content_length);
84          #[cfg(feature = "runtime-tokio")]
85          let file_stream = ReaderStream::new(BufReader::with_capacity(12800, file));
86
87          let stream_body = StreamBody::new(file_stream.map_ok(Frame::data));
88          let boxed_body = stream_body.boxed();
89
90          response_body = boxed_body;
91
92          break;
93        }
94      }
95    }
96  }
97
98  let mut response_builder = Response::builder().status(status_code);
99
100  if let Some(headers) = headers {
101    let headers_iter = headers.iter();
102    for (name, value) in headers_iter {
103      if name != header::CONTENT_TYPE && name != header::CONTENT_LENGTH {
104        response_builder = response_builder.header(name, value);
105      }
106    }
107  }
108
109  if let Some(content_length) = content_length {
110    response_builder = response_builder.header(header::CONTENT_LENGTH, content_length);
111  }
112  response_builder = response_builder.header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"));
113
114  response_builder.body(response_body).unwrap_or_default()
115}
116
117/// Sends a log message formatted according to the Combined Log Format
118#[allow(clippy::too_many_arguments)]
119async fn log_access(
120  loggers: &[Sender<LogMessage>],
121  request_parts: &hyper::http::request::Parts,
122  socket_data: &SocketData,
123  auth_user: Option<&str>,
124  status_code: u16,
125  content_length: Option<u64>,
126  date_format: Option<&str>,
127  log_format: Option<&str>,
128) {
129  let now: DateTime<Local> = Local::now();
130  let formatted_time = now.format(date_format.unwrap_or("%d/%b/%Y:%H:%M:%S %z")).to_string();
131  let log_message_string = replace_log_placeholders(
132    log_format.unwrap_or(
133      "{client_ip} - {auth_user} [{timestamp}] \"{method} {path_and_query} {version}\" \
134       {status_code} {content_length} \"{header:Referer}\" \"{header:User-Agent}\"",
135    ),
136    request_parts,
137    socket_data,
138    auth_user,
139    &formatted_time,
140    status_code,
141    content_length,
142  );
143  for logger in loggers {
144    logger
145      .send(LogMessage::new(log_message_string.clone(), false))
146      .await
147      .unwrap_or_default();
148  }
149}
150
151/// Helper function to add custom headers to response
152#[inline]
153fn add_custom_headers(
154  response_parts: &mut hyper::http::response::Parts,
155  headers_to_add: &HeaderMap,
156  headers_to_replace: &HeaderMap,
157  headers_to_remove: &[HeaderName],
158) {
159  for (header_name, header_value) in headers_to_add {
160    if !response_parts.headers.contains_key(header_name) {
161      response_parts.headers.insert(header_name, header_value.to_owned());
162    }
163  }
164
165  for (header_name, header_value) in headers_to_replace {
166    response_parts.headers.insert(header_name, header_value.to_owned());
167  }
168
169  for header_to_remove in headers_to_remove.iter().rev() {
170    if response_parts.headers.contains_key(header_to_remove) {
171      while response_parts.headers.remove(header_to_remove).is_some() {}
172    }
173  }
174}
175
176/// Helper function to add HTTP/3 Alt-Svc header
177#[inline]
178fn add_http3_alt_svc_header(response_parts: &mut hyper::http::response::Parts, http3_alt_port: Option<u16>) {
179  if let Some(http3_alt_port) = http3_alt_port {
180    if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
181      Some(value) => {
182        let header_value_old = String::from_utf8_lossy(value.as_bytes());
183        let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
184
185        if header_value_old != header_value_new {
186          HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
187        } else {
188          HeaderValue::from_bytes(header_value_old.as_bytes())
189        }
190      }
191      None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
192    } {
193      response_parts.headers.insert(header::ALT_SVC, header_value);
194    }
195  }
196}
197
198/// Helper function to add server header
199#[inline]
200fn add_server_header(response_parts: &mut hyper::http::response::Parts) {
201  response_parts
202    .headers
203    .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
204}
205
206/// Helper function to extract content length for logging
207fn extract_content_length(response: &Response<BoxBody<Bytes, std::io::Error>>) -> Option<u64> {
208  response
209    .headers()
210    .get(header::CONTENT_LENGTH)
211    .and_then(|header_value| {
212      header_value.to_str().ok().and_then(|header_value| {
213        header_value
214          .parse::<u64>()
215          .ok()
216          .or_else(|| response.body().size_hint().exact())
217      })
218    })
219    .or_else(|| response.body().size_hint().exact())
220}
221
222/// Helper function to build a basic HTML error response without configuration context
223fn basic_error_response(status_code: StatusCode) -> Response<BoxBody<Bytes, std::io::Error>> {
224  Response::builder()
225    .status(status_code)
226    .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
227    .body(
228      Full::new(Bytes::from(generate_default_error_page(status_code, None)))
229        .map_err(|e| match e {})
230        .boxed(),
231    )
232    .unwrap_or_default()
233}
234
235/// Helper function to build custom header sets for responses
236fn build_custom_headers(
237  configuration: &ServerConfiguration,
238  request_parts: &hyper::http::request::Parts,
239) -> (HeaderMap, HeaderMap, Vec<HeaderName>) {
240  let mut headers_to_add = HeaderMap::new();
241  let mut headers_to_replace = HeaderMap::new();
242  let mut headers_to_remove = Vec::new();
243
244  if let Some(custom_headers) = get_entries!("header", configuration) {
245    for custom_header in custom_headers.inner.iter().rev() {
246      if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
247        if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
248          if !headers_to_add.contains_key(header_name) {
249            if let Ok(header_name) = HeaderName::from_str(header_name) {
250              if let Ok(header_value) =
251                HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
252              {
253                headers_to_add.insert(header_name, header_value);
254              }
255            }
256          }
257        }
258      }
259    }
260  }
261
262  if let Some(custom_headers) = get_entries!("header_replace", configuration) {
263    for custom_header in custom_headers.inner.iter().rev() {
264      if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
265        if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
266          if let Ok(header_name) = HeaderName::from_str(header_name) {
267            if let Ok(header_value) =
268              HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
269            {
270              headers_to_replace.insert(header_name, header_value);
271            }
272          }
273        }
274      }
275    }
276  }
277
278  if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
279    for custom_header in custom_headers_to_remove.inner.iter().rev() {
280      if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
281        if let Ok(header_name) = HeaderName::from_str(header_name) {
282          headers_to_remove.push(header_name);
283        }
284      }
285    }
286  }
287
288  (headers_to_add, headers_to_replace, headers_to_remove)
289}
290
291/// Helper function to apply all response headers and log if needed
292#[allow(clippy::too_many_arguments)]
293async fn finalize_response_and_log(
294  response: Response<BoxBody<Bytes, std::io::Error>>,
295  http3_alt_port: Option<u16>,
296  headers_to_add: HeaderMap,
297  headers_to_replace: HeaderMap,
298  headers_to_remove: Vec<HeaderName>,
299  loggers: &[Sender<LogMessage>],
300  request_parts: &Option<hyper::http::request::Parts>,
301  socket_data: &SocketData,
302  latest_auth_data: Option<&str>,
303  date_format: Option<&str>,
304  log_format: Option<&str>,
305) -> Response<BoxBody<Bytes, std::io::Error>> {
306  let (mut response_parts, response_body) = response.into_parts();
307
308  add_custom_headers(
309    &mut response_parts,
310    &headers_to_add,
311    &headers_to_replace,
312    &headers_to_remove,
313  );
314  add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
315  add_server_header(&mut response_parts);
316
317  let response = Response::from_parts(response_parts, response_body);
318
319  if let Some(request_parts) = request_parts {
320    if !loggers.is_empty() {
321      log_access(
322        loggers,
323        request_parts,
324        socket_data,
325        latest_auth_data,
326        response.status().as_u16(),
327        extract_content_length(&response),
328        date_format,
329        log_format,
330      )
331      .await;
332    }
333  }
334
335  response
336}
337
338/// Helper function to finalize a basic error response with logging and default headers
339#[allow(clippy::too_many_arguments)]
340async fn finalize_basic_error_response(
341  response: Response<BoxBody<Bytes, std::io::Error>>,
342  request_parts: hyper::http::request::Parts,
343  http3_alt_port: Option<u16>,
344  loggers: &[Sender<LogMessage>],
345  socket_data: &SocketData,
346  date_format: Option<&str>,
347  log_format: Option<&str>,
348) -> Response<BoxBody<Bytes, std::io::Error>> {
349  let request_parts = Some(request_parts);
350  finalize_response_and_log(
351    response,
352    http3_alt_port,
353    HeaderMap::new(),
354    HeaderMap::new(),
355    Vec::new(),
356    loggers,
357    &request_parts,
358    socket_data,
359    None,
360    date_format,
361    log_format,
362  )
363  .await
364}
365
366/// Helper function to execute response modifying handlers
367#[allow(clippy::too_many_arguments)]
368#[inline]
369async fn execute_response_modifying_handlers(
370  mut response: Response<BoxBody<Bytes, std::io::Error>>,
371  mut executed_handlers: Vec<Box<dyn ModuleHandlers>>,
372  configuration: &ServerConfiguration,
373  http3_alt_port: Option<u16>,
374  headers_to_add: HeaderMap,
375  headers_to_replace: HeaderMap,
376  headers_to_remove: Vec<HeaderName>,
377  loggers: &[Sender<LogMessage>],
378  request_parts: &Option<hyper::http::request::Parts>,
379  socket_data: &SocketData,
380  latest_auth_data: Option<&str>,
381  date_format: Option<&str>,
382  log_format: Option<&str>,
383  metrics_sender: MetricsMultiSender,
384  metrics_enabled: bool,
385  traces_senders: Vec<Sender<TraceSignal>>,
386  traces_enabled: bool,
387  timeout_instant: std::time::Instant,
388  timeout_duration: Option<std::time::Duration>,
389) -> Result<Result<Response<BoxBody<Bytes, std::io::Error>>, Response<BoxBody<Bytes, std::io::Error>>>, anyhow::Error> {
390  while let Some(mut executed_handler) = executed_handlers.pop() {
391    if traces_enabled {
392      for trace_sender in &traces_senders {
393        trace_sender
394          .send(TraceSignal::StartSpan(format!(
395            "{}::response_modifying_handler",
396            executed_handler.get_name()
397          )))
398          .await
399          .unwrap_or_default();
400      }
401    }
402    let (response_status, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
403      let elapsed = timeout_instant.elapsed();
404      if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
405        match timeout(
406          timeout_cur_duration,
407          executed_handler.response_modifying_handler(response),
408        )
409        .await
410        {
411          Ok(result) => (result, false),
412          Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
413        }
414      } else {
415        (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
416      }
417    } else {
418      (executed_handler.response_modifying_handler(response).await, false)
419    };
420    if traces_enabled {
421      for trace_sender in &traces_senders {
422        trace_sender
423          .send(TraceSignal::EndSpan(
424            format!("{}::response_modifying_handler", executed_handler.get_name()),
425            response_status.as_ref().err().map(|e| e.to_string()),
426          ))
427          .await
428          .unwrap_or_default();
429      }
430    }
431    if is_timeout {
432      if metrics_enabled {
433        while let Some(mut executed_handler) = executed_handlers.pop() {
434          executed_handler.metric_data_after_handler(&metrics_sender).await;
435        }
436      }
437      Err(anyhow::anyhow!("The client or server has timed out"))?;
438    }
439    response = match response_status {
440      Ok(response) => response,
441      Err(err) => {
442        for logger in loggers {
443          logger
444            .send(LogMessage::new(
445              format!("Unexpected error while serving a request: {err}"),
446              true,
447            ))
448            .await
449            .unwrap_or_default();
450        }
451
452        let error_response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, configuration, &None).await;
453
454        let final_response = finalize_response_and_log(
455          error_response,
456          http3_alt_port,
457          headers_to_add,
458          headers_to_replace,
459          headers_to_remove,
460          loggers,
461          request_parts,
462          socket_data,
463          latest_auth_data,
464          date_format,
465          log_format,
466        )
467        .await;
468
469        if metrics_enabled {
470          while let Some(mut executed_handler) = executed_handlers.pop() {
471            executed_handler.metric_data_after_handler(&metrics_sender).await;
472          }
473        }
474
475        return Ok(Err(final_response));
476      }
477    };
478    if metrics_enabled {
479      executed_handler.metric_data_after_handler(&metrics_sender).await;
480    }
481  }
482  Ok(Ok(response))
483}
484
485/// Helper function to apply response headers, execute response modifiers, and log access if needed
486#[allow(clippy::too_many_arguments)]
487async fn finalize_with_modifying_handlers(
488  response: Response<BoxBody<Bytes, std::io::Error>>,
489  executed_handlers: Vec<Box<dyn ModuleHandlers>>,
490  configuration: &ServerConfiguration,
491  http3_alt_port: Option<u16>,
492  headers_to_add: HeaderMap,
493  headers_to_replace: HeaderMap,
494  headers_to_remove: Vec<HeaderName>,
495  log_request_parts: &Option<hyper::http::request::Parts>,
496  socket_data: &SocketData,
497  latest_auth_data: Option<&str>,
498  log_date_format: Option<&str>,
499  log_format: Option<&str>,
500  metrics_sender: MetricsMultiSender,
501  metrics_enabled: bool,
502  traces_senders: Vec<Sender<TraceSignal>>,
503  traces_enabled: bool,
504  timeout_instant: std::time::Instant,
505  timeout_duration: Option<std::time::Duration>,
506) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
507  let (mut response_parts, response_body) = response.into_parts();
508  add_custom_headers(
509    &mut response_parts,
510    &headers_to_add,
511    &headers_to_replace,
512    &headers_to_remove,
513  );
514  add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
515  add_server_header(&mut response_parts);
516
517  let response = Response::from_parts(response_parts, response_body);
518
519  match execute_response_modifying_handlers(
520    response,
521    executed_handlers,
522    configuration,
523    http3_alt_port,
524    headers_to_add,
525    headers_to_replace,
526    headers_to_remove,
527    &configuration.observability.log_channels,
528    log_request_parts,
529    socket_data,
530    latest_auth_data,
531    log_date_format,
532    log_format,
533    metrics_sender,
534    metrics_enabled,
535    traces_senders,
536    traces_enabled,
537    timeout_instant,
538    timeout_duration,
539  )
540  .await?
541  {
542    Ok(response) => {
543      if let Some(request_parts) = log_request_parts.as_ref() {
544        log_access(
545          &configuration.observability.log_channels,
546          request_parts,
547          socket_data,
548          latest_auth_data,
549          response.status().as_u16(),
550          extract_content_length(&response),
551          log_date_format,
552          log_format,
553        )
554        .await;
555      }
556      Ok(response)
557    }
558    Err(error_response) => Ok(error_response),
559  }
560}
561
562#[allow(clippy::type_complexity)]
563#[allow(clippy::result_large_err)]
564fn sanitize_url_request(
565  configuration: &ServerConfiguration,
566  mut request: Request<BoxBody<Bytes, std::io::Error>>,
567) -> Result<
568  (Request<BoxBody<Bytes, std::io::Error>>, bool),
569  (
570    Request<BoxBody<Bytes, std::io::Error>>,
571    Box<dyn std::error::Error + Send + Sync>,
572  ),
573> {
574  let url_pathname = request.uri().path();
575  let sanitized_url_pathname = match sanitize_url(
576    url_pathname,
577    get_value!("allow_double_slashes", configuration)
578      .and_then(|v| v.as_bool())
579      .unwrap_or(false),
580  ) {
581    Ok(sanitized_url_pathname) => sanitized_url_pathname,
582    Err(err) => {
583      return Err((request, err.into()));
584    }
585  };
586
587  if sanitized_url_pathname != url_pathname {
588    let (mut parts, body) = request.into_parts();
589    let orig_uri = parts.uri.clone();
590    let mut url_parts = parts.uri.into_parts();
591    url_parts.path_and_query = Some(
592      match format!(
593        "{}{}",
594        sanitized_url_pathname,
595        url_parts
596          .path_and_query
597          .as_ref()
598          .and_then(|pq| pq.query())
599          .map_or("".to_string(), |q| format!("?{q}"))
600      )
601      .parse()
602      {
603        Ok(path_and_query) => path_and_query,
604        Err(e) => {
605          parts.uri = orig_uri;
606          request = Request::from_parts(parts, body);
607          return Err((request, e.into()));
608        }
609      },
610    );
611    parts.uri = match hyper::Uri::from_parts(url_parts) {
612      Ok(uri) => uri,
613      Err(e) => {
614        parts.uri = orig_uri;
615        request = Request::from_parts(parts, body);
616        return Err((request, e.into()));
617      }
618    };
619    request = Request::from_parts(parts, body);
620    return Ok((request, true));
621  }
622  Ok((request, false))
623}
624
625/// The HTTP request handler, with timeout
626#[allow(clippy::too_many_arguments)]
627pub async fn request_handler(
628  mut request: Request<BoxBody<Bytes, std::io::Error>>,
629  client_address: SocketAddr,
630  server_address: SocketAddr,
631  encrypted: bool,
632  configurations: Arc<ServerConfigurations>,
633  http3_alt_port: Option<u16>,
634  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
635  proxy_protocol_client_address: Option<SocketAddr>,
636  proxy_protocol_server_address: Option<SocketAddr>,
637) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
638  // Global configuration
639  let global_configuration = configurations.find_global_configuration();
640  let global_loggers: &[Sender<LogMessage>] = global_configuration
641    .as_ref()
642    .map_or(&[], |c| &*c.observability.log_channels);
643  let global_log_date_format = global_configuration
644    .as_deref()
645    .and_then(|c| get_value!("log_date_format", c))
646    .and_then(|v| v.as_str());
647  let global_log_format = global_configuration
648    .as_deref()
649    .and_then(|c| get_value!("log_format", c))
650    .and_then(|v| v.as_str());
651
652  // Request timeout
653  let timeout_from_config = global_configuration
654    .as_deref()
655    .and_then(|c| get_entry!("timeout", c))
656    .and_then(|e| e.values.last());
657  let timeout_duration = if timeout_from_config.is_some_and(|v| v.is_null()) {
658    None
659  } else {
660    let timeout_millis = timeout_from_config.and_then(|v| v.as_i128()).unwrap_or(300000) as u64;
661    Some(Duration::from_millis(timeout_millis))
662  };
663  let timeout_instant = std::time::Instant::now();
664
665  // Normalize HTTP/2 and HTTP/3 request objects
666  if matches!(request.version(), hyper::Version::HTTP_2 | hyper::Version::HTTP_3) {
667    // Set "Host" request header for HTTP/2 and HTTP/3 connections
668    if let Some(authority) = request.uri().authority() {
669      let authority = authority.to_owned();
670      let headers = request.headers_mut();
671      if !headers.contains_key(header::HOST) {
672        if let Ok(authority_value) = HeaderValue::from_bytes(authority.as_str().as_bytes()) {
673          headers.append(header::HOST, authority_value);
674        }
675      }
676    }
677
678    // Normalize the Cookie header for HTTP/2 and HTTP/3
679    let mut cookie_normalized = String::new();
680    let mut cookie_set = false;
681    let headers = request.headers_mut();
682    for cookie in headers.get_all(header::COOKIE) {
683      if let Ok(cookie) = cookie.to_str() {
684        if cookie_set {
685          cookie_normalized.push_str("; ");
686        }
687        cookie_set = true;
688        cookie_normalized.push_str(cookie);
689      }
690    }
691    if cookie_set {
692      if let Ok(cookie_value) = HeaderValue::from_bytes(cookie_normalized.as_bytes()) {
693        headers.insert(header::COOKIE, cookie_value);
694      }
695    }
696  }
697
698  // Construct socket data
699  let mut socket_data = SocketData {
700    remote_addr: proxy_protocol_client_address.unwrap_or(client_address),
701    local_addr: proxy_protocol_server_address.unwrap_or(server_address),
702    encrypted,
703  };
704
705  // Sanitize "Host" header
706  let host_header_option = request.headers().get(header::HOST);
707  if let Some(header_data) = host_header_option {
708    match header_data.to_str() {
709      Ok(host_header) => {
710        let host_header_lower_case = host_header.to_lowercase();
711        let host_header_without_dot = host_header_lower_case
712          .strip_suffix('.')
713          .unwrap_or(host_header_lower_case.as_str());
714        if host_header_without_dot != host_header {
715          let host_header_value = match HeaderValue::from_str(host_header_without_dot) {
716            Ok(host_header_value) => host_header_value,
717            Err(err) => {
718              for logger in global_loggers {
719                logger
720                  .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
721                  .await
722                  .unwrap_or_default();
723              }
724              let response = basic_error_response(StatusCode::BAD_REQUEST);
725              let (request_parts, _) = request.into_parts();
726              return Ok(
727                finalize_basic_error_response(
728                  response,
729                  request_parts,
730                  http3_alt_port,
731                  global_loggers,
732                  &socket_data,
733                  global_log_date_format,
734                  global_log_format,
735                )
736                .await,
737              );
738            }
739          };
740
741          request.headers_mut().insert(header::HOST, host_header_value);
742        }
743      }
744      Err(err) => {
745        for logger in global_loggers {
746          logger
747            .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
748            .await
749            .unwrap_or_default();
750        }
751        let response = basic_error_response(StatusCode::BAD_REQUEST);
752        let (request_parts, _) = request.into_parts();
753        return Ok(
754          finalize_basic_error_response(
755            response,
756            request_parts,
757            http3_alt_port,
758            global_loggers,
759            &socket_data,
760            global_log_date_format,
761            global_log_format,
762          )
763          .await,
764        );
765      }
766    }
767  };
768
769  let hostname_determinant = request.headers().get(header::HOST).and_then(|value| {
770    value.to_str().ok().map(|h| {
771      h.rsplit_once(':')
772        .and_then(|(left, right)| {
773          if right.parse::<u16>().is_ok() {
774            Some(left.to_string())
775          } else {
776            None
777          }
778        })
779        .unwrap_or_else(|| h.to_string())
780    })
781  });
782
783  let (request_parts, request_body) = request.into_parts();
784  let mut log_request_parts = if global_configuration
785    .as_ref()
786    .is_some_and(|c| !c.observability.log_channels.is_empty())
787  {
788    Some(request_parts.clone())
789  } else {
790    None
791  };
792  let request = Request::from_parts(request_parts, request_body);
793
794  // Find the server configuration
795  let (request_parts, request_body) = request.into_parts();
796  let configuration_option =
797    configurations.find_configuration(&request_parts, hostname_determinant.as_deref(), &socket_data);
798  let mut request = Request::from_parts(request_parts, request_body);
799  let mut configuration_error_handler_lookup = match configuration_option
800    .and_then(|c| c.ok_or_else(|| anyhow::anyhow!("No matching configuration found").into_boxed_dyn_error()))
801  {
802    Ok(configuration) => configuration,
803    Err(err) => {
804      for logger in global_loggers {
805        logger
806          .send(LogMessage::new(
807            format!("Cannot determine server configuration: {err}"),
808            true,
809          ))
810          .await
811          .unwrap_or_default()
812      }
813      let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
814      let (request_parts, _) = request.into_parts();
815      return Ok(
816        finalize_basic_error_response(
817          response,
818          request_parts,
819          http3_alt_port,
820          global_loggers,
821          &socket_data,
822          global_log_date_format,
823          global_log_format,
824        )
825        .await,
826      );
827    }
828  };
829  let mut configuration = match configuration_error_handler_lookup.get_default().cloned() {
830    Some(configuration) => configuration,
831    None => {
832      for logger in global_loggers {
833        logger
834          .send(LogMessage::new(
835            "Cannot determine server configuration: No matching configuration found".to_string(),
836            true,
837          ))
838          .await
839          .unwrap_or_default()
840      }
841      let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
842      let (request_parts, _) = request.into_parts();
843      return Ok(
844        finalize_basic_error_response(
845          response,
846          request_parts,
847          http3_alt_port,
848          global_loggers,
849          &socket_data,
850          global_log_date_format,
851          global_log_format,
852        )
853        .await,
854      );
855    }
856  };
857
858  // Determine the log formats
859  let mut log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
860  let mut log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
861
862  // Clone the log request parts if logging is enabled and request parts are not already cloned
863  if !configuration.observability.log_channels.is_empty() && log_request_parts.is_none() {
864    let (request_parts, request_body) = request.into_parts();
865    log_request_parts = Some(request_parts.clone());
866    request = Request::from_parts(request_parts, request_body);
867  }
868
869  // Sanitize the URL, if the URL sanitizer is enabled
870  if !get_value!("disable_url_sanitizer", configuration)
871    .and_then(|v| v.as_bool())
872    .unwrap_or(false)
873  {
874    request = match sanitize_url_request(&configuration, request) {
875      Ok((mut request, was_dirty)) => {
876        if was_dirty {
877          let (parts, body) = request.into_parts();
878          let configuration_option =
879            configurations.find_configuration(&parts, hostname_determinant.as_deref(), &socket_data);
880          request = Request::from_parts(parts, body);
881          match configuration_option {
882            Ok(Some(new_configuration)) => {
883              if let Some(new_config2) = new_configuration.get_default().cloned() {
884                configuration_error_handler_lookup = new_configuration;
885                configuration = new_config2;
886                log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
887                log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
888              }
889            }
890            Ok(None) => {}
891            Err(err) => {
892              for logger in &configuration.observability.log_channels {
893                logger
894                  .send(LogMessage::new(
895                    format!("Cannot determine server configuration: {err}"),
896                    true,
897                  ))
898                  .await
899                  .unwrap_or_default();
900              }
901              let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
902
903              let (request_parts, _) = request.into_parts();
904              let (headers_to_add, headers_to_replace, headers_to_remove) =
905                build_custom_headers(&configuration, &request_parts);
906
907              return Ok(
908                finalize_response_and_log(
909                  response,
910                  http3_alt_port,
911                  headers_to_add,
912                  headers_to_replace,
913                  headers_to_remove,
914                  &configuration.observability.log_channels,
915                  &log_request_parts,
916                  &socket_data,
917                  None,
918                  log_date_format,
919                  log_format,
920                )
921                .await,
922              );
923            }
924          }
925        }
926        request
927      }
928      Err((request, error)) => {
929        for logger in &configuration.observability.log_channels {
930          logger
931            .send(LogMessage::new(format!("URL sanitation error: {error}"), true))
932            .await
933            .unwrap_or_default();
934        }
935
936        let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
937
938        let (parts, _) = request.into_parts();
939        let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &parts);
940
941        return Ok(
942          finalize_response_and_log(
943            response,
944            http3_alt_port,
945            headers_to_add,
946            headers_to_replace,
947            headers_to_remove,
948            &configuration.observability.log_channels,
949            &log_request_parts,
950            &socket_data,
951            None,
952            log_date_format,
953            log_format,
954          )
955          .await,
956        );
957      }
958    };
959  }
960
961  let (request_parts, request_body) = request.into_parts();
962  let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &request_parts);
963  let mut request = Request::from_parts(request_parts, request_body);
964
965  if request.uri().path() == "*" {
966    let response = match request.method() {
967      &Method::OPTIONS => Response::builder()
968        .status(StatusCode::NO_CONTENT)
969        .header(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"))
970        .body(Empty::new().map_err(|e| match e {}).boxed())
971        .unwrap_or_default(),
972      _ => {
973        let mut header_map = HeaderMap::new();
974        header_map.insert(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"));
975        generate_error_response(StatusCode::BAD_REQUEST, &configuration, &Some(header_map)).await
976      }
977    };
978    return Ok(
979      finalize_response_and_log(
980        response,
981        http3_alt_port,
982        headers_to_add,
983        headers_to_replace,
984        headers_to_remove,
985        &configuration.observability.log_channels,
986        &log_request_parts,
987        &socket_data,
988        None,
989        log_date_format,
990        log_format,
991      )
992      .await,
993    );
994  }
995
996  // HTTP-01 ACME challenge for automatic TLS
997  let acme_http_01_resolvers_inner = acme_http_01_resolvers.read().await;
998  if !acme_http_01_resolvers_inner.is_empty() {
999    if let Some(challenge_token) = request.uri().path().strip_prefix("/.well-known/acme-challenge/") {
1000      for acme_http01_resolver in &*acme_http_01_resolvers_inner {
1001        if let Some(http01_acme_data) = &*acme_http01_resolver.read().await {
1002          let acme_response = http01_acme_data.1.clone();
1003          if challenge_token == http01_acme_data.0 {
1004            let response = Response::builder()
1005              .status(StatusCode::OK)
1006              .header(
1007                header::CONTENT_TYPE,
1008                HeaderValue::from_static("application/octet-stream"),
1009              )
1010              .body(Full::new(Bytes::from(acme_response)).map_err(|e| match e {}).boxed())
1011              .unwrap_or_default();
1012
1013            return Ok(
1014              finalize_response_and_log(
1015                response,
1016                http3_alt_port,
1017                headers_to_add,
1018                headers_to_replace,
1019                headers_to_remove,
1020                &configuration.observability.log_channels,
1021                &log_request_parts,
1022                &socket_data,
1023                None,
1024                log_date_format,
1025                log_format,
1026              )
1027              .await,
1028            );
1029          }
1030        }
1031      }
1032    }
1033  };
1034  drop(acme_http_01_resolvers_inner);
1035
1036  let mut error_logger = if !configuration.observability.log_channels.is_empty() {
1037    ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1038  } else {
1039    ErrorLogger::without_logger()
1040  };
1041  let mut metrics_enabled = !configuration.observability.metric_channels.is_empty();
1042  let mut metrics_sender = if metrics_enabled {
1043    MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1044  } else {
1045    MetricsMultiSender::without_sender()
1046  };
1047  let mut traces_enabled = !configuration.observability.trace_channels.is_empty();
1048  let mut traces_senders = if traces_enabled {
1049    let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1050    for channel in &configuration.observability.trace_channels {
1051      channel.0.send(()).await.unwrap_or_default();
1052      if let Ok(channel2) = channel.1.recv().await {
1053        traces_senders.push(channel2);
1054      }
1055    }
1056    traces_senders
1057  } else {
1058    vec![]
1059  };
1060
1061  // Obtain module handlers
1062  let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1063  for module in &configuration.modules {
1064    module_handlers.push(module.get_module_handlers());
1065  }
1066
1067  // Execute modules!
1068  request.extensions_mut().insert(RequestData {
1069    auth_user: None,
1070    original_url: None,
1071    error_status_code: None,
1072  });
1073  let mut executed_handlers = Vec::new();
1074  let (request_parts, request_body) = request.into_parts();
1075  let request_parts_cloned = if configuration_error_handler_lookup.has_status_codes() {
1076    let mut request_parts_cloned = request_parts.clone();
1077    request_parts_cloned
1078      .headers
1079      .insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"));
1080    Some(request_parts_cloned)
1081  } else {
1082    // If the error configuration is not specified, don't clone the request parts to improve performance
1083    None
1084  };
1085  let mut request = Request::from_parts(request_parts, request_body);
1086  let mut latest_auth_data = None;
1087  let mut is_error_handler = false;
1088  let mut handlers_iter: Box<dyn Iterator<Item = Box<dyn ModuleHandlers>>> = Box::new(module_handlers.into_iter());
1089  while let Some(mut handlers) = handlers_iter.next() {
1090    if metrics_enabled {
1091      handlers
1092        .metric_data_before_handler(&request, &socket_data, &metrics_sender)
1093        .await;
1094    }
1095
1096    if traces_enabled {
1097      for trace_sender in &traces_senders {
1098        trace_sender
1099          .send(TraceSignal::StartSpan(format!(
1100            "{}::request_handler",
1101            handlers.get_name()
1102          )))
1103          .await
1104          .unwrap_or_default();
1105      }
1106    }
1107
1108    let (response_result, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
1109      let elapsed = timeout_instant.elapsed();
1110      if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
1111        match timeout(
1112          timeout_cur_duration,
1113          handlers.request_handler(request, &configuration, &socket_data, &error_logger),
1114        )
1115        .await
1116        {
1117          Ok(result) => (result, false),
1118          Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
1119        }
1120      } else {
1121        (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
1122      }
1123    } else {
1124      (
1125        handlers
1126          .request_handler(request, &configuration, &socket_data, &error_logger)
1127          .await,
1128        false,
1129      )
1130    };
1131
1132    if traces_enabled {
1133      for trace_sender in &traces_senders {
1134        trace_sender
1135          .send(TraceSignal::EndSpan(
1136            format!("{}::request_handler", handlers.get_name()),
1137            response_result.as_ref().err().map(|e| e.to_string()),
1138          ))
1139          .await
1140          .unwrap_or_default();
1141      }
1142    }
1143
1144    executed_handlers.push(handlers);
1145
1146    if is_timeout {
1147      if metrics_enabled {
1148        while let Some(mut executed_handler) = executed_handlers.pop() {
1149          executed_handler.metric_data_after_handler(&metrics_sender).await;
1150        }
1151      }
1152      Err(anyhow::anyhow!("The client or server has timed out"))?;
1153    }
1154
1155    match response_result {
1156      Ok(response) => {
1157        let status = response.response_status;
1158        let headers = response.response_headers;
1159        let new_remote_address = response.new_remote_address;
1160        let request_option = response.request;
1161        let response = response.response;
1162        let request_extensions = request_option
1163          .as_ref()
1164          .and_then(|r| r.extensions().get::<RequestData>());
1165        if let Some(request_extensions) = request_extensions {
1166          latest_auth_data = request_extensions.auth_user.clone();
1167        }
1168        if let Some(new_remote_address) = new_remote_address {
1169          socket_data.remote_addr = new_remote_address;
1170        };
1171
1172        match response {
1173          Some(response) => {
1174            return finalize_with_modifying_handlers(
1175              response,
1176              executed_handlers,
1177              &configuration,
1178              http3_alt_port,
1179              headers_to_add,
1180              headers_to_replace,
1181              headers_to_remove,
1182              &log_request_parts,
1183              &socket_data,
1184              latest_auth_data.as_deref(),
1185              log_date_format,
1186              log_format,
1187              metrics_sender,
1188              metrics_enabled,
1189              traces_senders,
1190              traces_enabled,
1191              timeout_instant,
1192              timeout_duration,
1193            )
1194            .await;
1195          }
1196          None => match status {
1197            Some(status) => {
1198              if !is_error_handler {
1199                if let Some(error_configuration) = configuration_error_handler_lookup.get(status.as_u16()).cloned() {
1200                  let request_option = if let Some(request) = request_option {
1201                    Some(request)
1202                  } else {
1203                    request_parts_cloned.clone().map(|request_parts_cloned| {
1204                      Request::from_parts(request_parts_cloned, Empty::new().map_err(|e| match e {}).boxed())
1205                    })
1206                  };
1207                  if let Some(request_cloned) = request_option {
1208                    configuration = error_configuration;
1209                    let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1210                    for module in &configuration.modules {
1211                      module_handlers.push(module.get_module_handlers());
1212                    }
1213                    handlers_iter = Box::new(module_handlers.into_iter());
1214                    if metrics_enabled {
1215                      while let Some(mut executed_handler) = executed_handlers.pop() {
1216                        executed_handler.metric_data_after_handler(&metrics_sender).await;
1217                      }
1218                    }
1219                    executed_handlers = Vec::new();
1220                    request = request_cloned;
1221                    if let Some(request_data) = request.extensions_mut().get_mut::<RequestData>() {
1222                      request_data.error_status_code = Some(status);
1223                    }
1224                    is_error_handler = true;
1225                    log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1226                    log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1227                    error_logger = if !configuration.observability.log_channels.is_empty() {
1228                      ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1229                    } else {
1230                      ErrorLogger::without_logger()
1231                    };
1232                    metrics_enabled = !configuration.observability.metric_channels.is_empty();
1233                    metrics_sender = if metrics_enabled {
1234                      MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1235                    } else {
1236                      MetricsMultiSender::without_sender()
1237                    };
1238                    traces_enabled = !configuration.observability.trace_channels.is_empty();
1239                    traces_senders = if traces_enabled {
1240                      let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1241                      for channel in &configuration.observability.trace_channels {
1242                        channel.0.send(()).await.unwrap_or_default();
1243                        if let Ok(channel2) = channel.1.recv().await {
1244                          traces_senders.push(channel2);
1245                        }
1246                      }
1247                      traces_senders
1248                    } else {
1249                      vec![]
1250                    };
1251                    continue;
1252                  }
1253                }
1254              }
1255              let response = generate_error_response(status, &configuration, &headers).await;
1256              return finalize_with_modifying_handlers(
1257                response,
1258                executed_handlers,
1259                &configuration,
1260                http3_alt_port,
1261                headers_to_add,
1262                headers_to_replace,
1263                headers_to_remove,
1264                &log_request_parts,
1265                &socket_data,
1266                latest_auth_data.as_deref(),
1267                log_date_format,
1268                log_format,
1269                metrics_sender,
1270                metrics_enabled,
1271                traces_senders,
1272                traces_enabled,
1273                timeout_instant,
1274                timeout_duration,
1275              )
1276              .await;
1277            }
1278            None => match request_option {
1279              Some(request_obtained) => {
1280                request = request_obtained;
1281                continue;
1282              }
1283              None => {
1284                break;
1285              }
1286            },
1287          },
1288        }
1289      }
1290      Err(err) => {
1291        let response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, &configuration, &None).await;
1292
1293        for logger in &configuration.observability.log_channels {
1294          logger
1295            .send(LogMessage::new(
1296              format!("Unexpected error while serving a request: {err}"),
1297              true,
1298            ))
1299            .await
1300            .unwrap_or_default();
1301        }
1302
1303        return finalize_with_modifying_handlers(
1304          response,
1305          executed_handlers,
1306          &configuration,
1307          http3_alt_port,
1308          headers_to_add,
1309          headers_to_replace,
1310          headers_to_remove,
1311          &log_request_parts,
1312          &socket_data,
1313          latest_auth_data.as_deref(),
1314          log_date_format,
1315          log_format,
1316          metrics_sender,
1317          metrics_enabled,
1318          traces_senders,
1319          traces_enabled,
1320          timeout_instant,
1321          timeout_duration,
1322        )
1323        .await;
1324      }
1325    }
1326  }
1327
1328  let response = generate_error_response(StatusCode::NOT_FOUND, &configuration, &None).await;
1329
1330  finalize_with_modifying_handlers(
1331    response,
1332    executed_handlers,
1333    &configuration,
1334    http3_alt_port,
1335    headers_to_add,
1336    headers_to_replace,
1337    headers_to_remove,
1338    &log_request_parts,
1339    &socket_data,
1340    latest_auth_data.as_deref(),
1341    log_date_format,
1342    log_format,
1343    metrics_sender,
1344    metrics_enabled,
1345    traces_senders,
1346    traces_enabled,
1347    timeout_instant,
1348    timeout_duration,
1349  )
1350  .await
1351}