1use std::convert::Infallible;
2use std::net::SocketAddr;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_channel::Sender;
8use chrono::{DateTime, Local};
9use ferron_common::logging::{ErrorLogger, LogMessage};
10use ferron_common::observability::{MetricsMultiSender, TraceSignal};
11use futures_util::stream::TryStreamExt;
12use http_body_util::combinators::BoxBody;
13use http_body_util::{BodyExt, Empty, Full, StreamBody};
14use hyper::body::{Body, Bytes, Frame};
15use hyper::header::{HeaderName, HeaderValue};
16use hyper::{header, HeaderMap, Method, Request, Response, StatusCode};
17#[cfg(feature = "runtime-tokio")]
18use tokio::io::BufReader;
19#[cfg(feature = "runtime-tokio")]
20use tokio_util::io::ReaderStream;
21
22use crate::config::{ServerConfiguration, ServerConfigurations};
23use crate::get_value;
24use crate::runtime::timeout;
25#[cfg(feature = "runtime-monoio")]
26use crate::util::MonoioFileStream;
27use crate::util::{
28 generate_default_error_page, replace_header_placeholders, replace_log_placeholders, sanitize_url, SERVER_SOFTWARE,
29};
30
31use ferron_common::modules::{ModuleHandlers, RequestData, SocketData};
32use ferron_common::{get_entries, get_entry};
33
34async fn generate_error_response(
36 status_code: StatusCode,
37 config: &ServerConfiguration,
38 headers: &Option<HeaderMap>,
39) -> Response<BoxBody<Bytes, std::io::Error>> {
40 let bare_body = generate_default_error_page(
41 status_code,
42 get_value!("server_administrator_email", config).and_then(|v| v.as_str()),
43 );
44 let mut content_length: Option<u64> = bare_body.len().try_into().ok();
45 let mut response_body = Full::new(Bytes::from(bare_body)).map_err(|e| match e {}).boxed();
46
47 if let Some(error_pages) = get_entries!("error_page", config) {
48 for error_page in &error_pages.inner {
49 if let Some(page_status_code) = error_page.values.first().and_then(|v| v.as_i128()) {
50 let page_status_code = match StatusCode::from_u16(match page_status_code.try_into() {
51 Ok(status_code) => status_code,
52 Err(_) => continue,
53 }) {
54 Ok(status_code) => status_code,
55 Err(_) => continue,
56 };
57 if status_code != page_status_code {
58 continue;
59 }
60 if let Some(page_path) = error_page.values.get(1).and_then(|v| v.as_str()) {
61 #[cfg(feature = "runtime-monoio")]
62 let file = monoio::fs::File::open(page_path).await;
63 #[cfg(feature = "runtime-tokio")]
64 let file = tokio::fs::File::open(page_path).await;
65
66 let file = match file {
67 Ok(file) => file,
68 Err(_) => continue,
69 };
70
71 #[cfg(any(feature = "runtime-tokio", all(feature = "runtime-monoio", unix)))]
73 let metadata = file.metadata().await;
74 #[cfg(all(feature = "runtime-monoio", windows))]
75 let metadata = {
76 let page_path = page_path.to_owned();
77 monoio::spawn_blocking(move || std::fs::metadata(page_path))
78 .await
79 .unwrap_or(Err(std::io::Error::other(
80 "Can't spawn a blocking task to obtain the file metadata",
81 )))
82 };
83
84 content_length = match metadata {
85 Ok(metadata) => Some(metadata.len()),
86 Err(_) => None,
87 };
88
89 #[cfg(feature = "runtime-monoio")]
90 let file_stream = MonoioFileStream::new(file, None, content_length);
91 #[cfg(feature = "runtime-tokio")]
92 let file_stream = ReaderStream::new(BufReader::with_capacity(12800, file));
93
94 let stream_body = StreamBody::new(file_stream.map_ok(Frame::data));
95 let boxed_body = stream_body.boxed();
96
97 response_body = boxed_body;
98
99 break;
100 }
101 }
102 }
103 }
104
105 let mut response_builder = Response::builder().status(status_code);
106
107 if let Some(headers) = headers {
108 let headers_iter = headers.iter();
109 for (name, value) in headers_iter {
110 if name != header::CONTENT_TYPE && name != header::CONTENT_LENGTH {
111 response_builder = response_builder.header(name, value);
112 }
113 }
114 }
115
116 if let Some(content_length) = content_length {
117 response_builder = response_builder.header(header::CONTENT_LENGTH, content_length);
118 }
119 response_builder = response_builder.header(header::CONTENT_TYPE, "text/html");
120
121 response_builder.body(response_body).unwrap_or_default()
122}
123
124#[allow(clippy::too_many_arguments)]
126async fn log_access(
127 loggers: &Vec<Sender<LogMessage>>,
128 request_parts: &hyper::http::request::Parts,
129 socket_data: &SocketData,
130 auth_user: Option<&str>,
131 status_code: u16,
132 content_length: Option<u64>,
133 date_format: Option<&str>,
134 log_format: Option<&str>,
135) {
136 let now: DateTime<Local> = Local::now();
137 let formatted_time = now.format(date_format.unwrap_or("%d/%b/%Y:%H:%M:%S %z")).to_string();
138 let log_message_string = replace_log_placeholders(
139 log_format.unwrap_or(
140 "{client_ip} - {auth_user} [{timestamp}] \"{method} {path_and_query} {version}\" \
141 {status_code} {content_length} \"{header:Referer}\" \"{header:User-Agent}\"",
142 ),
143 request_parts,
144 socket_data,
145 auth_user,
146 &formatted_time,
147 status_code,
148 content_length,
149 );
150 for logger in loggers {
151 logger
152 .send(LogMessage::new(log_message_string.clone(), false))
153 .await
154 .unwrap_or_default();
155 }
156}
157
158fn add_custom_headers(
160 response_parts: &mut hyper::http::response::Parts,
161 headers_to_add: &HeaderMap,
162 headers_to_replace: &HeaderMap,
163 headers_to_remove: &[HeaderName],
164) {
165 for (header_name, header_value) in headers_to_add {
166 if !response_parts.headers.contains_key(header_name) {
167 response_parts.headers.insert(header_name, header_value.to_owned());
168 }
169 }
170
171 for (header_name, header_value) in headers_to_replace {
172 response_parts.headers.insert(header_name, header_value.to_owned());
173 }
174
175 for header_to_remove in headers_to_remove.iter().rev() {
176 if response_parts.headers.contains_key(header_to_remove) {
177 while response_parts.headers.remove(header_to_remove).is_some() {}
178 }
179 }
180}
181
182fn add_http3_alt_svc_header(response_parts: &mut hyper::http::response::Parts, http3_alt_port: Option<u16>) {
184 if let Some(http3_alt_port) = http3_alt_port {
185 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
186 Some(value) => {
187 let header_value_old = String::from_utf8_lossy(value.as_bytes());
188 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
189
190 if header_value_old != header_value_new {
191 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
192 } else {
193 HeaderValue::from_bytes(header_value_old.as_bytes())
194 }
195 }
196 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
197 } {
198 response_parts.headers.insert(header::ALT_SVC, header_value);
199 }
200 }
201}
202
203fn add_server_header(response_parts: &mut hyper::http::response::Parts) {
205 response_parts
206 .headers
207 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
208}
209
210fn extract_content_length(response: &Response<BoxBody<Bytes, std::io::Error>>) -> Option<u64> {
212 match response.headers().get(header::CONTENT_LENGTH) {
213 Some(header_value) => match header_value.to_str() {
214 Ok(header_value) => match header_value.parse::<u64>() {
215 Ok(content_length) => Some(content_length),
216 Err(_) => response.body().size_hint().exact(),
217 },
218 Err(_) => response.body().size_hint().exact(),
219 },
220 None => response.body().size_hint().exact(),
221 }
222}
223
224#[allow(clippy::too_many_arguments)]
226async fn finalize_response_and_log(
227 response: Response<BoxBody<Bytes, std::io::Error>>,
228 http3_alt_port: Option<u16>,
229 headers_to_add: HeaderMap,
230 headers_to_replace: HeaderMap,
231 headers_to_remove: Vec<HeaderName>,
232 loggers: &Vec<Sender<LogMessage>>,
233 request_parts: &Option<hyper::http::request::Parts>,
234 socket_data: &SocketData,
235 latest_auth_data: Option<&str>,
236 date_format: Option<&str>,
237 log_format: Option<&str>,
238) -> Response<BoxBody<Bytes, std::io::Error>> {
239 let (mut response_parts, response_body) = response.into_parts();
240
241 add_custom_headers(
242 &mut response_parts,
243 &headers_to_add,
244 &headers_to_replace,
245 &headers_to_remove,
246 );
247 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
248 add_server_header(&mut response_parts);
249
250 let response = Response::from_parts(response_parts, response_body);
251
252 if let Some(request_parts) = request_parts {
253 if !loggers.is_empty() {
254 log_access(
255 loggers,
256 request_parts,
257 socket_data,
258 latest_auth_data,
259 response.status().as_u16(),
260 extract_content_length(&response),
261 date_format,
262 log_format,
263 )
264 .await;
265 }
266 }
267
268 response
269}
270
271#[allow(clippy::too_many_arguments)]
273async fn execute_response_modifying_handlers(
274 mut response: Response<BoxBody<Bytes, std::io::Error>>,
275 mut executed_handlers: Vec<Box<dyn ModuleHandlers>>,
276 configuration: &ServerConfiguration,
277 http3_alt_port: Option<u16>,
278 headers_to_add: HeaderMap,
279 headers_to_replace: HeaderMap,
280 headers_to_remove: Vec<HeaderName>,
281 loggers: &Vec<Sender<LogMessage>>,
282 request_parts: &Option<hyper::http::request::Parts>,
283 socket_data: &SocketData,
284 latest_auth_data: Option<&str>,
285 date_format: Option<&str>,
286 log_format: Option<&str>,
287 metrics_sender: MetricsMultiSender,
288 metrics_enabled: bool,
289 traces_senders: Vec<Sender<TraceSignal>>,
290 traces_enabled: bool,
291) -> Result<Response<BoxBody<Bytes, std::io::Error>>, Response<BoxBody<Bytes, std::io::Error>>> {
292 while let Some(mut executed_handler) = executed_handlers.pop() {
293 if traces_enabled {
294 for trace_sender in &traces_senders {
295 trace_sender
296 .send(TraceSignal::StartSpan(format!(
297 "{}::response_modifying_handler",
298 executed_handler.get_name()
299 )))
300 .await
301 .unwrap_or_default();
302 }
303 }
304 let response_status = executed_handler.response_modifying_handler(response).await;
305 if traces_enabled {
306 for trace_sender in &traces_senders {
307 trace_sender
308 .send(TraceSignal::EndSpan(
309 format!("{}::response_modifying_handler", executed_handler.get_name()),
310 response_status.as_ref().err().map(|e| e.to_string()),
311 ))
312 .await
313 .unwrap_or_default();
314 }
315 }
316 response = match response_status {
317 Ok(response) => response,
318 Err(err) => {
319 for logger in loggers {
320 logger
321 .send(LogMessage::new(
322 format!("Unexpected error while serving a request: {err}"),
323 true,
324 ))
325 .await
326 .unwrap_or_default();
327 }
328
329 let error_response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, configuration, &None).await;
330
331 let final_response = finalize_response_and_log(
332 error_response,
333 http3_alt_port,
334 headers_to_add,
335 headers_to_replace,
336 headers_to_remove,
337 loggers,
338 request_parts,
339 socket_data,
340 latest_auth_data,
341 date_format,
342 log_format,
343 )
344 .await;
345
346 if metrics_enabled {
347 while let Some(mut executed_handler) = executed_handlers.pop() {
348 executed_handler.metric_data_after_handler(&metrics_sender).await;
349 }
350 }
351
352 return Err(final_response);
353 }
354 };
355 if metrics_enabled {
356 executed_handler.metric_data_after_handler(&metrics_sender).await;
357 }
358 }
359 Ok(response)
360}
361
362#[allow(clippy::too_many_arguments)]
364async fn request_handler_wrapped(
365 mut request: Request<BoxBody<Bytes, std::io::Error>>,
366 client_address: SocketAddr,
367 server_address: SocketAddr,
368 encrypted: bool,
369 configurations: Arc<ServerConfigurations>,
370 http3_alt_port: Option<u16>,
371 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
372 proxy_protocol_client_address: Option<SocketAddr>,
373 proxy_protocol_server_address: Option<SocketAddr>,
374) -> Result<Response<BoxBody<Bytes, std::io::Error>>, Infallible> {
375 let global_configuration = configurations.find_global_configuration();
377
378 match request.version() {
380 hyper::Version::HTTP_2 | hyper::Version::HTTP_3 => {
381 if let Some(authority) = request.uri().authority() {
383 let authority = authority.to_owned();
384 let headers = request.headers_mut();
385 if !headers.contains_key(header::HOST) {
386 if let Ok(authority_value) = HeaderValue::from_bytes(authority.as_str().as_bytes()) {
387 headers.append(header::HOST, authority_value);
388 }
389 }
390 }
391
392 let mut cookie_normalized = String::new();
394 let mut cookie_set = false;
395 let headers = request.headers_mut();
396 for cookie in headers.get_all(header::COOKIE) {
397 if let Ok(cookie) = cookie.to_str() {
398 if cookie_set {
399 cookie_normalized.push_str("; ");
400 }
401 cookie_set = true;
402 cookie_normalized.push_str(cookie);
403 }
404 }
405 if cookie_set {
406 if let Ok(cookie_value) = HeaderValue::from_bytes(cookie_normalized.as_bytes()) {
407 headers.insert(header::COOKIE, cookie_value);
408 }
409 }
410 }
411 _ => (),
412 }
413
414 let mut socket_data = SocketData {
416 remote_addr: proxy_protocol_client_address.unwrap_or(client_address),
417 local_addr: proxy_protocol_server_address.unwrap_or(server_address),
418 encrypted,
419 };
420
421 let host_header_option = request.headers().get(header::HOST);
423 if let Some(header_data) = host_header_option {
424 match header_data.to_str() {
425 Ok(host_header) => {
426 let host_header_lower_case = host_header.to_lowercase();
427 let host_header_without_dot = host_header_lower_case
428 .strip_suffix('.')
429 .unwrap_or(host_header_lower_case.as_str());
430 if host_header_without_dot != host_header {
431 let host_header_value = match HeaderValue::from_str(host_header_without_dot) {
432 Ok(host_header_value) => host_header_value,
433 Err(err) => {
434 for logger in global_configuration
435 .as_ref()
436 .map_or(&vec![], |c| &c.observability.log_channels)
437 {
438 logger
439 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
440 .await
441 .unwrap_or_default();
442 }
443 let response = Response::builder()
444 .status(StatusCode::BAD_REQUEST)
445 .header(header::CONTENT_TYPE, "text/html")
446 .body(
447 Full::new(Bytes::from(generate_default_error_page(StatusCode::BAD_REQUEST, None)))
448 .map_err(|e| match e {})
449 .boxed(),
450 )
451 .unwrap_or_default();
452
453 let (request_parts, _) = request.into_parts();
454 log_access(
455 global_configuration
456 .as_ref()
457 .map_or(&vec![], |c| &c.observability.log_channels),
458 &request_parts,
459 &socket_data,
460 None,
461 response.status().as_u16(),
462 match response.headers().get(header::CONTENT_LENGTH) {
463 Some(header_value) => match header_value.to_str() {
464 Ok(header_value) => match header_value.parse::<u64>() {
465 Ok(content_length) => Some(content_length),
466 Err(_) => response.body().size_hint().exact(),
467 },
468 Err(_) => response.body().size_hint().exact(),
469 },
470 None => response.body().size_hint().exact(),
471 },
472 global_configuration
473 .as_deref()
474 .and_then(|c| get_value!("log_date_format", c))
475 .and_then(|v| v.as_str()),
476 global_configuration
477 .as_deref()
478 .and_then(|c| get_value!("log_format", c))
479 .and_then(|v| v.as_str()),
480 )
481 .await;
482 let (mut response_parts, response_body) = response.into_parts();
483 if let Some(http3_alt_port) = http3_alt_port {
484 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
485 Some(value) => {
486 let header_value_old = String::from_utf8_lossy(value.as_bytes());
487 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
488
489 if header_value_old != header_value_new {
490 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
491 } else {
492 HeaderValue::from_bytes(header_value_old.as_bytes())
493 }
494 }
495 None => {
496 HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes())
497 }
498 } {
499 response_parts.headers.insert(header::ALT_SVC, header_value);
500 }
501 }
502 response_parts
503 .headers
504 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
505
506 return Ok(Response::from_parts(response_parts, response_body));
507 }
508 };
509
510 request.headers_mut().insert(header::HOST, host_header_value);
511 }
512 }
513 Err(err) => {
514 for logger in global_configuration
515 .as_ref()
516 .map_or(&vec![], |c| &c.observability.log_channels)
517 {
518 logger
519 .send(LogMessage::new(format!("Host header sanitation error: {err}"), true))
520 .await
521 .unwrap_or_default();
522 }
523 let response = Response::builder()
524 .status(StatusCode::BAD_REQUEST)
525 .header(header::CONTENT_TYPE, "text/html")
526 .body(
527 Full::new(Bytes::from(generate_default_error_page(StatusCode::BAD_REQUEST, None)))
528 .map_err(|e| match e {})
529 .boxed(),
530 )
531 .unwrap_or_default();
532 let (request_parts, _) = request.into_parts();
533 log_access(
534 global_configuration
535 .as_ref()
536 .map_or(&vec![], |c| &c.observability.log_channels),
537 &request_parts,
538 &socket_data,
539 None,
540 response.status().as_u16(),
541 match response.headers().get(header::CONTENT_LENGTH) {
542 Some(header_value) => match header_value.to_str() {
543 Ok(header_value) => match header_value.parse::<u64>() {
544 Ok(content_length) => Some(content_length),
545 Err(_) => response.body().size_hint().exact(),
546 },
547 Err(_) => response.body().size_hint().exact(),
548 },
549 None => response.body().size_hint().exact(),
550 },
551 global_configuration
552 .as_deref()
553 .and_then(|c| get_value!("log_date_format", c))
554 .and_then(|v| v.as_str()),
555 global_configuration
556 .as_deref()
557 .and_then(|c| get_value!("log_format", c))
558 .and_then(|v| v.as_str()),
559 )
560 .await;
561 let (mut response_parts, response_body) = response.into_parts();
562 if let Some(http3_alt_port) = http3_alt_port {
563 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
564 Some(value) => {
565 let header_value_old = String::from_utf8_lossy(value.as_bytes());
566 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
567
568 if header_value_old != header_value_new {
569 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
570 } else {
571 HeaderValue::from_bytes(header_value_old.as_bytes())
572 }
573 }
574 None => {
575 HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes())
576 }
577 } {
578 response_parts.headers.insert(header::ALT_SVC, header_value);
579 }
580 }
581 response_parts
582 .headers
583 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
584
585 return Ok(Response::from_parts(response_parts, response_body));
586 }
587 }
588 };
589
590 let hostname_determinant = match request.headers().get(header::HOST) {
591 Some(value) => value.to_str().ok().map(|h| {
592 if let Some((left, right)) = h.rsplit_once(':') {
593 if right.parse::<u16>().is_ok() {
594 left.to_string()
595 } else {
596 h.to_string()
597 }
598 } else {
599 h.to_string()
600 }
601 }),
602 None => None,
603 };
604
605 let (request_parts, request_body) = request.into_parts();
606 let mut log_request_parts = if global_configuration
607 .as_ref()
608 .is_some_and(|c| !c.observability.log_channels.is_empty())
609 {
610 Some(request_parts.clone())
611 } else {
612 None
613 };
614 let request = Request::from_parts(request_parts, request_body);
615
616 let (request_parts, request_body) = request.into_parts();
618 let configuration_option =
619 configurations.find_configuration(&request_parts, hostname_determinant.as_deref(), &socket_data);
620 let mut request = Request::from_parts(request_parts, request_body);
621 let mut configuration = match configuration_option {
622 Ok(Some(configuration)) => configuration,
623 Ok(None) => {
624 for logger in global_configuration
625 .as_ref()
626 .map_or(&vec![], |c| &c.observability.log_channels)
627 {
628 logger
629 .send(LogMessage::new(
630 String::from("Cannot determine server configuration"),
631 true,
632 ))
633 .await
634 .unwrap_or_default()
635 }
636 let response = Response::builder()
637 .status(StatusCode::INTERNAL_SERVER_ERROR)
638 .header(header::CONTENT_TYPE, "text/html")
639 .body(
640 Full::new(Bytes::from(generate_default_error_page(
641 StatusCode::INTERNAL_SERVER_ERROR,
642 None,
643 )))
644 .map_err(|e| match e {})
645 .boxed(),
646 )
647 .unwrap_or_default();
648 let (request_parts, _) = request.into_parts();
649 log_access(
650 global_configuration
651 .as_ref()
652 .map_or(&vec![], |c| &c.observability.log_channels),
653 &request_parts,
654 &socket_data,
655 None,
656 response.status().as_u16(),
657 match response.headers().get(header::CONTENT_LENGTH) {
658 Some(header_value) => match header_value.to_str() {
659 Ok(header_value) => match header_value.parse::<u64>() {
660 Ok(content_length) => Some(content_length),
661 Err(_) => response.body().size_hint().exact(),
662 },
663 Err(_) => response.body().size_hint().exact(),
664 },
665 None => response.body().size_hint().exact(),
666 },
667 global_configuration
668 .as_deref()
669 .and_then(|c| get_value!("log_date_format", c))
670 .and_then(|v| v.as_str()),
671 global_configuration
672 .as_deref()
673 .and_then(|c| get_value!("log_format", c))
674 .and_then(|v| v.as_str()),
675 )
676 .await;
677 let (mut response_parts, response_body) = response.into_parts();
678 if let Some(http3_alt_port) = http3_alt_port {
679 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
680 Some(value) => {
681 let header_value_old = String::from_utf8_lossy(value.as_bytes());
682 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
683
684 if header_value_old != header_value_new {
685 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
686 } else {
687 HeaderValue::from_bytes(header_value_old.as_bytes())
688 }
689 }
690 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
691 } {
692 response_parts.headers.insert(header::ALT_SVC, header_value);
693 }
694 }
695 response_parts
696 .headers
697 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
698
699 return Ok(Response::from_parts(response_parts, response_body));
700 }
701 Err(err) => {
702 for logger in global_configuration
703 .as_ref()
704 .map_or(&vec![], |c| &c.observability.log_channels)
705 {
706 logger
707 .send(LogMessage::new(
708 format!("Cannot determine server configuration: {err}"),
709 true,
710 ))
711 .await
712 .unwrap_or_default()
713 }
714 let response = Response::builder()
715 .status(StatusCode::INTERNAL_SERVER_ERROR)
716 .header(header::CONTENT_TYPE, "text/html")
717 .body(
718 Full::new(Bytes::from(generate_default_error_page(
719 StatusCode::INTERNAL_SERVER_ERROR,
720 None,
721 )))
722 .map_err(|e| match e {})
723 .boxed(),
724 )
725 .unwrap_or_default();
726 let (request_parts, _) = request.into_parts();
727 log_access(
728 global_configuration
729 .as_ref()
730 .map_or(&vec![], |c| &c.observability.log_channels),
731 &request_parts,
732 &socket_data,
733 None,
734 response.status().as_u16(),
735 match response.headers().get(header::CONTENT_LENGTH) {
736 Some(header_value) => match header_value.to_str() {
737 Ok(header_value) => match header_value.parse::<u64>() {
738 Ok(content_length) => Some(content_length),
739 Err(_) => response.body().size_hint().exact(),
740 },
741 Err(_) => response.body().size_hint().exact(),
742 },
743 None => response.body().size_hint().exact(),
744 },
745 global_configuration
746 .as_deref()
747 .and_then(|c| get_value!("log_date_format", c))
748 .and_then(|v| v.as_str()),
749 global_configuration
750 .as_deref()
751 .and_then(|c| get_value!("log_format", c))
752 .and_then(|v| v.as_str()),
753 )
754 .await;
755 let (mut response_parts, response_body) = response.into_parts();
756 if let Some(http3_alt_port) = http3_alt_port {
757 if let Ok(header_value) = match response_parts.headers.get(header::ALT_SVC) {
758 Some(value) => {
759 let header_value_old = String::from_utf8_lossy(value.as_bytes());
760 let header_value_new = format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"");
761
762 if header_value_old != header_value_new {
763 HeaderValue::from_bytes(format!("{header_value_old}, {header_value_new}").as_bytes())
764 } else {
765 HeaderValue::from_bytes(header_value_old.as_bytes())
766 }
767 }
768 None => HeaderValue::from_bytes(format!("h3=\":{http3_alt_port}\", h3-29=\":{http3_alt_port}\"").as_bytes()),
769 } {
770 response_parts.headers.insert(header::ALT_SVC, header_value);
771 }
772 }
773 response_parts
774 .headers
775 .insert(header::SERVER, HeaderValue::from_static(SERVER_SOFTWARE));
776
777 return Ok(Response::from_parts(response_parts, response_body));
778 }
779 };
780
781 let mut log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
783 let mut log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
784
785 if !configuration.observability.log_channels.is_empty() && log_request_parts.is_none() {
787 let (request_parts, request_body) = request.into_parts();
788 log_request_parts = Some(request_parts.clone());
789 request = Request::from_parts(request_parts, request_body);
790 }
791
792 let url_pathname = request.uri().path();
794 let sanitized_url_pathname = match sanitize_url(
795 url_pathname,
796 get_value!("allow_double_slashes", configuration)
797 .and_then(|v| v.as_bool())
798 .unwrap_or(false),
799 ) {
800 Ok(sanitized_url) => sanitized_url,
801 Err(err) => {
802 for logger in &configuration.observability.log_channels {
803 logger
804 .send(LogMessage::new(format!("URL sanitation error: {err}"), true))
805 .await
806 .unwrap_or_default();
807 }
808 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
809
810 let mut headers_to_add = HeaderMap::new();
812 let mut headers_to_replace = HeaderMap::new();
813 let mut headers_to_remove = Vec::new();
814 let (request_parts, _) = request.into_parts();
815 if let Some(custom_headers) = get_entries!("header", configuration) {
816 for custom_header in custom_headers.inner.iter().rev() {
817 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
818 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
819 if !headers_to_add.contains_key(header_name) {
820 if let Ok(header_name) = HeaderName::from_str(header_name) {
821 if let Ok(header_value) =
822 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
823 {
824 headers_to_add.insert(header_name, header_value);
825 }
826 }
827 }
828 }
829 }
830 }
831 }
832 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
833 for custom_header in custom_headers.inner.iter().rev() {
834 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
835 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
836 if let Ok(header_name) = HeaderName::from_str(header_name) {
837 if let Ok(header_value) =
838 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
839 {
840 headers_to_replace.insert(header_name, header_value);
841 }
842 }
843 }
844 }
845 }
846 }
847 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
848 for custom_header in custom_headers_to_remove.inner.iter().rev() {
849 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
850 if let Ok(header_name) = HeaderName::from_str(header_name) {
851 headers_to_remove.push(header_name);
852 }
853 }
854 }
855 }
856
857 return Ok(
858 finalize_response_and_log(
859 response,
860 http3_alt_port,
861 headers_to_add,
862 headers_to_replace,
863 headers_to_remove,
864 &configuration.observability.log_channels,
865 &log_request_parts,
866 &socket_data,
867 None,
868 log_date_format,
869 log_format,
870 )
871 .await,
872 );
873 }
874 };
875
876 if sanitized_url_pathname != url_pathname {
877 let (mut parts, body) = request.into_parts();
878 let orig_uri = parts.uri.clone();
879 let mut url_parts = parts.uri.into_parts();
880 url_parts.path_and_query = Some(
881 match format!(
882 "{}{}",
883 sanitized_url_pathname,
884 match url_parts.path_and_query {
885 Some(path_and_query) => {
886 match path_and_query.query() {
887 Some(query) => format!("?{query}"),
888 None => String::from(""),
889 }
890 }
891 None => String::from(""),
892 }
893 )
894 .parse()
895 {
896 Ok(path_and_query) => path_and_query,
897 Err(err) => {
898 for logger in &configuration.observability.log_channels {
899 logger
900 .send(LogMessage::new(format!("URL sanitation error: {err}"), true))
901 .await
902 .unwrap_or_default();
903 }
904
905 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
906
907 parts.uri = orig_uri;
908
909 let mut headers_to_add = HeaderMap::new();
911 let mut headers_to_replace = HeaderMap::new();
912 let mut headers_to_remove = Vec::new();
913 if let Some(custom_headers) = get_entries!("header", configuration) {
914 for custom_header in custom_headers.inner.iter().rev() {
915 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
916 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
917 if !headers_to_add.contains_key(header_name) {
918 if let Ok(header_name) = HeaderName::from_str(header_name) {
919 if let Ok(header_value) =
920 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
921 {
922 headers_to_add.insert(header_name, header_value);
923 }
924 }
925 }
926 }
927 }
928 }
929 }
930 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
931 for custom_header in custom_headers.inner.iter().rev() {
932 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
933 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
934 if let Ok(header_name) = HeaderName::from_str(header_name) {
935 if let Ok(header_value) =
936 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
937 {
938 headers_to_replace.insert(header_name, header_value);
939 }
940 }
941 }
942 }
943 }
944 }
945 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
946 for custom_header in custom_headers_to_remove.inner.iter().rev() {
947 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
948 if let Ok(header_name) = HeaderName::from_str(header_name) {
949 headers_to_remove.push(header_name);
950 }
951 }
952 }
953 }
954
955 return Ok(
956 finalize_response_and_log(
957 response,
958 http3_alt_port,
959 headers_to_add,
960 headers_to_replace,
961 headers_to_remove,
962 &configuration.observability.log_channels,
963 &log_request_parts,
964 &socket_data,
965 None,
966 log_date_format,
967 log_format,
968 )
969 .await,
970 );
971 }
972 },
973 );
974 parts.uri = match hyper::Uri::from_parts(url_parts) {
975 Ok(uri) => uri,
976 Err(err) => {
977 for logger in &configuration.observability.log_channels {
978 logger
979 .send(LogMessage::new(format!("URL sanitation error: {err}"), true))
980 .await
981 .unwrap_or_default();
982 }
983
984 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
985
986 parts.uri = orig_uri;
987
988 let mut headers_to_add = HeaderMap::new();
990 let mut headers_to_replace = HeaderMap::new();
991 let mut headers_to_remove = Vec::new();
992 if let Some(custom_headers) = get_entries!("header", configuration) {
993 for custom_header in custom_headers.inner.iter().rev() {
994 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
995 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
996 if !headers_to_add.contains_key(header_name) {
997 if let Ok(header_name) = HeaderName::from_str(header_name) {
998 if let Ok(header_value) =
999 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
1000 {
1001 headers_to_add.insert(header_name, header_value);
1002 }
1003 }
1004 }
1005 }
1006 }
1007 }
1008 }
1009 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
1010 for custom_header in custom_headers.inner.iter().rev() {
1011 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1012 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1013 if let Ok(header_name) = HeaderName::from_str(header_name) {
1014 if let Ok(header_value) =
1015 HeaderValue::from_str(&replace_header_placeholders(header_value, &parts, None))
1016 {
1017 headers_to_replace.insert(header_name, header_value);
1018 }
1019 }
1020 }
1021 }
1022 }
1023 }
1024 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
1025 for custom_header in custom_headers_to_remove.inner.iter().rev() {
1026 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1027 if let Ok(header_name) = HeaderName::from_str(header_name) {
1028 headers_to_remove.push(header_name);
1029 }
1030 }
1031 }
1032 }
1033
1034 return Ok(
1035 finalize_response_and_log(
1036 response,
1037 http3_alt_port,
1038 headers_to_add,
1039 headers_to_replace,
1040 headers_to_remove,
1041 &configuration.observability.log_channels,
1042 &log_request_parts,
1043 &socket_data,
1044 None,
1045 log_date_format,
1046 log_format,
1047 )
1048 .await,
1049 );
1050 }
1051 };
1052 let configuration_option = configurations.find_configuration(&parts, hostname_determinant.as_deref(), &socket_data);
1053 request = Request::from_parts(parts, body);
1054 match configuration_option {
1055 Ok(Some(new_configuration)) => {
1056 configuration = new_configuration;
1057 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1058 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1059 }
1060 Ok(None) => {}
1061 Err(err) => {
1062 for logger in &configuration.observability.log_channels {
1063 logger
1064 .send(LogMessage::new(
1065 format!("Cannot determine server configuration: {err}"),
1066 true,
1067 ))
1068 .await
1069 .unwrap_or_default();
1070 }
1071 let response = generate_error_response(StatusCode::BAD_REQUEST, &configuration, &None).await;
1072
1073 let mut headers_to_add = HeaderMap::new();
1075 let mut headers_to_replace = HeaderMap::new();
1076 let mut headers_to_remove = Vec::new();
1077 let (request_parts, _) = request.into_parts();
1078 if let Some(custom_headers) = get_entries!("header", configuration) {
1079 for custom_header in custom_headers.inner.iter().rev() {
1080 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1081 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1082 if !headers_to_add.contains_key(header_name) {
1083 if let Ok(header_name) = HeaderName::from_str(header_name) {
1084 if let Ok(header_value) =
1085 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1086 {
1087 headers_to_add.insert(header_name, header_value);
1088 }
1089 }
1090 }
1091 }
1092 }
1093 }
1094 }
1095 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
1096 for custom_header in custom_headers.inner.iter().rev() {
1097 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1098 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1099 if let Ok(header_name) = HeaderName::from_str(header_name) {
1100 if let Ok(header_value) =
1101 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1102 {
1103 headers_to_replace.insert(header_name, header_value);
1104 }
1105 }
1106 }
1107 }
1108 }
1109 }
1110 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
1111 for custom_header in custom_headers_to_remove.inner.iter().rev() {
1112 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1113 if let Ok(header_name) = HeaderName::from_str(header_name) {
1114 headers_to_remove.push(header_name);
1115 }
1116 }
1117 }
1118 }
1119
1120 return Ok(
1121 finalize_response_and_log(
1122 response,
1123 http3_alt_port,
1124 headers_to_add,
1125 headers_to_replace,
1126 headers_to_remove,
1127 &configuration.observability.log_channels,
1128 &log_request_parts,
1129 &socket_data,
1130 None,
1131 log_date_format,
1132 log_format,
1133 )
1134 .await,
1135 );
1136 }
1137 }
1138 }
1139
1140 let mut headers_to_add = HeaderMap::new();
1142 let mut headers_to_replace = HeaderMap::new();
1143 let mut headers_to_remove = Vec::new();
1144 let (request_parts, request_body) = request.into_parts();
1145 if let Some(custom_headers) = get_entries!("header", configuration) {
1146 for custom_header in custom_headers.inner.iter().rev() {
1147 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1148 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1149 if !headers_to_add.contains_key(header_name) {
1150 if let Ok(header_name) = HeaderName::from_str(header_name) {
1151 if let Ok(header_value) =
1152 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1153 {
1154 headers_to_add.insert(header_name, header_value);
1155 }
1156 }
1157 }
1158 }
1159 }
1160 }
1161 }
1162 if let Some(custom_headers) = get_entries!("header_replace", configuration) {
1163 for custom_header in custom_headers.inner.iter().rev() {
1164 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1165 if let Some(header_value) = custom_header.values.get(1).and_then(|v| v.as_str()) {
1166 if let Ok(header_name) = HeaderName::from_str(header_name) {
1167 if let Ok(header_value) =
1168 HeaderValue::from_str(&replace_header_placeholders(header_value, &request_parts, None))
1169 {
1170 headers_to_replace.insert(header_name, header_value);
1171 }
1172 }
1173 }
1174 }
1175 }
1176 }
1177 if let Some(custom_headers_to_remove) = get_entries!("header_remove", configuration) {
1178 for custom_header in custom_headers_to_remove.inner.iter().rev() {
1179 if let Some(header_name) = custom_header.values.first().and_then(|v| v.as_str()) {
1180 if let Ok(header_name) = HeaderName::from_str(header_name) {
1181 headers_to_remove.push(header_name);
1182 }
1183 }
1184 }
1185 }
1186 let mut request = Request::from_parts(request_parts, request_body);
1187
1188 if request.uri().path() == "*" {
1189 let response = match request.method() {
1190 &Method::OPTIONS => Response::builder()
1191 .status(StatusCode::NO_CONTENT)
1192 .header(header::ALLOW, "GET, POST, HEAD, OPTIONS")
1193 .body(Empty::new().map_err(|e| match e {}).boxed())
1194 .unwrap_or_default(),
1195 _ => {
1196 let mut header_map = HeaderMap::new();
1197 if let Ok(header_value) = HeaderValue::from_str("GET, POST, HEAD, OPTIONS") {
1198 header_map.insert(header::ALLOW, header_value);
1199 };
1200 generate_error_response(StatusCode::BAD_REQUEST, &configuration, &Some(header_map)).await
1201 }
1202 };
1203 return Ok(
1204 finalize_response_and_log(
1205 response,
1206 http3_alt_port,
1207 headers_to_add,
1208 headers_to_replace,
1209 headers_to_remove,
1210 &configuration.observability.log_channels,
1211 &log_request_parts,
1212 &socket_data,
1213 None,
1214 log_date_format,
1215 log_format,
1216 )
1217 .await,
1218 );
1219 }
1220
1221 let acme_http_01_resolvers_inner = acme_http_01_resolvers.read().await;
1223 if !acme_http_01_resolvers_inner.is_empty() {
1224 if let Some(challenge_token) = request.uri().path().strip_prefix("/.well-known/acme-challenge/") {
1225 for acme_http01_resolver in &*acme_http_01_resolvers_inner {
1226 if let Some(http01_acme_data) = &*acme_http01_resolver.read().await {
1227 let acme_response = http01_acme_data.1.clone();
1228 if challenge_token == http01_acme_data.0 {
1229 let response = Response::builder()
1230 .status(StatusCode::OK)
1231 .header(
1232 header::CONTENT_TYPE,
1233 HeaderValue::from_static("application/octet-stream"),
1234 )
1235 .body(Full::new(Bytes::from(acme_response)).map_err(|e| match e {}).boxed())
1236 .unwrap_or_default();
1237
1238 return Ok(
1239 finalize_response_and_log(
1240 response,
1241 http3_alt_port,
1242 headers_to_add,
1243 headers_to_replace,
1244 headers_to_remove,
1245 &configuration.observability.log_channels,
1246 &log_request_parts,
1247 &socket_data,
1248 None,
1249 log_date_format,
1250 log_format,
1251 )
1252 .await,
1253 );
1254 }
1255 }
1256 }
1257 }
1258 };
1259 drop(acme_http_01_resolvers_inner);
1260
1261 let mut error_logger = if !configuration.observability.log_channels.is_empty() {
1262 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1263 } else {
1264 ErrorLogger::without_logger()
1265 };
1266 let mut metrics_enabled = !configuration.observability.metric_channels.is_empty();
1267 let mut metrics_sender = if metrics_enabled {
1268 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1269 } else {
1270 MetricsMultiSender::without_sender()
1271 };
1272 let mut traces_enabled = !configuration.observability.trace_channels.is_empty();
1273 let mut traces_senders = if traces_enabled {
1274 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1275 for channel in &configuration.observability.trace_channels {
1276 channel.0.send(()).await.unwrap_or_default();
1277 if let Ok(channel2) = channel.1.recv().await {
1278 traces_senders.push(channel2);
1279 }
1280 }
1281 traces_senders
1282 } else {
1283 vec![]
1284 };
1285
1286 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1288 for module in &configuration.modules {
1289 module_handlers.push(module.get_module_handlers());
1290 }
1291
1292 request.extensions_mut().insert(RequestData {
1294 auth_user: None,
1295 original_url: None,
1296 error_status_code: None,
1297 });
1298 let mut executed_handlers = Vec::new();
1299 let (request_parts, request_body) = request.into_parts();
1300 let request_parts_cloned = if configurations.inner.iter().rev().any(|c| {
1301 c.filters.is_host
1302 && c.filters.hostname == configuration.filters.hostname
1303 && c.filters.ip == configuration.filters.ip
1304 && c.filters.port == configuration.filters.port
1305 && (c.filters.condition.is_none() || c.filters.condition == configuration.filters.condition)
1306 && c.filters.error_handler_status.is_some()
1307 }) {
1308 let mut request_parts_cloned = request_parts.clone();
1309 request_parts_cloned
1310 .headers
1311 .insert(header::CONTENT_LENGTH, HeaderValue::from_static("0"));
1312 Some(request_parts_cloned)
1313 } else {
1314 None
1316 };
1317 let mut request = Request::from_parts(request_parts, request_body);
1318 let mut latest_auth_data = None;
1319 let mut is_error_handler = false;
1320 let mut handlers_iter: Box<dyn Iterator<Item = Box<dyn ModuleHandlers>>> = Box::new(module_handlers.into_iter());
1321 while let Some(mut handlers) = handlers_iter.next() {
1322 if metrics_enabled {
1323 handlers
1324 .metric_data_before_handler(&request, &socket_data, &metrics_sender)
1325 .await;
1326 }
1327
1328 if traces_enabled {
1329 for trace_sender in &traces_senders {
1330 trace_sender
1331 .send(TraceSignal::StartSpan(format!(
1332 "{}::request_handler",
1333 handlers.get_name()
1334 )))
1335 .await
1336 .unwrap_or_default();
1337 }
1338 }
1339
1340 let response_result = handlers
1341 .request_handler(request, &configuration, &socket_data, &error_logger)
1342 .await;
1343
1344 if traces_enabled {
1345 for trace_sender in &traces_senders {
1346 trace_sender
1347 .send(TraceSignal::EndSpan(
1348 format!("{}::request_handler", handlers.get_name()),
1349 response_result.as_ref().err().map(|e| e.to_string()),
1350 ))
1351 .await
1352 .unwrap_or_default();
1353 }
1354 }
1355
1356 executed_handlers.push(handlers);
1357 match response_result {
1358 Ok(response) => {
1359 let status = response.response_status;
1360 let headers = response.response_headers;
1361 let new_remote_address = response.new_remote_address;
1362 let request_option = response.request;
1363 let response = response.response;
1364 let request_extensions = request_option
1365 .as_ref()
1366 .and_then(|r| r.extensions().get::<RequestData>());
1367 if let Some(request_extensions) = request_extensions {
1368 latest_auth_data = request_extensions.auth_user.clone();
1369 }
1370 if let Some(new_remote_address) = new_remote_address {
1371 socket_data.remote_addr = new_remote_address;
1372 };
1373
1374 match response {
1375 Some(response) => {
1376 let (mut response_parts, response_body) = response.into_parts();
1377 add_custom_headers(
1378 &mut response_parts,
1379 &headers_to_add,
1380 &headers_to_replace,
1381 &headers_to_remove,
1382 );
1383 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1384 add_server_header(&mut response_parts);
1385
1386 let response = Response::from_parts(response_parts, response_body);
1387
1388 match execute_response_modifying_handlers(
1389 response,
1390 executed_handlers,
1391 &configuration,
1392 http3_alt_port,
1393 headers_to_add,
1394 headers_to_replace,
1395 headers_to_remove,
1396 &configuration.observability.log_channels,
1397 &log_request_parts,
1398 &socket_data,
1399 latest_auth_data.as_deref(),
1400 log_date_format,
1401 log_format,
1402 metrics_sender,
1403 metrics_enabled,
1404 traces_senders,
1405 traces_enabled,
1406 )
1407 .await
1408 {
1409 Ok(response) => {
1410 if let Some(request_parts) = log_request_parts {
1411 log_access(
1412 &configuration.observability.log_channels,
1413 &request_parts,
1414 &socket_data,
1415 latest_auth_data.as_deref(),
1416 response.status().as_u16(),
1417 extract_content_length(&response),
1418 log_date_format,
1419 log_format,
1420 )
1421 .await;
1422 }
1423 return Ok(response);
1424 }
1425 Err(error_response) => {
1426 return Ok(error_response);
1427 }
1428 }
1429 }
1430 None => match status {
1431 Some(status) => {
1432 if !is_error_handler {
1433 if let Some(error_configuration) =
1434 configurations.find_error_configuration(&configuration.filters, status.as_u16())
1435 {
1436 let request_option = if let Some(request) = request_option {
1437 Some(request)
1438 } else {
1439 request_parts_cloned.clone().map(|request_parts_cloned| {
1440 Request::from_parts(request_parts_cloned, Empty::new().map_err(|e| match e {}).boxed())
1441 })
1442 };
1443 if let Some(request_cloned) = request_option {
1444 configuration = error_configuration;
1445 let mut module_handlers = Vec::with_capacity(configuration.modules.len());
1446 for module in &configuration.modules {
1447 module_handlers.push(module.get_module_handlers());
1448 }
1449 handlers_iter = Box::new(module_handlers.into_iter());
1450 if metrics_enabled {
1451 while let Some(mut executed_handler) = executed_handlers.pop() {
1452 executed_handler.metric_data_after_handler(&metrics_sender).await;
1453 }
1454 }
1455 executed_handlers = Vec::new();
1456 request = request_cloned;
1457 if let Some(request_data) = request.extensions_mut().get_mut::<RequestData>() {
1458 request_data.error_status_code = Some(status);
1459 }
1460 is_error_handler = true;
1461 log_date_format = get_value!("log_date_format", configuration).and_then(|v| v.as_str());
1462 log_format = get_value!("log_format", configuration).and_then(|v| v.as_str());
1463 error_logger = if !configuration.observability.log_channels.is_empty() {
1464 ErrorLogger::new_multiple(configuration.observability.log_channels.clone())
1465 } else {
1466 ErrorLogger::without_logger()
1467 };
1468 metrics_enabled = !configuration.observability.metric_channels.is_empty();
1469 metrics_sender = if metrics_enabled {
1470 MetricsMultiSender::new_multiple(configuration.observability.metric_channels.clone())
1471 } else {
1472 MetricsMultiSender::without_sender()
1473 };
1474 traces_enabled = !configuration.observability.trace_channels.is_empty();
1475 traces_senders = if traces_enabled {
1476 let mut traces_senders = Vec::with_capacity(configuration.observability.trace_channels.len());
1477 for channel in &configuration.observability.trace_channels {
1478 channel.0.send(()).await.unwrap_or_default();
1479 if let Ok(channel2) = channel.1.recv().await {
1480 traces_senders.push(channel2);
1481 }
1482 }
1483 traces_senders
1484 } else {
1485 vec![]
1486 };
1487 continue;
1488 }
1489 }
1490 }
1491 let response = generate_error_response(status, &configuration, &headers).await;
1492
1493 let (mut response_parts, response_body) = response.into_parts();
1494 add_custom_headers(
1495 &mut response_parts,
1496 &headers_to_add,
1497 &headers_to_replace,
1498 &headers_to_remove,
1499 );
1500 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1501 add_server_header(&mut response_parts);
1502
1503 let response = Response::from_parts(response_parts, response_body);
1504
1505 match execute_response_modifying_handlers(
1506 response,
1507 executed_handlers,
1508 &configuration,
1509 http3_alt_port,
1510 headers_to_add,
1511 headers_to_replace,
1512 headers_to_remove,
1513 &configuration.observability.log_channels,
1514 &log_request_parts,
1515 &socket_data,
1516 latest_auth_data.as_deref(),
1517 log_date_format,
1518 log_format,
1519 metrics_sender,
1520 metrics_enabled,
1521 traces_senders,
1522 traces_enabled,
1523 )
1524 .await
1525 {
1526 Ok(response) => {
1527 if let Some(request_parts) = log_request_parts {
1528 log_access(
1529 &configuration.observability.log_channels,
1530 &request_parts,
1531 &socket_data,
1532 latest_auth_data.as_deref(),
1533 response.status().as_u16(),
1534 extract_content_length(&response),
1535 log_date_format,
1536 log_format,
1537 )
1538 .await;
1539 }
1540 return Ok(response);
1541 }
1542 Err(error_response) => {
1543 return Ok(error_response);
1544 }
1545 }
1546 }
1547 None => match request_option {
1548 Some(request_obtained) => {
1549 request = request_obtained;
1550 continue;
1551 }
1552 None => {
1553 break;
1554 }
1555 },
1556 },
1557 }
1558 }
1559 Err(err) => {
1560 let response = generate_error_response(StatusCode::INTERNAL_SERVER_ERROR, &configuration, &None).await;
1561
1562 for logger in &configuration.observability.log_channels {
1563 logger
1564 .send(LogMessage::new(
1565 format!("Unexpected error while serving a request: {err}"),
1566 true,
1567 ))
1568 .await
1569 .unwrap_or_default();
1570 }
1571
1572 let (mut response_parts, response_body) = response.into_parts();
1573 add_custom_headers(
1574 &mut response_parts,
1575 &headers_to_add,
1576 &headers_to_replace,
1577 &headers_to_remove,
1578 );
1579 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1580 add_server_header(&mut response_parts);
1581
1582 let response = Response::from_parts(response_parts, response_body);
1583
1584 match execute_response_modifying_handlers(
1585 response,
1586 executed_handlers,
1587 &configuration,
1588 http3_alt_port,
1589 headers_to_add,
1590 headers_to_replace,
1591 headers_to_remove,
1592 &configuration.observability.log_channels,
1593 &log_request_parts,
1594 &socket_data,
1595 latest_auth_data.as_deref(),
1596 log_date_format,
1597 log_format,
1598 metrics_sender,
1599 metrics_enabled,
1600 traces_senders,
1601 traces_enabled,
1602 )
1603 .await
1604 {
1605 Ok(response) => {
1606 if let Some(request_parts) = log_request_parts {
1607 log_access(
1608 &configuration.observability.log_channels,
1609 &request_parts,
1610 &socket_data,
1611 latest_auth_data.as_deref(),
1612 response.status().as_u16(),
1613 extract_content_length(&response),
1614 log_date_format,
1615 log_format,
1616 )
1617 .await;
1618 }
1619 return Ok(response);
1620 }
1621 Err(error_response) => {
1622 return Ok(error_response);
1623 }
1624 }
1625 }
1626 }
1627 }
1628
1629 let response = generate_error_response(StatusCode::NOT_FOUND, &configuration, &None).await;
1630
1631 let (mut response_parts, response_body) = response.into_parts();
1632 add_custom_headers(
1633 &mut response_parts,
1634 &headers_to_add,
1635 &headers_to_replace,
1636 &headers_to_remove,
1637 );
1638 add_http3_alt_svc_header(&mut response_parts, http3_alt_port);
1639 add_server_header(&mut response_parts);
1640
1641 let response = Response::from_parts(response_parts, response_body);
1642
1643 match execute_response_modifying_handlers(
1644 response,
1645 executed_handlers,
1646 &configuration,
1647 http3_alt_port,
1648 headers_to_add,
1649 headers_to_replace,
1650 headers_to_remove,
1651 &configuration.observability.log_channels,
1652 &log_request_parts,
1653 &socket_data,
1654 latest_auth_data.as_deref(),
1655 log_date_format,
1656 log_format,
1657 metrics_sender,
1658 metrics_enabled,
1659 traces_senders,
1660 traces_enabled,
1661 )
1662 .await
1663 {
1664 Ok(response) => {
1665 if let Some(request_parts) = log_request_parts {
1666 log_access(
1667 &configuration.observability.log_channels,
1668 &request_parts,
1669 &socket_data,
1670 latest_auth_data.as_deref(),
1671 response.status().as_u16(),
1672 extract_content_length(&response),
1673 log_date_format,
1674 log_format,
1675 )
1676 .await;
1677 }
1678 Ok(response)
1679 }
1680 Err(error_response) => Ok(error_response),
1681 }
1682}
1683
1684#[allow(clippy::too_many_arguments)]
1686pub async fn request_handler(
1687 request: Request<BoxBody<Bytes, std::io::Error>>,
1688 client_address: SocketAddr,
1689 server_address: SocketAddr,
1690 encrypted: bool,
1691 configurations: Arc<ServerConfigurations>,
1692 http3_alt_port: Option<u16>,
1693 acme_http_01_resolvers: Arc<tokio::sync::RwLock<Vec<crate::acme::Http01DataLock>>>,
1694 proxy_protocol_client_address: Option<SocketAddr>,
1695 proxy_protocol_server_address: Option<SocketAddr>,
1696) -> Result<Response<BoxBody<Bytes, std::io::Error>>, anyhow::Error> {
1697 let global_configuration = configurations.find_global_configuration();
1698 let timeout_from_config = global_configuration
1699 .as_deref()
1700 .and_then(|c| get_entry!("timeout", c))
1701 .and_then(|e| e.values.last());
1702 let request_handler_future = request_handler_wrapped(
1703 request,
1704 client_address,
1705 server_address,
1706 encrypted,
1707 configurations,
1708 http3_alt_port,
1709 acme_http_01_resolvers,
1710 proxy_protocol_client_address,
1711 proxy_protocol_server_address,
1712 );
1713 if timeout_from_config.is_some_and(|v| v.is_null()) {
1714 request_handler_future.await.map_err(|e| anyhow::anyhow!(e))
1715 } else {
1716 let timeout_millis = timeout_from_config.and_then(|v| v.as_i128()).unwrap_or(300000) as u64;
1717 match timeout(Duration::from_millis(timeout_millis), request_handler_future).await {
1718 Ok(response) => response.map_err(|e| anyhow::anyhow!(e)),
1719 Err(_) => Err(anyhow::anyhow!("The client or server has timed out")),
1720 }
1721 }
1722}