ferron/
request_handler.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_channel::Sender;
8use chrono::{DateTime, Local};
9use ferron_common::logging::{ErrorLogger, LogMessage};
10use ferron_common::observability::{MetricsMultiSender, TraceSignal};
11#[cfg(feature = "runtime-vibeio")]
12use ferron_common::util::FileStream;
13use futures_util::stream::TryStreamExt;
14use http_body_util::combinators::BoxBody;
15use http_body_util::{BodyExt, Empty, Full, StreamBody};
16use hyper::body::{Body, Bytes, Frame};
17use hyper::header::{HeaderName, HeaderValue};
18use hyper::{header, HeaderMap, Method, Request, Response, StatusCode};
19#[cfg(feature = "runtime-tokio")]
20use tokio::io::BufReader;
21#[cfg(feature = "runtime-tokio")]
22use tokio_util::io::ReaderStream;
23
24use crate::config::{ServerConfiguration, ServerConfigurationValue, ServerConfigurations};
25use crate::get_value;
26use crate::runtime::timeout;
27#[cfg(feature = "runtime-monoio")]
28use crate::util::MonoioFileStreamNoSpawn;
29use crate::util::{
30  generate_access_log_message, generate_default_error_page, replace_header_placeholders, sanitize_url, SERVER_SOFTWARE,
31};
32
33use ferron_common::modules::{ModuleHandlers, RequestData, SocketData};
34use ferron_common::{get_entries, get_entry};
35
36/// Generates an error response
37async fn generate_error_response(
38  status_code: StatusCode,
39  config: &ServerConfiguration,
40  headers: &Option<HeaderMap>,
41) -> Response<BoxBody<Bytes, std::io::Error>> {
42  let bare_body = generate_default_error_page(
43    status_code,
44    get_value!("server_administrator_email", config).and_then(|v| v.as_str()),
45  );
46  let mut content_length: Option<u64> = bare_body.len().try_into().ok();
47  let mut response_body = Full::new(Bytes::from(bare_body)).map_err(|e| match e {}).boxed();
48
49  if let Some(error_pages) = get_entries!("error_page", config) {
50    for error_page in &error_pages.inner {
51      if let Some(page_status_code) = error_page.values.first().and_then(|v| v.as_i128()) {
52        if page_status_code
53          .try_into()
54          .ok()
55          .and_then(|s| StatusCode::from_u16(s).ok())
56          .is_none_or(|s| s != status_code)
57        {
58          continue;
59        }
60        if let Some(page_path) = error_page.values.get(1).and_then(|v| v.as_str()) {
61          #[cfg(feature = "runtime-monoio")]
62          let file = monoio::fs::File::open(page_path).await;
63          #[cfg(feature = "runtime-tokio")]
64          let file = tokio::fs::File::open(page_path).await;
65          #[cfg(feature = "runtime-vibeio")]
66          let file = vibeio::fs::File::open(page_path).await;
67
68          let Ok(file) = file else {
69            continue;
70          };
71
72          // Monoio's `File` doesn't expose `metadata()` on Windows, so we have to spawn a blocking task to obtain the metadata on this platform
73          #[cfg(any(
74            feature = "runtime-tokio",
75            feature = "runtime-vibeio",
76            all(feature = "runtime-monoio", unix)
77          ))]
78          let metadata = file.metadata().await;
79          #[cfg(all(feature = "runtime-monoio", windows))]
80          let metadata = {
81            let page_path = page_path.to_owned();
82            monoio::spawn_blocking(move || std::fs::metadata(page_path))
83              .await
84              .unwrap_or(Err(std::io::Error::other(
85                "Can't spawn a blocking task to obtain the file metadata",
86              )))
87          };
88
89          content_length = metadata.ok().map(|m| m.len());
90
91          #[cfg(feature = "runtime-monoio")]
92          let file_stream = MonoioFileStreamNoSpawn::new(file, None, content_length);
93          #[cfg(feature = "runtime-vibeio")]
94          let file_stream = FileStream::new(file, None, content_length);
95          #[cfg(feature = "runtime-tokio")]
96          let file_stream = ReaderStream::new(BufReader::with_capacity(12800, file));
97
98          let stream_body = StreamBody::new(file_stream.map_ok(Frame::data));
99          let boxed_body = stream_body.boxed();
100
101          response_body = boxed_body;
102
103          break;
104        }
105      }
106    }
107  }
108
109  let mut response_builder = Response::builder().status(status_code);
110
111  if let Some(headers) = headers {
112    let headers_iter = headers.iter();
113    for (name, value) in headers_iter {
114      if name != header::CONTENT_TYPE && name != header::CONTENT_LENGTH {
115        response_builder = response_builder.header(name, value);
116      }
117    }
118  }
119
120  if let Some(content_length) = content_length {
121    response_builder = response_builder.header(header::CONTENT_LENGTH, content_length);
122  }
123  response_builder = response_builder.header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"));
124
125  response_builder.body(response_body).unwrap_or_default()
126}
127
128/// Sends an access log message using either text or JSON formatting
129#[allow(clippy::too_many_arguments)]
130async fn log_access(
131  loggers: &[Sender<LogMessage>],
132  request_parts: &hyper::http::request::Parts,
133  socket_data: &SocketData,
134  auth_user: Option<&str>,
135  status_code: u16,
136  content_length: Option<u64>,
137  date_format: Option<&str>,
138  log_format: Option<&str>,
139  log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
140) {
141  let now: DateTime<Local> = Local::now();
142  let formatted_time = now.format(date_format.unwrap_or("%d/%b/%Y:%H:%M:%S %z")).to_string();
143  let log_message_string = generate_access_log_message(
144    request_parts,
145    socket_data,
146    auth_user,
147    &formatted_time,
148    status_code,
149    content_length,
150    log_format,
151    log_json_props,
152  );
153  for logger in loggers {
154    logger
155      .send(LogMessage::new(log_message_string.clone(), false))
156      .await
157      .unwrap_or_default();
158  }
159}
160
161/// Helper function to add custom headers to response
162#[inline]
163fn add_custom_headers(
164  response_parts: &mut hyper::http::response::Parts,
165  headers_to_add: &HeaderMap,
166  headers_to_replace: &HeaderMap,
167  headers_to_remove: &[HeaderName],
168) {
169  for (header_name, header_value) in headers_to_add {
170    if !response_parts.headers.contains_key(header_name) {
171      response_parts.headers.insert(header_name, header_value.to_owned());
172    }
173  }
174
175  for (header_name, header_value) in headers_to_replace {
176    response_parts.headers.insert(header_name, header_value.to_owned());
177  }
178
179  for header_to_remove in headers_to_remove.iter().rev() {
180    if response_parts.headers.contains_key(header_to_remove) {
181      while response_parts.headers.remove(header_to_remove).is_some() {}
182    }
183  }
184}
185
186/// Helper function to add HTTP/3 Alt-Svc header
187#[inline]
188fn add_http3_alt_svc_header(response_parts: &mut hyper::http::response::Parts, http3_alt_port: Option<u16>) {
189  if let Some(http3_alt_port) = http3_alt_port {
190    if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
191      Some(value) => {
192        let header_value_old = String::from_utf8_lossy(value.as_bytes());
193        let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
194
195        if header_value_old != header_value_new {
196          HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
197        } else {
198          HeaderValue::from_bytes(header_value_old.as_bytes())
199        }
200      }
201      None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
202    } {
203      response_parts.headers.insert(header::ALT_SVC, header_value);
204    }
205  }
206}
207
208/// Helper function to add server header
209#[inline]
210fn add_server_header(response_parts: &mut hyper::http::response::Parts) {
211  response_parts
212    .headers
213    .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
214}
215
216/// Helper function to extract content length for logging
217fn extract_content_length(response: &Response<BoxBody<Bytes, std::io::Error>>) -> Option<u64> {
218  response
219    .headers()
220    .get(header::CONTENT_LENGTH)
221    .and_then(|header_value| {
222      header_value.to_str().ok().and_then(|header_value| {
223        header_value
224          .parse::<u64>()
225          .ok()
226          .or_else(|| response.body().size_hint().exact())
227      })
228    })
229    .or_else(|| response.body().size_hint().exact())
230}
231
232/// Helper function to build a basic HTML error response without configuration context
233fn basic_error_response(status_code: StatusCode) -> Response<BoxBody<Bytes, std::io::Error>> {
234  Response::builder()
235    .status(status_code)
236    .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
237    .body(
238      Full::new(Bytes::from(generate_default_error_page(status_code, None)))
239        .map_err(|e| match e {})
240        .boxed(),
241    )
242    .unwrap_or_default()
243}
244
245/// Helper function to build custom header sets for responses
246fn build_custom_headers(
247  configuration: &ServerConfiguration,
248  request_parts: &hyper::http::request::Parts,
249) -> (HeaderMap, HeaderMap, Vec<HeaderName>) {
250  let mut headers_to_add = HeaderMap::new();
251  let mut headers_to_replace = HeaderMap::new();
252  let mut headers_to_remove = Vec::new();
253
254  if let Some(custom_headers) = get_entries!("header", configuration) {
255    for custom_header in custom_headers.inner.iter().rev() {
256      if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
257        if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
258          if !headers_to_add.contains_key(header_name) {
259            if let Ok(header_name) = HeaderName::from_str(header_name) {
260              if let Ok(header_value) =
261                HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
262              {
263                headers_to_add.insert(header_name, header_value);
264              }
265            }
266          }
267        }
268      }
269    }
270  }
271
272  if let Some(custom_headers) = get_entries!("header_replace", configuration) {
273    for custom_header in custom_headers.inner.iter().rev() {
274      if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
275        if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
276          if let Ok(header_name) = HeaderName::from_str(header_name) {
277            if let Ok(header_value) =
278              HeaderValue::from_str(&replace_header_placeholders(header_value, request_parts, None))
279            {
280              headers_to_replace.insert(header_name, header_value);
281            }
282          }
283        }
284      }
285    }
286  }
287
288  if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
289    for custom_header in custom_headers_to_remove.inner.iter().rev() {
290      if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
291        if let Ok(header_name) = HeaderName::from_str(header_name) {
292          headers_to_remove.push(header_name);
293        }
294      }
295    }
296  }
297
298  (headers_to_add, headers_to_replace, headers_to_remove)
299}
300
301/// Helper function to apply all response headers and log if needed
302#[allow(clippy::too_many_arguments)]
303async fn finalize_response_and_log(
304  response: Response<BoxBody<Bytes, std::io::Error>>,
305  http3_alt_port: Option<u16>,
306  headers_to_add: HeaderMap,
307  headers_to_replace: HeaderMap,
308  headers_to_remove: Vec<HeaderName>,
309  loggers: &[Sender<LogMessage>],
310  request_parts: &Option<hyper::http::request::Parts>,
311  socket_data: &SocketData,
312  latest_auth_data: Option<&str>,
313  date_format: Option<&str>,
314  log_format: Option<&str>,
315  log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
316) -> Response<BoxBody<Bytes, std::io::Error>> {
317  let (mut response_parts, response_body) = response.into_parts();
318
319  add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
320  add_server_header(&mut response_parts);
321  add_custom_headers(
322    &mut response_parts,
323    &headers_to_add,
324    &headers_to_replace,
325    &headers_to_remove,
326  );
327
328  let response = Response::from_parts(response_parts, response_body);
329
330  if let Some(request_parts) = request_parts {
331    if !loggers.is_empty() {
332      log_access(
333        loggers,
334        request_parts,
335        socket_data,
336        latest_auth_data,
337        response.status().as_u16(),
338        extract_content_length(&response),
339        date_format,
340        log_format,
341        log_json_props,
342      )
343      .await;
344    }
345  }
346
347  response
348}
349
350/// Helper function to finalize a basic error response with logging and default headers
351#[allow(clippy::too_many_arguments)]
352async fn finalize_basic_error_response(
353  response: Response<BoxBody<Bytes, std::io::Error>>,
354  request_parts: hyper::http::request::Parts,
355  http3_alt_port: Option<u16>,
356  loggers: &[Sender<LogMessage>],
357  socket_data: &SocketData,
358  date_format: Option<&str>,
359  log_format: Option<&str>,
360  log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
361) -> Response<BoxBody<Bytes, std::io::Error>> {
362  let request_parts = Some(request_parts);
363  finalize_response_and_log(
364    response,
365    http3_alt_port,
366    HeaderMap::new(),
367    HeaderMap::new(),
368    Vec::new(),
369    loggers,
370    &request_parts,
371    socket_data,
372    None,
373    date_format,
374    log_format,
375    log_json_props,
376  )
377  .await
378}
379
380/// Helper function to execute response modifying handlers
381#[allow(clippy::too_many_arguments)]
382#[inline]
383async fn execute_response_modifying_handlers(
384  mut response: Response<BoxBody<Bytes, std::io::Error>>,
385  mut executed_handlers: Vec<Box<dyn ModuleHandlers>>,
386  configuration: &ServerConfiguration,
387  http3_alt_port: Option<u16>,
388  headers_to_add: HeaderMap,
389  headers_to_replace: HeaderMap,
390  headers_to_remove: Vec<HeaderName>,
391  loggers: &[Sender<LogMessage>],
392  request_parts: &Option<hyper::http::request::Parts>,
393  socket_data: &SocketData,
394  latest_auth_data: Option<&str>,
395  date_format: Option<&str>,
396  log_format: Option<&str>,
397  log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
398  metrics_sender: MetricsMultiSender,
399  metrics_enabled: bool,
400  traces_senders: Vec<Sender<TraceSignal>>,
401  traces_enabled: bool,
402  timeout_instant: std::time::Instant,
403  timeout_duration: Option<std::time::Duration>,
404) -> Result<Result<Response<BoxBody<Bytes, std::io::Error>>, Response<BoxBody<Bytes, std::io::Error>>>, anyhow::Error> {
405  while let Some(mut executed_handler) = executed_handlers.pop() {
406    if traces_enabled {
407      for trace_sender in &traces_senders {
408        trace_sender
409          .send(TraceSignal::StartSpan(format!(
410            "{}::response_modifying_handler",
411            executed_handler.get_name()
412          )))
413          .await
414          .unwrap_or_default();
415      }
416    }
417    let (response_status, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
418      let elapsed = timeout_instant.elapsed();
419      if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
420        match timeout(
421          timeout_cur_duration,
422          executed_handler.response_modifying_handler(response),
423        )
424        .await
425        {
426          Ok(result) => (result, false),
427          Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
428        }
429      } else {
430        (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
431      }
432    } else {
433      (executed_handler.response_modifying_handler(response).await, false)
434    };
435    if traces_enabled {
436      for trace_sender in &traces_senders {
437        trace_sender
438          .send(TraceSignal::EndSpan(
439            format!("{}::response_modifying_handler", executed_handler.get_name()),
440            response_status.as_ref().err().map(|e| e.to_string()),
441          ))
442          .await
443          .unwrap_or_default();
444      }
445    }
446    if is_timeout {
447      if metrics_enabled {
448        while let Some(mut executed_handler) = executed_handlers.pop() {
449          executed_handler.metric_data_after_handler(&metrics_sender).await;
450        }
451      }
452      Err(anyhow::anyhow!("The client or server has timed out"))?;
453    }
454    response = match response_status {
455      Ok(response) => response,
456      Err(err) => {
457        for logger in loggers {
458          logger
459            .send(LogMessage::new(
460              format!("Unexpected error while serving a request: {err}"),
461              true,
462            ))
463            .await
464            .unwrap_or_default();
465        }
466
467        let error_response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, configuration, &None).await;
468
469        let final_response = finalize_response_and_log(
470          error_response,
471          http3_alt_port,
472          headers_to_add,
473          headers_to_replace,
474          headers_to_remove,
475          loggers,
476          request_parts,
477          socket_data,
478          latest_auth_data,
479          date_format,
480          log_format,
481          log_json_props,
482        )
483        .await;
484
485        if metrics_enabled {
486          while let Some(mut executed_handler) = executed_handlers.pop() {
487            executed_handler.metric_data_after_handler(&metrics_sender).await;
488          }
489        }
490
491        return Ok(Err(final_response));
492      }
493    };
494    if metrics_enabled {
495      executed_handler.metric_data_after_handler(&metrics_sender).await;
496    }
497  }
498  Ok(Ok(response))
499}
500
501/// Helper function to apply response headers, execute response modifiers, and log access if needed
502#[allow(clippy::too_many_arguments)]
503async fn finalize_with_modifying_handlers(
504  response: Response<BoxBody<Bytes, std::io::Error>>,
505  executed_handlers: Vec<Box<dyn ModuleHandlers>>,
506  configuration: &ServerConfiguration,
507  http3_alt_port: Option<u16>,
508  headers_to_add: HeaderMap,
509  headers_to_replace: HeaderMap,
510  headers_to_remove: Vec<HeaderName>,
511  log_request_parts: &Option<hyper::http::request::Parts>,
512  socket_data: &SocketData,
513  latest_auth_data: Option<&str>,
514  log_date_format: Option<&str>,
515  log_format: Option<&str>,
516  log_json_props: Option<&HashMap<String, ServerConfigurationValue>>,
517  metrics_sender: MetricsMultiSender,
518  metrics_enabled: bool,
519  traces_senders: Vec<Sender<TraceSignal>>,
520  traces_enabled: bool,
521  timeout_instant: std::time::Instant,
522  timeout_duration: Option<std::time::Duration>,
523) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
524  let (mut response_parts, response_body) = response.into_parts();
525
526  add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
527  add_server_header(&mut response_parts);
528  add_custom_headers(
529    &mut response_parts,
530    &headers_to_add,
531    &headers_to_replace,
532    &headers_to_remove,
533  );
534
535  let response = Response::from_parts(response_parts, response_body);
536
537  match execute_response_modifying_handlers(
538    response,
539    executed_handlers,
540    configuration,
541    http3_alt_port,
542    headers_to_add,
543    headers_to_replace,
544    headers_to_remove,
545    &configuration.observability.log_channels,
546    log_request_parts,
547    socket_data,
548    latest_auth_data,
549    log_date_format,
550    log_format,
551    log_json_props,
552    metrics_sender,
553    metrics_enabled,
554    traces_senders,
555    traces_enabled,
556    timeout_instant,
557    timeout_duration,
558  )
559  .await?
560  {
561    Ok(response) => {
562      if let Some(request_parts) = log_request_parts.as_ref() {
563        log_access(
564          &configuration.observability.log_channels,
565          request_parts,
566          socket_data,
567          latest_auth_data,
568          response.status().as_u16(),
569          extract_content_length(&response),
570          log_date_format,
571          log_format,
572          log_json_props,
573        )
574        .await;
575      }
576      Ok(response)
577    }
578    Err(error_response) => Ok(error_response),
579  }
580}
581
582#[allow(clippy::type_complexity)]
583#[allow(clippy::result_large_err)]
584fn sanitize_url_request(
585  configuration: &ServerConfiguration,
586  mut request: Request<BoxBody<Bytes, std::io::Error>>,
587) -> Result<
588  (Request<BoxBody<Bytes, std::io::Error>>, bool),
589  (
590    Request<BoxBody<Bytes, std::io::Error>>,
591    Box<dyn std::error::Error + Send + Sync>,
592  ),
593> {
594  let url_pathname = request.uri().path();
595  let sanitized_url_pathname = match sanitize_url(
596    url_pathname,
597    get_value!("allow_double_slashes", configuration)
598      .and_then(|v| v.as_bool())
599      .unwrap_or(false),
600  ) {
601    Ok(sanitized_url_pathname) => sanitized_url_pathname,
602    Err(err) => {
603      return Err((request, err.into()));
604    }
605  };
606
607  if sanitized_url_pathname != url_pathname {
608    let (mut parts, body) = request.into_parts();
609    let orig_uri = parts.uri.clone();
610    let mut url_parts = parts.uri.into_parts();
611    url_parts.path_and_query = Some(
612      match format!(
613        "{}{}",
614        sanitized_url_pathname,
615        url_parts
616          .path_and_query
617          .as_ref()
618          .and_then(|pq| pq.query())
619          .map_or("".to_string(), |q| format!("?{q}"))
620      )
621      .parse()
622      {
623        Ok(path_and_query) => path_and_query,
624        Err(e) => {
625          parts.uri = orig_uri;
626          request = Request::from_parts(parts, body);
627          return Err((request, e.into()));
628        }
629      },
630    );
631    parts.uri = match hyper::Uri::from_parts(url_parts) {
632      Ok(uri) => uri,
633      Err(e) => {
634        parts.uri = orig_uri;
635        request = Request::from_parts(parts, body);
636        return Err((request, e.into()));
637      }
638    };
639    request = Request::from_parts(parts, body);
640    return Ok((request, true));
641  }
642  Ok((request, false))
643}
644
645/// The HTTP request handler, with timeout
646#[allow(clippy::too_many_arguments)]
647pub async fn request_handler(
648  mut request: Request<BoxBody<Bytes, std::io::Error>>,
649  client_address: SocketAddr,
650  server_address: SocketAddr,
651  encrypted: bool,
652  configurations: Arc<ServerConfigurations>,
653  http3_alt_port: Option<u16>,
654  acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
655  proxy_protocol_client_address: Option<SocketAddr>,
656  proxy_protocol_server_address: Option<SocketAddr>,
657) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
658  // Global configuration
659  let global_configuration = configurations.find_global_configuration();
660  let global_loggers: &[Sender<LogMessage>] = global_configuration
661    .as_ref()
662    .map_or(&[], |c| &*c.observability.log_channels);
663  let global_log_date_format = global_configuration
664    .as_deref()
665    .and_then(|c| get_value!("log_date_format", c))
666    .and_then(|v| v.as_str());
667  let global_log_format = global_configuration
668    .as_deref()
669    .and_then(|c| get_value!("log_format", c))
670    .and_then(|v| v.as_str());
671  let global_log_json = global_configuration
672    .as_deref()
673    .and_then(|c| c.entries.get("log_json"))
674    .and_then(|entries| entries.get_entry())
675    .map(|entry| entry.props.clone());
676
677  // Request timeout
678  let timeout_from_config = global_configuration
679    .as_deref()
680    .and_then(|c| get_entry!("timeout", c))
681    .and_then(|e| e.values.last());
682  let timeout_duration = if timeout_from_config.is_some_and(|v| v.is_null()) {
683    None
684  } else {
685    let timeout_millis = timeout_from_config.and_then(|v| v.as_i128()).unwrap_or(300000) as u64;
686    Some(Duration::from_millis(timeout_millis))
687  };
688  let timeout_instant = std::time::Instant::now();
689
690  // Normalize HTTP/2 and HTTP/3 request objects
691  if matches!(request.version(), hyper::Version::HTTP_2 | hyper::Version::HTTP_3) {
692    // Set "Host" request header for HTTP/2 and HTTP/3 connections
693    if let Some(authority) = request.uri().authority() {
694      let authority = authority.to_owned();
695      let headers = request.headers_mut();
696      if !headers.contains_key(header::HOST) {
697        if let Ok(authority_value) = HeaderValue::from_bytes(authority.as_str().as_bytes()) {
698          headers.append(header::HOST, authority_value);
699        }
700      }
701    }
702
703    // Normalize the Cookie header for HTTP/2 and HTTP/3
704    let mut cookie_normalized = String::new();
705    let mut cookie_set = false;
706    let headers = request.headers_mut();
707    for cookie in headers.get_all(header::COOKIE) {
708      if let Ok(cookie) = cookie.to_str() {
709        if cookie_set {
710          cookie_normalized.push_str("; ");
711        }
712        cookie_set = true;
713        cookie_normalized.push_str(cookie);
714      }
715    }
716    if cookie_set {
717      if let Ok(cookie_value) = HeaderValue::from_bytes(cookie_normalized.as_bytes()) {
718        headers.insert(header::COOKIE, cookie_value);
719      }
720    }
721  }
722
723  // Construct socket data
724  let mut socket_data = SocketData {
725    remote_addr: proxy_protocol_client_address.unwrap_or(client_address),
726    local_addr: proxy_protocol_server_address.unwrap_or(server_address),
727    encrypted,
728  };
729
730  // Sanitize "Host" header
731  let host_header_option = request.headers().get(header::HOST);
732  if let Some(header_data) = host_header_option {
733    match header_data.to_str() {
734      Ok(host_header) => {
735        let host_header_lower_case = host_header.to_lowercase();
736        let host_header_without_dot = host_header_lower_case
737          .strip_suffix('.')
738          .unwrap_or(host_header_lower_case.as_str());
739        if host_header_without_dot != host_header {
740          let host_header_value = match HeaderValue::from_str(host_header_without_dot) {
741            Ok(host_header_value) => host_header_value,
742            Err(err) => {
743              for logger in global_loggers {
744                logger
745                  .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
746                  .await
747                  .unwrap_or_default();
748              }
749              let response = basic_error_response(StatusCode::BAD_REQUEST);
750              let (request_parts, _) = request.into_parts();
751              return Ok(
752                finalize_basic_error_response(
753                  response,
754                  request_parts,
755                  http3_alt_port,
756                  global_loggers,
757                  &socket_data,
758                  global_log_date_format,
759                  global_log_format,
760                  global_log_json.as_ref(),
761                )
762                .await,
763              );
764            }
765          };
766
767          request.headers_mut().insert(header::HOST, host_header_value);
768        }
769      }
770      Err(err) => {
771        for logger in global_loggers {
772          logger
773            .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
774            .await
775            .unwrap_or_default();
776        }
777        let response = basic_error_response(StatusCode::BAD_REQUEST);
778        let (request_parts, _) = request.into_parts();
779        return Ok(
780          finalize_basic_error_response(
781            response,
782            request_parts,
783            http3_alt_port,
784            global_loggers,
785            &socket_data,
786            global_log_date_format,
787            global_log_format,
788            global_log_json.as_ref(),
789          )
790          .await,
791        );
792      }
793    }
794  };
795
796  let hostname_determinant = request.headers().get(header::HOST).and_then(|value| {
797    value.to_str().ok().map(|h| {
798      h.rsplit_once(':')
799        .and_then(|(left, right)| {
800          if right.parse::<u16>().is_ok() {
801            Some(left.to_string())
802          } else {
803            None
804          }
805        })
806        .unwrap_or_else(|| h.to_string())
807    })
808  });
809
810  let (request_parts, request_body) = request.into_parts();
811  let mut log_request_parts = if global_configuration
812    .as_ref()
813    .is_some_and(|c| !c.observability.log_channels.is_empty())
814  {
815    Some(request_parts.clone())
816  } else {
817    None
818  };
819  let request = Request::from_parts(request_parts, request_body);
820
821  // Find the server configuration
822  let (request_parts, request_body) = request.into_parts();
823  let configuration_option =
824    configurations.find_configuration(&request_parts, hostname_determinant.as_deref(), &socket_data);
825  let mut request = Request::from_parts(request_parts, request_body);
826  let mut configuration_error_handler_lookup = match configuration_option
827    .and_then(|c| c.ok_or_else(|| anyhow::anyhow!("No matching configuration found").into_boxed_dyn_error()))
828  {
829    Ok(configuration) => configuration,
830    Err(err) => {
831      for logger in global_loggers {
832        logger
833          .send(LogMessage::new(
834            format!("Cannot determine server configuration: {err}"),
835            true,
836          ))
837          .await
838          .unwrap_or_default()
839      }
840      let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
841      let (request_parts, _) = request.into_parts();
842      return Ok(
843        finalize_basic_error_response(
844          response,
845          request_parts,
846          http3_alt_port,
847          global_loggers,
848          &socket_data,
849          global_log_date_format,
850          global_log_format,
851          global_log_json.as_ref(),
852        )
853        .await,
854      );
855    }
856  };
857  let mut configuration = match configuration_error_handler_lookup.get_default().cloned() {
858    Some(configuration) => configuration,
859    None => {
860      for logger in global_loggers {
861        logger
862          .send(LogMessage::new(
863            "Cannot determine server configuration: No matching configuration found".to_string(),
864            true,
865          ))
866          .await
867          .unwrap_or_default()
868      }
869      let response = basic_error_response(StatusCode::INTERNAL_SERVER_ERROR);
870      let (request_parts, _) = request.into_parts();
871      return Ok(
872        finalize_basic_error_response(
873          response,
874          request_parts,
875          http3_alt_port,
876          global_loggers,
877          &socket_data,
878          global_log_date_format,
879          global_log_format,
880          global_log_json.as_ref(),
881        )
882        .await,
883      );
884    }
885  };
886
887  // Determine the log formats
888  let mut log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
889  let mut log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
890  let mut log_json_props = configuration
891    .entries
892    .get("log_json")
893    .and_then(|entries| entries.get_entry())
894    .map(|entry| entry.props.clone());
895
896  // Clone the log request parts if logging is enabled and request parts are not already cloned
897  if !configuration.observability.log_channels.is_empty() && log_request_parts.is_none() {
898    let (request_parts, request_body) = request.into_parts();
899    log_request_parts = Some(request_parts.clone());
900    request = Request::from_parts(request_parts, request_body);
901  }
902
903  // Sanitize the URL, if the URL sanitizer is enabled
904  if !get_value!("disable_url_sanitizer", configuration)
905    .and_then(|v| v.as_bool())
906    .unwrap_or(false)
907  {
908    request = match sanitize_url_request(&configuration, request) {
909      Ok((mut request, was_dirty)) => {
910        if was_dirty {
911          let (parts, body) = request.into_parts();
912          let configuration_option =
913            configurations.find_configuration(&parts, hostname_determinant.as_deref(), &socket_data);
914          request = Request::from_parts(parts, body);
915          match configuration_option {
916            Ok(Some(new_configuration)) => {
917              if let Some(new_config2) = new_configuration.get_default().cloned() {
918                configuration_error_handler_lookup = new_configuration;
919                configuration = new_config2;
920                log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
921                log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
922                log_json_props = configuration
923                  .entries
924                  .get("log_json")
925                  .and_then(|entries| entries.get_entry())
926                  .map(|entry| entry.props.clone());
927              }
928            }
929            Ok(None) => {}
930            Err(err) => {
931              for logger in &configuration.observability.log_channels {
932                logger
933                  .send(LogMessage::new(
934                    format!("Cannot determine server configuration: {err}"),
935                    true,
936                  ))
937                  .await
938                  .unwrap_or_default();
939              }
940              let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
941
942              let (request_parts, _) = request.into_parts();
943              let (headers_to_add, headers_to_replace, headers_to_remove) =
944                build_custom_headers(&configuration, &request_parts);
945
946              return Ok(
947                finalize_response_and_log(
948                  response,
949                  http3_alt_port,
950                  headers_to_add,
951                  headers_to_replace,
952                  headers_to_remove,
953                  &configuration.observability.log_channels,
954                  &log_request_parts,
955                  &socket_data,
956                  None,
957                  log_date_format,
958                  log_format,
959                  log_json_props.as_ref(),
960                )
961                .await,
962              );
963            }
964          }
965        }
966        request
967      }
968      Err((request, error)) => {
969        for logger in &configuration.observability.log_channels {
970          logger
971            .send(LogMessage::new(format!("URL sanitation error: {error}"), true))
972            .await
973            .unwrap_or_default();
974        }
975
976        let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
977
978        let (parts, _) = request.into_parts();
979        let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &parts);
980
981        return Ok(
982          finalize_response_and_log(
983            response,
984            http3_alt_port,
985            headers_to_add,
986            headers_to_replace,
987            headers_to_remove,
988            &configuration.observability.log_channels,
989            &log_request_parts,
990            &socket_data,
991            None,
992            log_date_format,
993            log_format,
994            log_json_props.as_ref(),
995          )
996          .await,
997        );
998      }
999    };
1000  }
1001
1002  let (request_parts, request_body) = request.into_parts();
1003  let (headers_to_add, headers_to_replace, headers_to_remove) = build_custom_headers(&configuration, &request_parts);
1004  let mut request = Request::from_parts(request_parts, request_body);
1005
1006  if request.uri().path() == "*" {
1007    let response = match request.method() {
1008      &Method::OPTIONS => Response::builder()
1009        .status(StatusCode::NO_CONTENT)
1010        .header(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"))
1011        .body(Empty::new().map_err(|e| match e {}).boxed())
1012        .unwrap_or_default(),
1013      _ => {
1014        let mut header_map = HeaderMap::new();
1015        header_map.insert(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"));
1016        generate_error_response(StatusCode::BAD_REQUEST, &configuration, &Some(header_map)).await
1017      }
1018    };
1019    return Ok(
1020      finalize_response_and_log(
1021        response,
1022        http3_alt_port,
1023        headers_to_add,
1024        headers_to_replace,
1025        headers_to_remove,
1026        &configuration.observability.log_channels,
1027        &log_request_parts,
1028        &socket_data,
1029        None,
1030        log_date_format,
1031        log_format,
1032        log_json_props.as_ref(),
1033      )
1034      .await,
1035    );
1036  }
1037
1038  // HTTP-01 ACME challenge for automatic TLS
1039  let acme_http_01_resolvers_inner = acme_http_01_resolvers.read().await;
1040  if !acme_http_01_resolvers_inner.is_empty() {
1041    if let Some(challenge_token) = request.uri().path().strip_prefix("/.well-known/acme-challenge/") {
1042      for acme_http01_resolver in &*acme_http_01_resolvers_inner {
1043        if let Some(http01_acme_data) = &*acme_http01_resolver.read().await {
1044          let acme_response = http01_acme_data.1.clone();
1045          if challenge_token == http01_acme_data.0 {
1046            let response = Response::builder()
1047              .status(StatusCode::OK)
1048              .header(
1049                header::CONTENT_TYPE,
1050                HeaderValue::from_static("application/octet-stream"),
1051              )
1052              .body(Full::new(Bytes::from(acme_response)).map_err(|e| match e {}).boxed())
1053              .unwrap_or_default();
1054
1055            return Ok(
1056              finalize_response_and_log(
1057                response,
1058                http3_alt_port,
1059                headers_to_add,
1060                headers_to_replace,
1061                headers_to_remove,
1062                &configuration.observability.log_channels,
1063                &log_request_parts,
1064                &socket_data,
1065                None,
1066                log_date_format,
1067                log_format,
1068                log_json_props.as_ref(),
1069              )
1070              .await,
1071            );
1072          }
1073        }
1074      }
1075    }
1076  };
1077  drop(acme_http_01_resolvers_inner);
1078
1079  let mut error_logger = if !configuration.observability.log_channels.is_empty() {
1080    ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1081  } else {
1082    ErrorLogger::without_logger()
1083  };
1084  let mut metrics_enabled = !configuration.observability.metric_channels.is_empty();
1085  let mut metrics_sender = if metrics_enabled {
1086    MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1087  } else {
1088    MetricsMultiSender::without_sender()
1089  };
1090  let mut traces_enabled = !configuration.observability.trace_channels.is_empty();
1091  let mut traces_senders = if traces_enabled {
1092    let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1093    for channel in &configuration.observability.trace_channels {
1094      channel.0.send(()).await.unwrap_or_default();
1095      if let Ok(channel2) = channel.1.recv().await {
1096        traces_senders.push(channel2);
1097      }
1098    }
1099    traces_senders
1100  } else {
1101    vec![]
1102  };
1103
1104  // Obtain module handlers
1105  let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1106  for module in &configuration.modules {
1107    module_handlers.push(module.get_module_handlers());
1108  }
1109
1110  // Execute modules!
1111  request.extensions_mut().insert(RequestData {
1112    auth_user: None,
1113    original_url: None,
1114    error_status_code: None,
1115  });
1116  let mut executed_handlers = Vec::new();
1117  let (request_parts, request_body) = request.into_parts();
1118  let request_parts_cloned = if configuration_error_handler_lookup.has_status_codes() {
1119    let mut request_parts_cloned = request_parts.clone();
1120    request_parts_cloned
1121      .headers
1122      .insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"));
1123    Some(request_parts_cloned)
1124  } else {
1125    // If the error configuration is not specified, don't clone the request parts to improve performance
1126    None
1127  };
1128  let mut request = Request::from_parts(request_parts, request_body);
1129  let mut latest_auth_data = None;
1130  let mut is_error_handler = false;
1131  let mut handlers_iter: Box<dyn Iterator<Item = Box<dyn ModuleHandlers>>> = Box::new(module_handlers.into_iter());
1132  while let Some(mut handlers) = handlers_iter.next() {
1133    if metrics_enabled {
1134      handlers
1135        .metric_data_before_handler(&request, &socket_data, &metrics_sender)
1136        .await;
1137    }
1138
1139    if traces_enabled {
1140      for trace_sender in &traces_senders {
1141        trace_sender
1142          .send(TraceSignal::StartSpan(format!(
1143            "{}::request_handler",
1144            handlers.get_name()
1145          )))
1146          .await
1147          .unwrap_or_default();
1148      }
1149    }
1150
1151    let (response_result, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
1152      let elapsed = timeout_instant.elapsed();
1153      if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
1154        match timeout(
1155          timeout_cur_duration,
1156          handlers.request_handler(request, &configuration, &socket_data, &error_logger),
1157        )
1158        .await
1159        {
1160          Ok(result) => (result, false),
1161          Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
1162        }
1163      } else {
1164        (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
1165      }
1166    } else {
1167      (
1168        handlers
1169          .request_handler(request, &configuration, &socket_data, &error_logger)
1170          .await,
1171        false,
1172      )
1173    };
1174
1175    if traces_enabled {
1176      for trace_sender in &traces_senders {
1177        trace_sender
1178          .send(TraceSignal::EndSpan(
1179            format!("{}::request_handler", handlers.get_name()),
1180            response_result.as_ref().err().map(|e| e.to_string()),
1181          ))
1182          .await
1183          .unwrap_or_default();
1184      }
1185    }
1186
1187    executed_handlers.push(handlers);
1188
1189    if is_timeout {
1190      if metrics_enabled {
1191        while let Some(mut executed_handler) = executed_handlers.pop() {
1192          executed_handler.metric_data_after_handler(&metrics_sender).await;
1193        }
1194      }
1195      Err(anyhow::anyhow!("The client or server has timed out"))?;
1196    }
1197
1198    match response_result {
1199      Ok(response) => {
1200        let status = response.response_status;
1201        let headers = response.response_headers;
1202        let new_remote_address = response.new_remote_address;
1203        let request_option = response.request;
1204        let response = response.response;
1205        let request_extensions = request_option
1206          .as_ref()
1207          .and_then(|r| r.extensions().get::<RequestData>());
1208        if let Some(request_extensions) = request_extensions {
1209          latest_auth_data = request_extensions.auth_user.clone();
1210        }
1211        if let Some(new_remote_address) = new_remote_address {
1212          socket_data.remote_addr = new_remote_address;
1213        };
1214
1215        match response {
1216          Some(response) => {
1217            return finalize_with_modifying_handlers(
1218              response,
1219              executed_handlers,
1220              &configuration,
1221              http3_alt_port,
1222              headers_to_add,
1223              headers_to_replace,
1224              headers_to_remove,
1225              &log_request_parts,
1226              &socket_data,
1227              latest_auth_data.as_deref(),
1228              log_date_format,
1229              log_format,
1230              log_json_props.as_ref(),
1231              metrics_sender,
1232              metrics_enabled,
1233              traces_senders,
1234              traces_enabled,
1235              timeout_instant,
1236              timeout_duration,
1237            )
1238            .await;
1239          }
1240          None => match status {
1241            Some(status) => {
1242              if !is_error_handler {
1243                if let Some(error_configuration) = configuration_error_handler_lookup.get(status.as_u16()).cloned() {
1244                  let request_option = if let Some(request) = request_option {
1245                    Some(request)
1246                  } else {
1247                    request_parts_cloned.clone().map(|request_parts_cloned| {
1248                      Request::from_parts(request_parts_cloned, Empty::new().map_err(|e| match e {}).boxed())
1249                    })
1250                  };
1251                  if let Some(request_cloned) = request_option {
1252                    configuration = error_configuration;
1253                    let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1254                    for module in &configuration.modules {
1255                      module_handlers.push(module.get_module_handlers());
1256                    }
1257                    handlers_iter = Box::new(module_handlers.into_iter());
1258                    if metrics_enabled {
1259                      while let Some(mut executed_handler) = executed_handlers.pop() {
1260                        executed_handler.metric_data_after_handler(&metrics_sender).await;
1261                      }
1262                    }
1263                    executed_handlers = Vec::new();
1264                    request = request_cloned;
1265                    if let Some(request_data) = request.extensions_mut().get_mut::<RequestData>() {
1266                      request_data.error_status_code = Some(status);
1267                    }
1268                    is_error_handler = true;
1269                    log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1270                    log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1271                    log_json_props = configuration
1272                      .entries
1273                      .get("log_json")
1274                      .and_then(|entries| entries.get_entry())
1275                      .map(|entry| entry.props.clone());
1276                    error_logger = if !configuration.observability.log_channels.is_empty() {
1277                      ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1278                    } else {
1279                      ErrorLogger::without_logger()
1280                    };
1281                    metrics_enabled = !configuration.observability.metric_channels.is_empty();
1282                    metrics_sender = if metrics_enabled {
1283                      MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1284                    } else {
1285                      MetricsMultiSender::without_sender()
1286                    };
1287                    traces_enabled = !configuration.observability.trace_channels.is_empty();
1288                    traces_senders = if traces_enabled {
1289                      let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1290                      for channel in &configuration.observability.trace_channels {
1291                        channel.0.send(()).await.unwrap_or_default();
1292                        if let Ok(channel2) = channel.1.recv().await {
1293                          traces_senders.push(channel2);
1294                        }
1295                      }
1296                      traces_senders
1297                    } else {
1298                      vec![]
1299                    };
1300                    continue;
1301                  }
1302                }
1303              }
1304              let response = generate_error_response(status, &configuration, &headers).await;
1305              return finalize_with_modifying_handlers(
1306                response,
1307                executed_handlers,
1308                &configuration,
1309                http3_alt_port,
1310                headers_to_add,
1311                headers_to_replace,
1312                headers_to_remove,
1313                &log_request_parts,
1314                &socket_data,
1315                latest_auth_data.as_deref(),
1316                log_date_format,
1317                log_format,
1318                log_json_props.as_ref(),
1319                metrics_sender,
1320                metrics_enabled,
1321                traces_senders,
1322                traces_enabled,
1323                timeout_instant,
1324                timeout_duration,
1325              )
1326              .await;
1327            }
1328            None => match request_option {
1329              Some(request_obtained) => {
1330                request = request_obtained;
1331                continue;
1332              }
1333              None => {
1334                break;
1335              }
1336            },
1337          },
1338        }
1339      }
1340      Err(err) => {
1341        let response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, &configuration, &None).await;
1342        let err_string = err.to_string();
1343
1344        if !err_string.is_empty() {
1345          for logger in &configuration.observability.log_channels {
1346            logger
1347              .send(LogMessage::new(
1348                format!("Unexpected error while serving a request: {err}"),
1349                true,
1350              ))
1351              .await
1352              .unwrap_or_default();
1353          }
1354        }
1355
1356        let response_result = finalize_with_modifying_handlers(
1357          response,
1358          executed_handlers,
1359          &configuration,
1360          http3_alt_port,
1361          headers_to_add,
1362          headers_to_replace,
1363          headers_to_remove,
1364          &log_request_parts,
1365          &socket_data,
1366          latest_auth_data.as_deref(),
1367          log_date_format,
1368          log_format,
1369          log_json_props.as_ref(),
1370          metrics_sender,
1371          metrics_enabled,
1372          traces_senders,
1373          traces_enabled,
1374          timeout_instant,
1375          timeout_duration,
1376        )
1377        .await;
1378
1379        if err_string.is_empty() {
1380          // Abort the HTTP request
1381          return Err(anyhow::anyhow!("HTTP request aborted"));
1382        }
1383
1384        return response_result;
1385      }
1386    }
1387  }
1388
1389  let response = generate_error_response(StatusCode::NOT_FOUND, &configuration, &None).await;
1390
1391  finalize_with_modifying_handlers(
1392    response,
1393    executed_handlers,
1394    &configuration,
1395    http3_alt_port,
1396    headers_to_add,
1397    headers_to_replace,
1398    headers_to_remove,
1399    &log_request_parts,
1400    &socket_data,
1401    latest_auth_data.as_deref(),
1402    log_date_format,
1403    log_format,
1404    log_json_props.as_ref(),
1405    metrics_sender,
1406    metrics_enabled,
1407    traces_senders,
1408    traces_enabled,
1409    timeout_instant,
1410    timeout_duration,
1411  )
1412  .await
1413}