1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5use ferron_common::logging::LogMessage;
6#[cfg(feature = "runtime-monoio")]
7use monoio::net::TcpListener;
8#[cfg(feature = "runtime-tokio")]
9use tokio::net::TcpListener;
10use tokio_util::sync::CancellationToken;
11#[cfg(feature = "runtime-vibeio")]
12use vibeio::net::TcpListener;
13
14use crate::listener_handler_communication::{Connection, ConnectionData};
15
16type ListenerError = Box<dyn Error + Send + Sync>;
17type ListenerResult = Result<TcpListener, std::io::Error>;
18
/// Returns the protocol label used in the listener's console messages:
/// `"HTTPS"` when `encrypted` is `true`, `"HTTP"` otherwise.
#[inline]
fn protocol_name(encrypted: bool) -> &'static str {
  if !encrypted {
    return "HTTP";
  }
  "HTTPS"
}
27
28#[inline]
29fn listen_error_message(encrypted: bool, err: &std::io::Error) -> anyhow::Error {
30 anyhow::anyhow!("Cannot listen to {} port: {err}", protocol_name(encrypted))
31}
32
33#[inline]
34fn log_retry(encrypted: bool, tries: u64, duration: Duration) {
35 println!(
36 "{} port is used at try #{tries}, retrying in {duration:?}...",
37 protocol_name(encrypted)
38 );
39}
40
41#[inline]
42fn log_skip(encrypted: bool, tries: u64) {
43 println!("{} port is used at try #{tries}, skipping...", protocol_name(encrypted));
44}
45
46#[inline]
47fn log_listening(encrypted: bool, address: SocketAddr) {
48 println!("{} server is listening on {address}...", protocol_name(encrypted));
49}
50
51#[inline]
52fn build_tcp_listener(address: SocketAddr, tcp_buffer_sizes: (Option<usize>, Option<usize>)) -> ListenerResult {
53 let listener_socket2 = socket2::Socket::new(
55 if address.is_ipv6() {
56 socket2::Domain::IPV6
57 } else {
58 socket2::Domain::IPV4
59 },
60 socket2::Type::STREAM,
61 Some(socket2::Protocol::TCP),
62 )?;
63
64 listener_socket2.set_reuse_address(!cfg!(windows)).unwrap_or_default();
66 #[cfg(unix)]
67 listener_socket2.set_reuse_port(false).unwrap_or_default();
68 if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
69 listener_socket2
70 .set_send_buffer_size(tcp_send_buffer_size)
71 .unwrap_or_default();
72 }
73 if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
74 listener_socket2
75 .set_recv_buffer_size(tcp_recv_buffer_size)
76 .unwrap_or_default();
77 }
78 if address.is_ipv6() {
79 listener_socket2.set_only_v6(false).unwrap_or_default();
80 }
81
82 #[cfg(feature = "runtime-monoio")]
83 let is_poll_io = monoio::utils::is_legacy();
84 #[cfg(feature = "runtime-vibeio")]
85 let is_poll_io = !vibeio::util::supports_completion();
86 #[cfg(feature = "runtime-tokio")]
87 let is_poll_io = true;
88
89 if is_poll_io {
90 listener_socket2.set_nonblocking(true).unwrap_or_default();
91 }
92
93 listener_socket2.bind(&address.into())?;
95 listener_socket2.listen(-1)?;
96
97 TcpListener::from_std(listener_socket2.into())
99}
100
101#[inline]
102async fn log_accept_error(logging_tx: &Option<Sender<LogMessage>>, err: &std::io::Error) {
103 if let Some(logging_tx) = logging_tx {
104 logging_tx
105 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
106 .await
107 .unwrap_or_default();
108 }
109}
110
111#[allow(clippy::too_many_arguments)]
113pub fn create_tcp_listener(
114 address: SocketAddr,
115 encrypted: bool,
116 tx: Sender<ConnectionData>,
117 enable_uring: Option<bool>,
118 logging_tx: Option<Sender<LogMessage>>,
119 first_startup: bool,
120 tcp_buffer_sizes: (Option<usize>, Option<usize>),
121 io_uring_disabled: Sender<Option<std::io::Error>>,
122) -> Result<CancellationToken, ListenerError> {
123 let shutdown_tx = CancellationToken::new();
124 let shutdown_rx = shutdown_tx.clone();
125 let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
126 std::thread::Builder::new()
127 .name(format!("TCP listener for {address}"))
128 .spawn(move || {
129 let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
130 Ok(rt) => rt,
131 Err(error) => {
132 listen_error_tx
133 .send_blocking(Some(
134 anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
135 ))
136 .unwrap_or_default();
137 return;
138 }
139 };
140 io_uring_disabled
141 .send_blocking(rt.return_io_uring_error())
142 .unwrap_or_default();
143 rt.run(async move {
144 if let Err(error) = tcp_listener_fn(
145 address,
146 encrypted,
147 tx,
148 &listen_error_tx,
149 logging_tx,
150 first_startup,
151 tcp_buffer_sizes,
152 shutdown_rx,
153 )
154 .await
155 {
156 listen_error_tx.send(Some(error)).await.unwrap_or_default();
157 }
158 });
159 })?;
160
161 if let Some(error) = listen_error_rx.recv_blocking()? {
162 Err(error)?;
163 }
164
165 Ok(shutdown_tx)
166}
167
168#[allow(clippy::too_many_arguments)]
170async fn tcp_listener_fn(
171 address: SocketAddr,
172 encrypted: bool,
173 tx: Sender<ConnectionData>,
174 listen_error_tx: &Sender<Option<ListenerError>>,
175 logging_tx: Option<Sender<LogMessage>>,
176 first_startup: bool,
177 tcp_buffer_sizes: (Option<usize>, Option<usize>),
178 shutdown_rx: CancellationToken,
179) -> Result<(), ListenerError> {
180 let mut listener_result;
181 let mut tries: u64 = 0;
182 loop {
183 listener_result = build_tcp_listener(address, tcp_buffer_sizes);
184 if first_startup || listener_result.is_ok() {
185 break;
186 }
187 tries += 1;
188 if tries >= 10 {
189 log_skip(encrypted, tries);
190 listen_error_tx.send(None).await.unwrap_or_default();
191 break;
192 }
193 let duration = Duration::from_millis(1000);
194 log_retry(encrypted, tries, duration);
195 crate::runtime::sleep(duration).await;
196 }
197 let listener = match listener_result {
198 Ok(listener) => listener,
199 Err(err) => Err(listen_error_message(encrypted, &err))?,
200 };
201
202 log_listening(encrypted, address);
203 listen_error_tx.send(None).await.unwrap_or_default();
204
205 #[cfg(unix)]
206 let mut handle_exhaustion_backoff = Duration::from_millis(10);
207
208 loop {
209 let (tcp, remote_address) = match crate::runtime::select! {
210 result = listener.accept() => {
211 result
212 }
213 _ = shutdown_rx.cancelled() => {
214 return Ok(());
215 }
216 } {
217 Ok(data) => {
218 #[cfg(unix)]
219 {
220 handle_exhaustion_backoff = Duration::from_millis(10);
221 }
222 data
223 }
224 Err(err) => {
225 log_accept_error(&logging_tx, &err).await;
226
227 #[cfg(unix)]
229 if err.raw_os_error() == Some(24) {
230 crate::runtime::sleep(handle_exhaustion_backoff).await;
231 handle_exhaustion_backoff *= 2;
232 if handle_exhaustion_backoff > Duration::from_secs(1) {
233 handle_exhaustion_backoff = Duration::from_secs(1);
234 }
235 }
236
237 continue;
238 }
239 };
240 let local_address: SocketAddr = match tcp.local_addr() {
241 Ok(data) => data,
242 Err(err) => {
243 log_accept_error(&logging_tx, &err).await;
244 continue;
245 }
246 };
247
248 #[cfg(any(feature = "runtime-vibeio", feature = "runtime-monoio"))]
249 let tcp_data = {
250 #[cfg(unix)]
251 let tcp_std = {
252 use std::os::fd::{FromRawFd, IntoRawFd};
253 let raw_fd = tcp.into_raw_fd();
254 unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
259 };
260 #[cfg(windows)]
261 let tcp_std = {
262 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
263 let raw_fd = tcp.into_raw_socket();
264 unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
268 };
269
270 let tcp_socket2 = socket2::Socket::from(tcp_std);
272 tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
273
274 let tcp_std = tcp_socket2.into();
275 ConnectionData {
276 connection: Connection::Tcp(tcp_std),
277 client_address: remote_address,
278 server_address: local_address,
279 }
280 };
281 #[cfg(feature = "runtime-tokio")]
282 let tcp_data = {
283 tcp.set_nodelay(true).unwrap_or_default();
284
285 ConnectionData {
286 connection: Connection::Tcp(tcp),
287 client_address: remote_address,
288 server_address: local_address,
289 }
290 };
291
292 let tcp_tx = tx.clone();
293 crate::runtime::spawn(async move {
294 tcp_tx.send(tcp_data).await.unwrap_or_default();
296 });
297 }
298}