1use std::net::SocketAddr;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_channel::Sender;
7use chrono::{DateTime, Local};
8use ferron_common::logging::{ErrorLogger, LogMessage};
9use ferron_common::observability::{MetricsMultiSender, TraceSignal};
10use futures_util::stream::TryStreamExt;
11use http_body_util::combinators::BoxBody;
12use http_body_util::{BodyExt, Empty, Full, StreamBody};
13use hyper::body::{Body, Bytes, Frame};
14use hyper::header::{HeaderName, HeaderValue};
15use hyper::{header, HeaderMap, Method, Request, Response, StatusCode};
16#[cfg(feature = "runtime-tokio")]
17use tokio::io::BufReader;
18#[cfg(feature = "runtime-tokio")]
19use tokio_util::io::ReaderStream;
20
21use crate::config::{ServerConfiguration, ServerConfigurations};
22use crate::get_value;
23use crate::runtime::timeout;
24#[cfg(feature = "runtime-monoio")]
25use crate::util::MonoioFileStreamNoSpawn;
26use crate::util::{
27 generate_default_error_page, replace_header_placeholders, replace_log_placeholders, sanitize_url, SERVER_SOFTWARE,
28};
29
30use ferron_common::modules::{ModuleHandlers, RequestData, SocketData};
31use ferron_common::{get_entries, get_entry};
32
33async fn generate_error_response(
35 status_code: StatusCode,
36 config: &ServerConfiguration,
37 headers: &Option<HeaderMap>,
38) -> Response<BoxBody<Bytes, std::io::Error>> {
39 let bare_body = generate_default_error_page(
40 status_code,
41 get_value!("server_administrator_email", config).and_then(|v| v.as_str()),
42 );
43 let mut content_length: Option<u64> = bare_body.len().try_into().ok();
44 let mut response_body = Full::new(Bytes::from(bare_body)).map_err(|e| match e {}).boxed();
45
46 if let Some(error_pages) = get_entries!("error_page", config) {
47 for error_page in &error_pages.inner {
48 if let Some(page_status_code) = error_page.values.first().and_then(|v| v.as_i128()) {
49 let page_status_code = match StatusCode::from_u16(match page_status_code.try_into() {
50 Ok(status_code) => status_code,
51 Err(_) => continue,
52 }) {
53 Ok(status_code) => status_code,
54 Err(_) => continue,
55 };
56 if status_code != page_status_code {
57 continue;
58 }
59 if let Some(page_path) = error_page.values.get(1).and_then(|v| v.as_str()) {
60 #[cfg(feature = "runtime-monoio")]
61 let file = monoio::fs::File::open(page_path).await;
62 #[cfg(feature = "runtime-tokio")]
63 let file = tokio::fs::File::open(page_path).await;
64
65 let file = match file {
66 Ok(file) => file,
67 Err(_) => continue,
68 };
69
70 #[cfg(any(feature = "runtime-tokio", all(feature = "runtime-monoio", unix)))]
72 let metadata = file.metadata().await;
73 #[cfg(all(feature = "runtime-monoio", windows))]
74 let metadata = {
75 let page_path = page_path.to_owned();
76 monoio::spawn_blocking(move || std::fs::metadata(page_path))
77 .await
78 .unwrap_or(Err(std::io::Error::other(
79 "Can't spawn a blocking task to obtain the file metadata",
80 )))
81 };
82
83 content_length = match metadata {
84 Ok(metadata) => Some(metadata.len()),
85 Err(_) => None,
86 };
87
88 #[cfg(feature = "runtime-monoio")]
89 let file_stream = MonoioFileStreamNoSpawn::new(file, None, content_length);
90 #[cfg(feature = "runtime-tokio")]
91 let file_stream = ReaderStream::new(BufReader::with_capacity(12800, file));
92
93 let stream_body = StreamBody::new(file_stream.map_ok(Frame::data));
94 let boxed_body = stream_body.boxed();
95
96 response_body = boxed_body;
97
98 break;
99 }
100 }
101 }
102 }
103
104 let mut response_builder = Response::builder().status(status_code);
105
106 if let Some(headers) = headers {
107 let headers_iter = headers.iter();
108 for (name, value) in headers_iter {
109 if name != header::CONTENT_TYPE && name != header::CONTENT_LENGTH {
110 response_builder = response_builder.header(name, value);
111 }
112 }
113 }
114
115 if let Some(content_length) = content_length {
116 response_builder = response_builder.header(header::CONTENT_LENGTH, content_length);
117 }
118 response_builder = response_builder.header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"));
119
120 response_builder.body(response_body).unwrap_or_default()
121}
122
123#[allow(clippy::too_many_arguments)]
125async fn log_access(
126 loggers: &Vec<Sender<LogMessage>>,
127 request_parts: &hyper::http::request::Parts,
128 socket_data: &SocketData,
129 auth_user: Option<&str>,
130 status_code: u16,
131 content_length: Option<u64>,
132 date_format: Option<&str>,
133 log_format: Option<&str>,
134) {
135 let now: DateTime<Local> = Local::now();
136 let formatted_time = now.format(date_format.unwrap_or("%d/%b/%Y:%H:%M:%S %z")).to_string();
137 let log_message_string = replace_log_placeholders(
138 log_format.unwrap_or(
139 "{client_ip} - {auth_user} [{timestamp}] \"{method} {path_and_query} {version}\" \
140 {status_code} {content_length} \"{header:Referer}\" \"{header:User-Agent}\"",
141 ),
142 request_parts,
143 socket_data,
144 auth_user,
145 &formatted_time,
146 status_code,
147 content_length,
148 );
149 for logger in loggers {
150 logger
151 .send(LogMessage::new(log_message_string.clone(), false))
152 .await
153 .unwrap_or_default();
154 }
155}
156
157fn add_custom_headers(
159 response_parts: &mut hyper::http::response::Parts,
160 headers_to_add: &HeaderMap,
161 headers_to_replace: &HeaderMap,
162 headers_to_remove: &[HeaderName],
163) {
164 for (header_name, header_value) in headers_to_add {
165 if !response_parts.headers.contains_key(header_name) {
166 response_parts.headers.insert(header_name, header_value.to_owned());
167 }
168 }
169
170 for (header_name, header_value) in headers_to_replace {
171 response_parts.headers.insert(header_name, header_value.to_owned());
172 }
173
174 for header_to_remove in headers_to_remove.iter().rev() {
175 if response_parts.headers.contains_key(header_to_remove) {
176 while response_parts.headers.remove(header_to_remove).is_some() {}
177 }
178 }
179}
180
181fn add_http3_alt_svc_header(response_parts: &mut hyper::http::response::Parts, http3_alt_port: Option<u16>) {
183 if let Some(http3_alt_port) = http3_alt_port {
184 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
185 Some(value) => {
186 let header_value_old = String::from_utf8_lossy(value.as_bytes());
187 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
188
189 if header_value_old != header_value_new {
190 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
191 } else {
192 HeaderValue::from_bytes(header_value_old.as_bytes())
193 }
194 }
195 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
196 } {
197 response_parts.headers.insert(header::ALT_SVC, header_value);
198 }
199 }
200}
201
202fn add_server_header(response_parts: &mut hyper::http::response::Parts) {
204 response_parts
205 .headers
206 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
207}
208
209fn extract_content_length(response: &Response<BoxBody<Bytes, std::io::Error>>) -> Option<u64> {
211 match response.headers().get(header::CONTENT_LENGTH) {
212 Some(header_value) => match header_value.to_str() {
213 Ok(header_value) => match header_value.parse::<u64>() {
214 Ok(content_length) => Some(content_length),
215 Err(_) => response.body().size_hint().exact(),
216 },
217 Err(_) => response.body().size_hint().exact(),
218 },
219 None => response.body().size_hint().exact(),
220 }
221}
222
223#[allow(clippy::too_many_arguments)]
225async fn finalize_response_and_log(
226 response: Response<BoxBody<Bytes, std::io::Error>>,
227 http3_alt_port: Option<u16>,
228 headers_to_add: HeaderMap,
229 headers_to_replace: HeaderMap,
230 headers_to_remove: Vec<HeaderName>,
231 loggers: &Vec<Sender<LogMessage>>,
232 request_parts: &Option<hyper::http::request::Parts>,
233 socket_data: &SocketData,
234 latest_auth_data: Option<&str>,
235 date_format: Option<&str>,
236 log_format: Option<&str>,
237) -> Response<BoxBody<Bytes, std::io::Error>> {
238 let (mut response_parts, response_body) = response.into_parts();
239
240 add_custom_headers(
241 &mut response_parts,
242 &headers_to_add,
243 &headers_to_replace,
244 &headers_to_remove,
245 );
246 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
247 add_server_header(&mut response_parts);
248
249 let response = Response::from_parts(response_parts, response_body);
250
251 if let Some(request_parts) = request_parts {
252 if !loggers.is_empty() {
253 log_access(
254 loggers,
255 request_parts,
256 socket_data,
257 latest_auth_data,
258 response.status().as_u16(),
259 extract_content_length(&response),
260 date_format,
261 log_format,
262 )
263 .await;
264 }
265 }
266
267 response
268}
269
270#[allow(clippy::too_many_arguments)]
272async fn execute_response_modifying_handlers(
273 mut response: Response<BoxBody<Bytes, std::io::Error>>,
274 mut executed_handlers: Vec<Box<dyn ModuleHandlers>>,
275 configuration: &ServerConfiguration,
276 http3_alt_port: Option<u16>,
277 headers_to_add: HeaderMap,
278 headers_to_replace: HeaderMap,
279 headers_to_remove: Vec<HeaderName>,
280 loggers: &Vec<Sender<LogMessage>>,
281 request_parts: &Option<hyper::http::request::Parts>,
282 socket_data: &SocketData,
283 latest_auth_data: Option<&str>,
284 date_format: Option<&str>,
285 log_format: Option<&str>,
286 metrics_sender: MetricsMultiSender,
287 metrics_enabled: bool,
288 traces_senders: Vec<Sender<TraceSignal>>,
289 traces_enabled: bool,
290 timeout_instant: std::time::Instant,
291 timeout_duration: Option<std::time::Duration>,
292) -> Result<Result<Response<BoxBody<Bytes, std::io::Error>>, Response<BoxBody<Bytes, std::io::Error>>>, anyhow::Error> {
293 while let Some(mut executed_handler) = executed_handlers.pop() {
294 if traces_enabled {
295 for trace_sender in &traces_senders {
296 trace_sender
297 .send(TraceSignal::StartSpan(format!(
298 "{}::response_modifying_handler",
299 executed_handler.get_name()
300 )))
301 .await
302 .unwrap_or_default();
303 }
304 }
305 let (response_status, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
306 let elapsed = timeout_instant.elapsed();
307 if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
308 match timeout(
309 timeout_cur_duration,
310 executed_handler.response_modifying_handler(response),
311 )
312 .await
313 {
314 Ok(result) => (result, false),
315 Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
316 }
317 } else {
318 (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
319 }
320 } else {
321 (executed_handler.response_modifying_handler(response).await, false)
322 };
323 if traces_enabled {
324 for trace_sender in &traces_senders {
325 trace_sender
326 .send(TraceSignal::EndSpan(
327 format!("{}::response_modifying_handler", executed_handler.get_name()),
328 response_status.as_ref().err().map(|e| e.to_string()),
329 ))
330 .await
331 .unwrap_or_default();
332 }
333 }
334 if is_timeout {
335 if metrics_enabled {
336 while let Some(mut executed_handler) = executed_handlers.pop() {
337 executed_handler.metric_data_after_handler(&metrics_sender).await;
338 }
339 }
340 Err(anyhow::anyhow!("The client or server has timed out"))?;
341 }
342 response = match response_status {
343 Ok(response) => response,
344 Err(err) => {
345 for logger in loggers {
346 logger
347 .send(LogMessage::new(
348 format!("Unexpected error while serving a request: {err}"),
349 true,
350 ))
351 .await
352 .unwrap_or_default();
353 }
354
355 let error_response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, configuration, &None).await;
356
357 let final_response = finalize_response_and_log(
358 error_response,
359 http3_alt_port,
360 headers_to_add,
361 headers_to_replace,
362 headers_to_remove,
363 loggers,
364 request_parts,
365 socket_data,
366 latest_auth_data,
367 date_format,
368 log_format,
369 )
370 .await;
371
372 if metrics_enabled {
373 while let Some(mut executed_handler) = executed_handlers.pop() {
374 executed_handler.metric_data_after_handler(&metrics_sender).await;
375 }
376 }
377
378 return Ok(Err(final_response));
379 }
380 };
381 if metrics_enabled {
382 executed_handler.metric_data_after_handler(&metrics_sender).await;
383 }
384 }
385 Ok(Ok(response))
386}
387
388#[allow(clippy::too_many_arguments)]
390pub async fn request_handler(
391 mut request: Request<BoxBody<Bytes, std::io::Error>>,
392 client_address: SocketAddr,
393 server_address: SocketAddr,
394 encrypted: bool,
395 configurations: Arc<ServerConfigurations>,
396 http3_alt_port: Option<u16>,
397 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
398 proxy_protocol_client_address: Option<SocketAddr>,
399 proxy_protocol_server_address: Option<SocketAddr>,
400) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
401 let global_configuration = configurations.find_global_configuration();
403
404 let timeout_from_config = global_configuration
406 .as_deref()
407 .and_then(|c| get_entry!("timeout", c))
408 .and_then(|e| e.values.last());
409 let timeout_duration = if timeout_from_config.is_some_and(|v| v.is_null()) {
410 None
411 } else {
412 let timeout_millis = timeout_from_config.and_then(|v| v.as_i128()).unwrap_or(300000) as u64;
413 Some(Duration::from_millis(timeout_millis))
414 };
415 let timeout_instant = std::time::Instant::now();
416
417 match request.version() {
419 hyper::Version::HTTP_2 | hyper::Version::HTTP_3 => {
420 if let Some(authority) = request.uri().authority() {
422 let authority = authority.to_owned();
423 let headers = request.headers_mut();
424 if !headers.contains_key(header::HOST) {
425 if let Ok(authority_value) = HeaderValue::from_bytes(authority.as_str().as_bytes()) {
426 headers.append(header::HOST, authority_value);
427 }
428 }
429 }
430
431 let mut cookie_normalized = String::new();
433 let mut cookie_set = false;
434 let headers = request.headers_mut();
435 for cookie in headers.get_all(header::COOKIE) {
436 if let Ok(cookie) = cookie.to_str() {
437 if cookie_set {
438 cookie_normalized.push_str("; ");
439 }
440 cookie_set = true;
441 cookie_normalized.push_str(cookie);
442 }
443 }
444 if cookie_set {
445 if let Ok(cookie_value) = HeaderValue::from_bytes(cookie_normalized.as_bytes()) {
446 headers.insert(header::COOKIE, cookie_value);
447 }
448 }
449 }
450 _ => (),
451 }
452
453 let mut socket_data = SocketData {
455 remote_addr: proxy_protocol_client_address.unwrap_or(client_address),
456 local_addr: proxy_protocol_server_address.unwrap_or(server_address),
457 encrypted,
458 };
459
460 let host_header_option = request.headers().get(header::HOST);
462 if let Some(header_data) = host_header_option {
463 match header_data.to_str() {
464 Ok(host_header) => {
465 let host_header_lower_case = host_header.to_lowercase();
466 let host_header_without_dot = host_header_lower_case
467 .strip_suffix('.')
468 .unwrap_or(host_header_lower_case.as_str());
469 if host_header_without_dot != host_header {
470 let host_header_value = match HeaderValue::from_str(host_header_without_dot) {
471 Ok(host_header_value) => host_header_value,
472 Err(err) => {
473 for logger in global_configuration
474 .as_ref()
475 .map_or(&vec![], |c| &c.observability.log_channels)
476 {
477 logger
478 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
479 .await
480 .unwrap_or_default();
481 }
482 let response = Response::builder()
483 .status(StatusCode::BAD_REQUEST)
484 .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
485 .body(
486 Full::new(Bytes::from(generate_default_error_page(StatusCode::BAD_REQUEST, None)))
487 .map_err(|e| match e {})
488 .boxed(),
489 )
490 .unwrap_or_default();
491
492 let (request_parts, _) = request.into_parts();
493 log_access(
494 global_configuration
495 .as_ref()
496 .map_or(&vec![], |c| &c.observability.log_channels),
497 &request_parts,
498 &socket_data,
499 None,
500 response.status().as_u16(),
501 match response.headers().get(header::CONTENT_LENGTH) {
502 Some(header_value) => match header_value.to_str() {
503 Ok(header_value) => match header_value.parse::<u64>() {
504 Ok(content_length) => Some(content_length),
505 Err(_) => response.body().size_hint().exact(),
506 },
507 Err(_) => response.body().size_hint().exact(),
508 },
509 None => response.body().size_hint().exact(),
510 },
511 global_configuration
512 .as_deref()
513 .and_then(|c| get_value!("log_date_format", c))
514 .and_then(|v| v.as_str()),
515 global_configuration
516 .as_deref()
517 .and_then(|c| get_value!("log_format", c))
518 .and_then(|v| v.as_str()),
519 )
520 .await;
521 let (mut response_parts, response_body) = response.into_parts();
522 if let Some(http3_alt_port) = http3_alt_port {
523 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
524 Some(value) => {
525 let header_value_old = String::from_utf8_lossy(value.as_bytes());
526 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
527
528 if header_value_old != header_value_new {
529 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
530 } else {
531 HeaderValue::from_bytes(header_value_old.as_bytes())
532 }
533 }
534 None => {
535 HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes())
536 }
537 } {
538 response_parts.headers.insert(header::ALT_SVC, header_value);
539 }
540 }
541 response_parts
542 .headers
543 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
544
545 return Ok(Response::from_parts(response_parts, response_body));
546 }
547 };
548
549 request.headers_mut().insert(header::HOST, host_header_value);
550 }
551 }
552 Err(err) => {
553 for logger in global_configuration
554 .as_ref()
555 .map_or(&vec![], |c| &c.observability.log_channels)
556 {
557 logger
558 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
559 .await
560 .unwrap_or_default();
561 }
562 let response = Response::builder()
563 .status(StatusCode::BAD_REQUEST)
564 .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
565 .body(
566 Full::new(Bytes::from(generate_default_error_page(StatusCode::BAD_REQUEST, None)))
567 .map_err(|e| match e {})
568 .boxed(),
569 )
570 .unwrap_or_default();
571 let (request_parts, _) = request.into_parts();
572 log_access(
573 global_configuration
574 .as_ref()
575 .map_or(&vec![], |c| &c.observability.log_channels),
576 &request_parts,
577 &socket_data,
578 None,
579 response.status().as_u16(),
580 match response.headers().get(header::CONTENT_LENGTH) {
581 Some(header_value) => match header_value.to_str() {
582 Ok(header_value) => match header_value.parse::<u64>() {
583 Ok(content_length) => Some(content_length),
584 Err(_) => response.body().size_hint().exact(),
585 },
586 Err(_) => response.body().size_hint().exact(),
587 },
588 None => response.body().size_hint().exact(),
589 },
590 global_configuration
591 .as_deref()
592 .and_then(|c| get_value!("log_date_format", c))
593 .and_then(|v| v.as_str()),
594 global_configuration
595 .as_deref()
596 .and_then(|c| get_value!("log_format", c))
597 .and_then(|v| v.as_str()),
598 )
599 .await;
600 let (mut response_parts, response_body) = response.into_parts();
601 if let Some(http3_alt_port) = http3_alt_port {
602 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
603 Some(value) => {
604 let header_value_old = String::from_utf8_lossy(value.as_bytes());
605 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
606
607 if header_value_old != header_value_new {
608 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
609 } else {
610 HeaderValue::from_bytes(header_value_old.as_bytes())
611 }
612 }
613 None => {
614 HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes())
615 }
616 } {
617 response_parts.headers.insert(header::ALT_SVC, header_value);
618 }
619 }
620 response_parts
621 .headers
622 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
623
624 return Ok(Response::from_parts(response_parts, response_body));
625 }
626 }
627 };
628
629 let hostname_determinant = match request.headers().get(header::HOST) {
630 Some(value) => value.to_str().ok().map(|h| {
631 if let Some((left, right)) = h.rsplit_once(':') {
632 if right.parse::<u16>().is_ok() {
633 left.to_string()
634 } else {
635 h.to_string()
636 }
637 } else {
638 h.to_string()
639 }
640 }),
641 None => None,
642 };
643
644 let (request_parts, request_body) = request.into_parts();
645 let mut log_request_parts = if global_configuration
646 .as_ref()
647 .is_some_and(|c| !c.observability.log_channels.is_empty())
648 {
649 Some(request_parts.clone())
650 } else {
651 None
652 };
653 let request = Request::from_parts(request_parts, request_body);
654
655 let (request_parts, request_body) = request.into_parts();
657 let configuration_option =
658 configurations.find_configuration(&request_parts, hostname_determinant.as_deref(), &socket_data);
659 let mut request = Request::from_parts(request_parts, request_body);
660 let mut configuration = match configuration_option {
661 Ok(Some(configuration)) => configuration,
662 Ok(None) => {
663 for logger in global_configuration
664 .as_ref()
665 .map_or(&vec![], |c| &c.observability.log_channels)
666 {
667 logger
668 .send(LogMessage::new(
669 String::from("Cannot determine server configuration"),
670 true,
671 ))
672 .await
673 .unwrap_or_default()
674 }
675 let response = Response::builder()
676 .status(StatusCode::INTERNAL_SERVER_ERROR)
677 .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
678 .body(
679 Full::new(Bytes::from(generate_default_error_page(
680 StatusCode::INTERNAL_SERVER_ERROR,
681 None,
682 )))
683 .map_err(|e| match e {})
684 .boxed(),
685 )
686 .unwrap_or_default();
687 let (request_parts, _) = request.into_parts();
688 log_access(
689 global_configuration
690 .as_ref()
691 .map_or(&vec![], |c| &c.observability.log_channels),
692 &request_parts,
693 &socket_data,
694 None,
695 response.status().as_u16(),
696 match response.headers().get(header::CONTENT_LENGTH) {
697 Some(header_value) => match header_value.to_str() {
698 Ok(header_value) => match header_value.parse::<u64>() {
699 Ok(content_length) => Some(content_length),
700 Err(_) => response.body().size_hint().exact(),
701 },
702 Err(_) => response.body().size_hint().exact(),
703 },
704 None => response.body().size_hint().exact(),
705 },
706 global_configuration
707 .as_deref()
708 .and_then(|c| get_value!("log_date_format", c))
709 .and_then(|v| v.as_str()),
710 global_configuration
711 .as_deref()
712 .and_then(|c| get_value!("log_format", c))
713 .and_then(|v| v.as_str()),
714 )
715 .await;
716 let (mut response_parts, response_body) = response.into_parts();
717 if let Some(http3_alt_port) = http3_alt_port {
718 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
719 Some(value) => {
720 let header_value_old = String::from_utf8_lossy(value.as_bytes());
721 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
722
723 if header_value_old != header_value_new {
724 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
725 } else {
726 HeaderValue::from_bytes(header_value_old.as_bytes())
727 }
728 }
729 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
730 } {
731 response_parts.headers.insert(header::ALT_SVC, header_value);
732 }
733 }
734 response_parts
735 .headers
736 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
737
738 return Ok(Response::from_parts(response_parts, response_body));
739 }
740 Err(err) => {
741 for logger in global_configuration
742 .as_ref()
743 .map_or(&vec![], |c| &c.observability.log_channels)
744 {
745 logger
746 .send(LogMessage::new(
747 format!("Cannot determine server configuration: {err}"),
748 true,
749 ))
750 .await
751 .unwrap_or_default()
752 }
753 let response = Response::builder()
754 .status(StatusCode::INTERNAL_SERVER_ERROR)
755 .header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
756 .body(
757 Full::new(Bytes::from(generate_default_error_page(
758 StatusCode::INTERNAL_SERVER_ERROR,
759 None,
760 )))
761 .map_err(|e| match e {})
762 .boxed(),
763 )
764 .unwrap_or_default();
765 let (request_parts, _) = request.into_parts();
766 log_access(
767 global_configuration
768 .as_ref()
769 .map_or(&vec![], |c| &c.observability.log_channels),
770 &request_parts,
771 &socket_data,
772 None,
773 response.status().as_u16(),
774 match response.headers().get(header::CONTENT_LENGTH) {
775 Some(header_value) => match header_value.to_str() {
776 Ok(header_value) => match header_value.parse::<u64>() {
777 Ok(content_length) => Some(content_length),
778 Err(_) => response.body().size_hint().exact(),
779 },
780 Err(_) => response.body().size_hint().exact(),
781 },
782 None => response.body().size_hint().exact(),
783 },
784 global_configuration
785 .as_deref()
786 .and_then(|c| get_value!("log_date_format", c))
787 .and_then(|v| v.as_str()),
788 global_configuration
789 .as_deref()
790 .and_then(|c| get_value!("log_format", c))
791 .and_then(|v| v.as_str()),
792 )
793 .await;
794 let (mut response_parts, response_body) = response.into_parts();
795 if let Some(http3_alt_port) = http3_alt_port {
796 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
797 Some(value) => {
798 let header_value_old = String::from_utf8_lossy(value.as_bytes());
799 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
800
801 if header_value_old != header_value_new {
802 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
803 } else {
804 HeaderValue::from_bytes(header_value_old.as_bytes())
805 }
806 }
807 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
808 } {
809 response_parts.headers.insert(header::ALT_SVC, header_value);
810 }
811 }
812 response_parts
813 .headers
814 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
815
816 return Ok(Response::from_parts(response_parts, response_body));
817 }
818 };
819
820 let mut log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
822 let mut log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
823
824 if !configuration.observability.log_channels.is_empty() && log_request_parts.is_none() {
826 let (request_parts, request_body) = request.into_parts();
827 log_request_parts = Some(request_parts.clone());
828 request = Request::from_parts(request_parts, request_body);
829 }
830
831 if !get_value!("disable_url_sanitizer", configuration)
833 .and_then(|v| v.as_bool())
834 .unwrap_or(false)
835 {
836 let url_pathname = request.uri().path();
837 let sanitized_url_pathname = match sanitize_url(
838 url_pathname,
839 get_value!("allow_double_slashes", configuration)
840 .and_then(|v| v.as_bool())
841 .unwrap_or(false),
842 ) {
843 Ok(sanitized_url) => sanitized_url,
844 Err(err) => {
845 for logger in &configuration.observability.log_channels {
846 logger
847 .send(LogMessage::new(format!("URL sanitation error: {err}"), true))
848 .await
849 .unwrap_or_default();
850 }
851 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
852
853 let mut headers_to_add = HeaderMap::new();
855 let mut headers_to_replace = HeaderMap::new();
856 let mut headers_to_remove = Vec::new();
857 let (request_parts, _) = request.into_parts();
858 if let Some(custom_headers) = get_entries!("header", configuration) {
859 for custom_header in custom_headers.inner.iter().rev() {
860 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
861 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
862 if !headers_to_add.contains_key(header_name) {
863 if let Ok(header_name) = HeaderName::from_str(header_name) {
864 if let Ok(header_value) =
865 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
866 {
867 headers_to_add.insert(header_name, header_value);
868 }
869 }
870 }
871 }
872 }
873 }
874 }
875 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
876 for custom_header in custom_headers.inner.iter().rev() {
877 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
878 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
879 if let Ok(header_name) = HeaderName::from_str(header_name) {
880 if let Ok(header_value) =
881 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
882 {
883 headers_to_replace.insert(header_name, header_value);
884 }
885 }
886 }
887 }
888 }
889 }
890 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
891 for custom_header in custom_headers_to_remove.inner.iter().rev() {
892 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
893 if let Ok(header_name) = HeaderName::from_str(header_name) {
894 headers_to_remove.push(header_name);
895 }
896 }
897 }
898 }
899
900 return Ok(
901 finalize_response_and_log(
902 response,
903 http3_alt_port,
904 headers_to_add,
905 headers_to_replace,
906 headers_to_remove,
907 &configuration.observability.log_channels,
908 &log_request_parts,
909 &socket_data,
910 None,
911 log_date_format,
912 log_format,
913 )
914 .await,
915 );
916 }
917 };
918
919 if sanitized_url_pathname != url_pathname {
920 let (mut parts, body) = request.into_parts();
921 let orig_uri = parts.uri.clone();
922 let mut url_parts = parts.uri.into_parts();
923 url_parts.path_and_query = Some(
924 match format!(
925 "{}{}",
926 sanitized_url_pathname,
927 match url_parts.path_and_query {
928 Some(path_and_query) => {
929 match path_and_query.query() {
930 Some(query) => format!("?{query}"),
931 None => String::from(""),
932 }
933 }
934 None => String::from(""),
935 }
936 )
937 .parse()
938 {
939 Ok(path_and_query) => path_and_query,
940 Err(err) => {
941 for logger in &configuration.observability.log_channels {
942 logger
943 .send(LogMessage::new(format!("URL sanitation error: {err}"), true))
944 .await
945 .unwrap_or_default();
946 }
947
948 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
949
950 parts.uri = orig_uri;
951
952 let mut headers_to_add = HeaderMap::new();
954 let mut headers_to_replace = HeaderMap::new();
955 let mut headers_to_remove = Vec::new();
956 if let Some(custom_headers) = get_entries!("header", configuration) {
957 for custom_header in custom_headers.inner.iter().rev() {
958 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
959 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
960 if !headers_to_add.contains_key(header_name) {
961 if let Ok(header_name) = HeaderName::from_str(header_name) {
962 if let Ok(header_value) =
963 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
964 {
965 headers_to_add.insert(header_name, header_value);
966 }
967 }
968 }
969 }
970 }
971 }
972 }
973 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
974 for custom_header in custom_headers.inner.iter().rev() {
975 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
976 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
977 if let Ok(header_name) = HeaderName::from_str(header_name) {
978 if let Ok(header_value) =
979 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
980 {
981 headers_to_replace.insert(header_name, header_value);
982 }
983 }
984 }
985 }
986 }
987 }
988 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
989 for custom_header in custom_headers_to_remove.inner.iter().rev() {
990 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
991 if let Ok(header_name) = HeaderName::from_str(header_name) {
992 headers_to_remove.push(header_name);
993 }
994 }
995 }
996 }
997
998 return Ok(
999 finalize_response_and_log(
1000 response,
1001 http3_alt_port,
1002 headers_to_add,
1003 headers_to_replace,
1004 headers_to_remove,
1005 &configuration.observability.log_channels,
1006 &log_request_parts,
1007 &socket_data,
1008 None,
1009 log_date_format,
1010 log_format,
1011 )
1012 .await,
1013 );
1014 }
1015 },
1016 );
1017 parts.uri = match hyper::Uri::from_parts(url_parts) {
1018 Ok(uri) => uri,
1019 Err(err) => {
1020 for logger in &configuration.observability.log_channels {
1021 logger
1022 .send(LogMessage::new(format!("URL sanitation error: {err}"), true))
1023 .await
1024 .unwrap_or_default();
1025 }
1026
1027 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
1028
1029 parts.uri = orig_uri;
1030
1031 let mut headers_to_add = HeaderMap::new();
1033 let mut headers_to_replace = HeaderMap::new();
1034 let mut headers_to_remove = Vec::new();
1035 if let Some(custom_headers) = get_entries!("header", configuration) {
1036 for custom_header in custom_headers.inner.iter().rev() {
1037 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1038 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1039 if !headers_to_add.contains_key(header_name) {
1040 if let Ok(header_name) = HeaderName::from_str(header_name) {
1041 if let Ok(header_value) =
1042 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
1043 {
1044 headers_to_add.insert(header_name, header_value);
1045 }
1046 }
1047 }
1048 }
1049 }
1050 }
1051 }
1052 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
1053 for custom_header in custom_headers.inner.iter().rev() {
1054 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1055 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1056 if let Ok(header_name) = HeaderName::from_str(header_name) {
1057 if let Ok(header_value) =
1058 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
1059 {
1060 headers_to_replace.insert(header_name, header_value);
1061 }
1062 }
1063 }
1064 }
1065 }
1066 }
1067 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
1068 for custom_header in custom_headers_to_remove.inner.iter().rev() {
1069 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1070 if let Ok(header_name) = HeaderName::from_str(header_name) {
1071 headers_to_remove.push(header_name);
1072 }
1073 }
1074 }
1075 }
1076
1077 return Ok(
1078 finalize_response_and_log(
1079 response,
1080 http3_alt_port,
1081 headers_to_add,
1082 headers_to_replace,
1083 headers_to_remove,
1084 &configuration.observability.log_channels,
1085 &log_request_parts,
1086 &socket_data,
1087 None,
1088 log_date_format,
1089 log_format,
1090 )
1091 .await,
1092 );
1093 }
1094 };
1095 let configuration_option =
1096 configurations.find_configuration(&parts, hostname_determinant.as_deref(), &socket_data);
1097 request = Request::from_parts(parts, body);
1098 match configuration_option {
1099 Ok(Some(new_configuration)) => {
1100 configuration = new_configuration;
1101 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1102 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1103 }
1104 Ok(None) => {}
1105 Err(err) => {
1106 for logger in &configuration.observability.log_channels {
1107 logger
1108 .send(LogMessage::new(
1109 format!("Cannot determine server configuration: {err}"),
1110 true,
1111 ))
1112 .await
1113 .unwrap_or_default();
1114 }
1115 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
1116
1117 let mut headers_to_add = HeaderMap::new();
1119 let mut headers_to_replace = HeaderMap::new();
1120 let mut headers_to_remove = Vec::new();
1121 let (request_parts, _) = request.into_parts();
1122 if let Some(custom_headers) = get_entries!("header", configuration) {
1123 for custom_header in custom_headers.inner.iter().rev() {
1124 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1125 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1126 if !headers_to_add.contains_key(header_name) {
1127 if let Ok(header_name) = HeaderName::from_str(header_name) {
1128 if let Ok(header_value) =
1129 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1130 {
1131 headers_to_add.insert(header_name, header_value);
1132 }
1133 }
1134 }
1135 }
1136 }
1137 }
1138 }
1139 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
1140 for custom_header in custom_headers.inner.iter().rev() {
1141 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1142 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1143 if let Ok(header_name) = HeaderName::from_str(header_name) {
1144 if let Ok(header_value) =
1145 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1146 {
1147 headers_to_replace.insert(header_name, header_value);
1148 }
1149 }
1150 }
1151 }
1152 }
1153 }
1154 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
1155 for custom_header in custom_headers_to_remove.inner.iter().rev() {
1156 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1157 if let Ok(header_name) = HeaderName::from_str(header_name) {
1158 headers_to_remove.push(header_name);
1159 }
1160 }
1161 }
1162 }
1163
1164 return Ok(
1165 finalize_response_and_log(
1166 response,
1167 http3_alt_port,
1168 headers_to_add,
1169 headers_to_replace,
1170 headers_to_remove,
1171 &configuration.observability.log_channels,
1172 &log_request_parts,
1173 &socket_data,
1174 None,
1175 log_date_format,
1176 log_format,
1177 )
1178 .await,
1179 );
1180 }
1181 }
1182 }
1183 }
1184
1185 let mut headers_to_add = HeaderMap::new();
1187 let mut headers_to_replace = HeaderMap::new();
1188 let mut headers_to_remove = Vec::new();
1189 let (request_parts, request_body) = request.into_parts();
1190 if let Some(custom_headers) = get_entries!("header", configuration) {
1191 for custom_header in custom_headers.inner.iter().rev() {
1192 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1193 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1194 if !headers_to_add.contains_key(header_name) {
1195 if let Ok(header_name) = HeaderName::from_str(header_name) {
1196 if let Ok(header_value) =
1197 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1198 {
1199 headers_to_add.insert(header_name, header_value);
1200 }
1201 }
1202 }
1203 }
1204 }
1205 }
1206 }
1207 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
1208 for custom_header in custom_headers.inner.iter().rev() {
1209 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1210 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1211 if let Ok(header_name) = HeaderName::from_str(header_name) {
1212 if let Ok(header_value) =
1213 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1214 {
1215 headers_to_replace.insert(header_name, header_value);
1216 }
1217 }
1218 }
1219 }
1220 }
1221 }
1222 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
1223 for custom_header in custom_headers_to_remove.inner.iter().rev() {
1224 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1225 if let Ok(header_name) = HeaderName::from_str(header_name) {
1226 headers_to_remove.push(header_name);
1227 }
1228 }
1229 }
1230 }
1231 let mut request = Request::from_parts(request_parts, request_body);
1232
1233 if request.uri().path() == "*" {
1234 let response = match request.method() {
1235 &Method::OPTIONS => Response::builder()
1236 .status(StatusCode::NO_CONTENT)
1237 .header(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"))
1238 .body(Empty::new().map_err(|e| match e {}).boxed())
1239 .unwrap_or_default(),
1240 _ => {
1241 let mut header_map = HeaderMap::new();
1242 header_map.insert(header::ALLOW, HeaderValue::from_static("GET, POST, HEAD, OPTIONS"));
1243 generate_error_response(StatusCode::BAD_REQUEST, &configuration, &Some(header_map)).await
1244 }
1245 };
1246 return Ok(
1247 finalize_response_and_log(
1248 response,
1249 http3_alt_port,
1250 headers_to_add,
1251 headers_to_replace,
1252 headers_to_remove,
1253 &configuration.observability.log_channels,
1254 &log_request_parts,
1255 &socket_data,
1256 None,
1257 log_date_format,
1258 log_format,
1259 )
1260 .await,
1261 );
1262 }
1263
1264 let acme_http_01_resolvers_inner = acme_http_01_resolvers.read().await;
1266 if !acme_http_01_resolvers_inner.is_empty() {
1267 if let Some(challenge_token) = request.uri().path().strip_prefix("/.well-known/acme-challenge/") {
1268 for acme_http01_resolver in &*acme_http_01_resolvers_inner {
1269 if let Some(http01_acme_data) = &*acme_http01_resolver.read().await {
1270 let acme_response = http01_acme_data.1.clone();
1271 if challenge_token == http01_acme_data.0 {
1272 let response = Response::builder()
1273 .status(StatusCode::OK)
1274 .header(
1275 header::CONTENT_TYPE,
1276 HeaderValue::from_static("application/octet-stream"),
1277 )
1278 .body(Full::new(Bytes::from(acme_response)).map_err(|e| match e {}).boxed())
1279 .unwrap_or_default();
1280
1281 return Ok(
1282 finalize_response_and_log(
1283 response,
1284 http3_alt_port,
1285 headers_to_add,
1286 headers_to_replace,
1287 headers_to_remove,
1288 &configuration.observability.log_channels,
1289 &log_request_parts,
1290 &socket_data,
1291 None,
1292 log_date_format,
1293 log_format,
1294 )
1295 .await,
1296 );
1297 }
1298 }
1299 }
1300 }
1301 };
1302 drop(acme_http_01_resolvers_inner);
1303
1304 let mut error_logger = if !configuration.observability.log_channels.is_empty() {
1305 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1306 } else {
1307 ErrorLogger::without_logger()
1308 };
1309 let mut metrics_enabled = !configuration.observability.metric_channels.is_empty();
1310 let mut metrics_sender = if metrics_enabled {
1311 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1312 } else {
1313 MetricsMultiSender::without_sender()
1314 };
1315 let mut traces_enabled = !configuration.observability.trace_channels.is_empty();
1316 let mut traces_senders = if traces_enabled {
1317 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1318 for channel in &configuration.observability.trace_channels {
1319 channel.0.send(()).await.unwrap_or_default();
1320 if let Ok(channel2) = channel.1.recv().await {
1321 traces_senders.push(channel2);
1322 }
1323 }
1324 traces_senders
1325 } else {
1326 vec![]
1327 };
1328
1329 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1331 for module in &configuration.modules {
1332 module_handlers.push(module.get_module_handlers());
1333 }
1334
1335 request.extensions_mut().insert(RequestData {
1337 auth_user: None,
1338 original_url: None,
1339 error_status_code: None,
1340 });
1341 let mut executed_handlers = Vec::new();
1342 let (request_parts, request_body) = request.into_parts();
1343 let request_parts_cloned = if configurations.inner.iter().rev().any(|c| {
1344 c.filters.is_host
1345 && c.filters.hostname == configuration.filters.hostname
1346 && c.filters.ip == configuration.filters.ip
1347 && c.filters.port == configuration.filters.port
1348 && (c.filters.condition.is_none() || c.filters.condition == configuration.filters.condition)
1349 && c.filters.error_handler_status.is_some()
1350 }) {
1351 let mut request_parts_cloned = request_parts.clone();
1352 request_parts_cloned
1353 .headers
1354 .insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"));
1355 Some(request_parts_cloned)
1356 } else {
1357 None
1359 };
1360 let mut request = Request::from_parts(request_parts, request_body);
1361 let mut latest_auth_data = None;
1362 let mut is_error_handler = false;
1363 let mut handlers_iter: Box<dyn Iterator<Item = Box<dyn ModuleHandlers>>> = Box::new(module_handlers.into_iter());
1364 while let Some(mut handlers) = handlers_iter.next() {
1365 if metrics_enabled {
1366 handlers
1367 .metric_data_before_handler(&request, &socket_data, &metrics_sender)
1368 .await;
1369 }
1370
1371 if traces_enabled {
1372 for trace_sender in &traces_senders {
1373 trace_sender
1374 .send(TraceSignal::StartSpan(format!(
1375 "{}::request_handler",
1376 handlers.get_name()
1377 )))
1378 .await
1379 .unwrap_or_default();
1380 }
1381 }
1382
1383 let (response_result, is_timeout) = if let Some(timeout_duration) = &timeout_duration {
1384 let elapsed = timeout_instant.elapsed();
1385 if let Some(timeout_cur_duration) = timeout_duration.checked_sub(elapsed) {
1386 match timeout(
1387 timeout_cur_duration,
1388 handlers.request_handler(request, &configuration, &socket_data, &error_logger),
1389 )
1390 .await
1391 {
1392 Ok(result) => (result, false),
1393 Err(_) => (Err(anyhow::anyhow!("The client or server has timed out").into()), true),
1394 }
1395 } else {
1396 (Err(anyhow::anyhow!("The client or server has timed out").into()), true)
1397 }
1398 } else {
1399 (
1400 handlers
1401 .request_handler(request, &configuration, &socket_data, &error_logger)
1402 .await,
1403 false,
1404 )
1405 };
1406
1407 if traces_enabled {
1408 for trace_sender in &traces_senders {
1409 trace_sender
1410 .send(TraceSignal::EndSpan(
1411 format!("{}::request_handler", handlers.get_name()),
1412 response_result.as_ref().err().map(|e| e.to_string()),
1413 ))
1414 .await
1415 .unwrap_or_default();
1416 }
1417 }
1418
1419 executed_handlers.push(handlers);
1420
1421 if is_timeout {
1422 if metrics_enabled {
1423 while let Some(mut executed_handler) = executed_handlers.pop() {
1424 executed_handler.metric_data_after_handler(&metrics_sender).await;
1425 }
1426 }
1427 Err(anyhow::anyhow!("The client or server has timed out"))?;
1428 }
1429
1430 match response_result {
1431 Ok(response) => {
1432 let status = response.response_status;
1433 let headers = response.response_headers;
1434 let new_remote_address = response.new_remote_address;
1435 let request_option = response.request;
1436 let response = response.response;
1437 let request_extensions = request_option
1438 .as_ref()
1439 .and_then(|r| r.extensions().get::<RequestData>());
1440 if let Some(request_extensions) = request_extensions {
1441 latest_auth_data = request_extensions.auth_user.clone();
1442 }
1443 if let Some(new_remote_address) = new_remote_address {
1444 socket_data.remote_addr = new_remote_address;
1445 };
1446
1447 match response {
1448 Some(response) => {
1449 let (mut response_parts, response_body) = response.into_parts();
1450 add_custom_headers(
1451 &mut response_parts,
1452 &headers_to_add,
1453 &headers_to_replace,
1454 &headers_to_remove,
1455 );
1456 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1457 add_server_header(&mut response_parts);
1458
1459 let response = Response::from_parts(response_parts, response_body);
1460
1461 match execute_response_modifying_handlers(
1462 response,
1463 executed_handlers,
1464 &configuration,
1465 http3_alt_port,
1466 headers_to_add,
1467 headers_to_replace,
1468 headers_to_remove,
1469 &configuration.observability.log_channels,
1470 &log_request_parts,
1471 &socket_data,
1472 latest_auth_data.as_deref(),
1473 log_date_format,
1474 log_format,
1475 metrics_sender,
1476 metrics_enabled,
1477 traces_senders,
1478 traces_enabled,
1479 timeout_instant,
1480 timeout_duration,
1481 )
1482 .await?
1483 {
1484 Ok(response) => {
1485 if let Some(request_parts) = log_request_parts {
1486 log_access(
1487 &configuration.observability.log_channels,
1488 &request_parts,
1489 &socket_data,
1490 latest_auth_data.as_deref(),
1491 response.status().as_u16(),
1492 extract_content_length(&response),
1493 log_date_format,
1494 log_format,
1495 )
1496 .await;
1497 }
1498 return Ok(response);
1499 }
1500 Err(error_response) => {
1501 return Ok(error_response);
1502 }
1503 }
1504 }
1505 None => match status {
1506 Some(status) => {
1507 if !is_error_handler {
1508 if let Some(error_configuration) =
1509 configurations.find_error_configuration(&configuration.filters, status.as_u16())
1510 {
1511 let request_option = if let Some(request) = request_option {
1512 Some(request)
1513 } else {
1514 request_parts_cloned.clone().map(|request_parts_cloned| {
1515 Request::from_parts(request_parts_cloned, Empty::new().map_err(|e| match e {}).boxed())
1516 })
1517 };
1518 if let Some(request_cloned) = request_option {
1519 configuration = error_configuration;
1520 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1521 for module in &configuration.modules {
1522 module_handlers.push(module.get_module_handlers());
1523 }
1524 handlers_iter = Box::new(module_handlers.into_iter());
1525 if metrics_enabled {
1526 while let Some(mut executed_handler) = executed_handlers.pop() {
1527 executed_handler.metric_data_after_handler(&metrics_sender).await;
1528 }
1529 }
1530 executed_handlers = Vec::new();
1531 request = request_cloned;
1532 if let Some(request_data) = request.extensions_mut().get_mut::<RequestData>() {
1533 request_data.error_status_code = Some(status);
1534 }
1535 is_error_handler = true;
1536 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1537 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1538 error_logger = if !configuration.observability.log_channels.is_empty() {
1539 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1540 } else {
1541 ErrorLogger::without_logger()
1542 };
1543 metrics_enabled = !configuration.observability.metric_channels.is_empty();
1544 metrics_sender = if metrics_enabled {
1545 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1546 } else {
1547 MetricsMultiSender::without_sender()
1548 };
1549 traces_enabled = !configuration.observability.trace_channels.is_empty();
1550 traces_senders = if traces_enabled {
1551 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1552 for channel in &configuration.observability.trace_channels {
1553 channel.0.send(()).await.unwrap_or_default();
1554 if let Ok(channel2) = channel.1.recv().await {
1555 traces_senders.push(channel2);
1556 }
1557 }
1558 traces_senders
1559 } else {
1560 vec![]
1561 };
1562 continue;
1563 }
1564 }
1565 }
1566 let response = generate_error_response(status, &configuration, &headers).await;
1567
1568 let (mut response_parts, response_body) = response.into_parts();
1569 add_custom_headers(
1570 &mut response_parts,
1571 &headers_to_add,
1572 &headers_to_replace,
1573 &headers_to_remove,
1574 );
1575 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1576 add_server_header(&mut response_parts);
1577
1578 let response = Response::from_parts(response_parts, response_body);
1579
1580 match execute_response_modifying_handlers(
1581 response,
1582 executed_handlers,
1583 &configuration,
1584 http3_alt_port,
1585 headers_to_add,
1586 headers_to_replace,
1587 headers_to_remove,
1588 &configuration.observability.log_channels,
1589 &log_request_parts,
1590 &socket_data,
1591 latest_auth_data.as_deref(),
1592 log_date_format,
1593 log_format,
1594 metrics_sender,
1595 metrics_enabled,
1596 traces_senders,
1597 traces_enabled,
1598 timeout_instant,
1599 timeout_duration,
1600 )
1601 .await?
1602 {
1603 Ok(response) => {
1604 if let Some(request_parts) = log_request_parts {
1605 log_access(
1606 &configuration.observability.log_channels,
1607 &request_parts,
1608 &socket_data,
1609 latest_auth_data.as_deref(),
1610 response.status().as_u16(),
1611 extract_content_length(&response),
1612 log_date_format,
1613 log_format,
1614 )
1615 .await;
1616 }
1617 return Ok(response);
1618 }
1619 Err(error_response) => {
1620 return Ok(error_response);
1621 }
1622 }
1623 }
1624 None => match request_option {
1625 Some(request_obtained) => {
1626 request = request_obtained;
1627 continue;
1628 }
1629 None => {
1630 break;
1631 }
1632 },
1633 },
1634 }
1635 }
1636 Err(err) => {
1637 let response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, &configuration, &None).await;
1638
1639 for logger in &configuration.observability.log_channels {
1640 logger
1641 .send(LogMessage::new(
1642 format!("Unexpected error while serving a request: {err}"),
1643 true,
1644 ))
1645 .await
1646 .unwrap_or_default();
1647 }
1648
1649 let (mut response_parts, response_body) = response.into_parts();
1650 add_custom_headers(
1651 &mut response_parts,
1652 &headers_to_add,
1653 &headers_to_replace,
1654 &headers_to_remove,
1655 );
1656 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1657 add_server_header(&mut response_parts);
1658
1659 let response = Response::from_parts(response_parts, response_body);
1660
1661 match execute_response_modifying_handlers(
1662 response,
1663 executed_handlers,
1664 &configuration,
1665 http3_alt_port,
1666 headers_to_add,
1667 headers_to_replace,
1668 headers_to_remove,
1669 &configuration.observability.log_channels,
1670 &log_request_parts,
1671 &socket_data,
1672 latest_auth_data.as_deref(),
1673 log_date_format,
1674 log_format,
1675 metrics_sender,
1676 metrics_enabled,
1677 traces_senders,
1678 traces_enabled,
1679 timeout_instant,
1680 timeout_duration,
1681 )
1682 .await?
1683 {
1684 Ok(response) => {
1685 if let Some(request_parts) = log_request_parts {
1686 log_access(
1687 &configuration.observability.log_channels,
1688 &request_parts,
1689 &socket_data,
1690 latest_auth_data.as_deref(),
1691 response.status().as_u16(),
1692 extract_content_length(&response),
1693 log_date_format,
1694 log_format,
1695 )
1696 .await;
1697 }
1698 return Ok(response);
1699 }
1700 Err(error_response) => {
1701 return Ok(error_response);
1702 }
1703 }
1704 }
1705 }
1706 }
1707
1708 let response = generate_error_response(StatusCode::NOT_FOUND, &configuration, &None).await;
1709
1710 let (mut response_parts, response_body) = response.into_parts();
1711 add_custom_headers(
1712 &mut response_parts,
1713 &headers_to_add,
1714 &headers_to_replace,
1715 &headers_to_remove,
1716 );
1717 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1718 add_server_header(&mut response_parts);
1719
1720 let response = Response::from_parts(response_parts, response_body);
1721
1722 match execute_response_modifying_handlers(
1723 response,
1724 executed_handlers,
1725 &configuration,
1726 http3_alt_port,
1727 headers_to_add,
1728 headers_to_replace,
1729 headers_to_remove,
1730 &configuration.observability.log_channels,
1731 &log_request_parts,
1732 &socket_data,
1733 latest_auth_data.as_deref(),
1734 log_date_format,
1735 log_format,
1736 metrics_sender,
1737 metrics_enabled,
1738 traces_senders,
1739 traces_enabled,
1740 timeout_instant,
1741 timeout_duration,
1742 )
1743 .await?
1744 {
1745 Ok(response) => {
1746 if let Some(request_parts) = log_request_parts {
1747 log_access(
1748 &configuration.observability.log_channels,
1749 &request_parts,
1750 &socket_data,
1751 latest_auth_data.as_deref(),
1752 response.status().as_u16(),
1753 extract_content_length(&response),
1754 log_date_format,
1755 log_format,
1756 )
1757 .await;
1758 }
1759 Ok(response)
1760 }
1761 Err(error_response) => Ok(error_response),
1762 }
1763}