1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5use ferron_common::logging::LogMessage;
6#[cfg(feature = "runtime-monoio")]
7use monoio::net::TcpListener;
8#[cfg(feature = "runtime-tokio")]
9use tokio::net::TcpListener;
10use tokio_util::sync::CancellationToken;
11
12use crate::listener_handler_communication::{Connection, ConnectionData};
13
14#[allow(clippy::too_many_arguments)]
16pub fn create_tcp_listener(
17 address: SocketAddr,
18 encrypted: bool,
19 tx: Sender<ConnectionData>,
20 enable_uring: Option<bool>,
21 logging_tx: Option<Sender<LogMessage>>,
22 first_startup: bool,
23 tcp_buffer_sizes: (Option<usize>, Option<usize>),
24 io_uring_disabled: Sender<Option<std::io::Error>>,
25) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
26 let shutdown_tx = CancellationToken::new();
27 let shutdown_rx = shutdown_tx.clone();
28 let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
29 std::thread::Builder::new()
30 .name(format!("TCP listener for {address}"))
31 .spawn(move || {
32 let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
33 Ok(rt) => rt,
34 Err(error) => {
35 listen_error_tx
36 .send_blocking(Some(
37 anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
38 ))
39 .unwrap_or_default();
40 return;
41 }
42 };
43 io_uring_disabled
44 .send_blocking(rt.return_io_uring_error())
45 .unwrap_or_default();
46 rt.run(async move {
47 if let Err(error) = tcp_listener_fn(
48 address,
49 encrypted,
50 tx,
51 &listen_error_tx,
52 logging_tx,
53 first_startup,
54 tcp_buffer_sizes,
55 shutdown_rx,
56 )
57 .await
58 {
59 listen_error_tx.send(Some(error)).await.unwrap_or_default();
60 }
61 });
62 })?;
63
64 if let Some(error) = listen_error_rx.recv_blocking()? {
65 Err(error)?;
66 }
67
68 Ok(shutdown_tx)
69}
70
71#[allow(clippy::too_many_arguments)]
73async fn tcp_listener_fn(
74 address: SocketAddr,
75 encrypted: bool,
76 tx: Sender<ConnectionData>,
77 listen_error_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
78 logging_tx: Option<Sender<LogMessage>>,
79 first_startup: bool,
80 tcp_buffer_sizes: (Option<usize>, Option<usize>),
81 shutdown_rx: CancellationToken,
82) -> Result<(), Box<dyn Error + Send + Sync>> {
83 let mut listener_result;
84 let mut tries: u64 = 0;
85 loop {
86 listener_result = (|| {
87 let listener_socket2 = socket2::Socket::new(
89 if address.is_ipv6() {
90 socket2::Domain::IPV6
91 } else {
92 socket2::Domain::IPV4
93 },
94 socket2::Type::STREAM,
95 Some(socket2::Protocol::TCP),
96 )?;
97
98 listener_socket2.set_reuse_address(!cfg!(windows)).unwrap_or_default();
100 #[cfg(unix)]
101 listener_socket2.set_reuse_port(false).unwrap_or_default();
102 if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
103 listener_socket2
104 .set_send_buffer_size(tcp_send_buffer_size)
105 .unwrap_or_default();
106 }
107 if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
108 listener_socket2
109 .set_recv_buffer_size(tcp_recv_buffer_size)
110 .unwrap_or_default();
111 }
112 if address.is_ipv6() {
113 listener_socket2.set_only_v6(false).unwrap_or_default();
114 }
115
116 #[cfg(feature = "runtime-monoio")]
117 let is_poll_io = monoio::utils::is_legacy();
118 #[cfg(feature = "runtime-tokio")]
119 let is_poll_io = true;
120
121 if is_poll_io {
122 listener_socket2.set_nonblocking(true).unwrap_or_default();
123 }
124
125 listener_socket2.bind(&address.into())?;
127 listener_socket2.listen(-1)?;
128
129 TcpListener::from_std(listener_socket2.into())
131 })();
132 if first_startup || listener_result.is_ok() {
133 break;
134 }
135 tries += 1;
136 if tries >= 10 {
137 if encrypted {
138 println!("HTTPS port is used at try #{tries}, skipping...");
139 } else {
140 println!("HTTP port is used at try #{tries}, skipping...");
141 }
142 listen_error_tx.send(None).await.unwrap_or_default();
143 break;
144 }
145 let duration = Duration::from_millis(1000);
146 if encrypted {
147 println!("HTTPS port is used at try #{tries}, retrying in {duration:?}...");
148 } else {
149 println!("HTTP port is used at try #{tries}, retrying in {duration:?}...");
150 }
151 crate::runtime::sleep(duration).await;
152 }
153 let listener = match listener_result {
154 Ok(listener) => listener,
155 Err(err) => {
156 if encrypted {
157 Err(anyhow::anyhow!(format!("Cannot listen to HTTPS port: {}", err)))?
158 } else {
159 Err(anyhow::anyhow!(format!("Cannot listen to HTTP port: {}", err)))?
160 }
161 }
162 };
163
164 if encrypted {
165 println!("HTTPS server is listening on {address}...");
166 } else {
167 println!("HTTP server is listening on {address}...");
168 }
169 listen_error_tx.send(None).await.unwrap_or_default();
170
171 loop {
172 let (tcp, remote_address) = match crate::runtime::select! {
173 result = listener.accept() => {
174 result
175 }
176 _ = shutdown_rx.cancelled() => {
177 return Ok(());
178 }
179 } {
180 Ok(data) => data,
181 Err(err) => {
182 if let Some(logging_tx) = &logging_tx {
183 logging_tx
184 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
185 .await
186 .unwrap_or_default();
187 }
188 continue;
189 }
190 };
191 let local_address: SocketAddr = match tcp.local_addr() {
192 Ok(data) => data,
193 Err(err) => {
194 if let Some(logging_tx) = &logging_tx {
195 logging_tx
196 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
197 .await
198 .unwrap_or_default();
199 }
200 continue;
201 }
202 };
203
204 #[cfg(feature = "runtime-monoio")]
205 let tcp_data = {
206 #[cfg(unix)]
207 let tcp_std = {
208 use std::os::fd::{FromRawFd, IntoRawFd};
209 let raw_fd = tcp.into_raw_fd();
210 unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
215 };
216 #[cfg(windows)]
217 let tcp_std = {
218 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
219 let raw_fd = tcp.into_raw_socket();
220 unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
224 };
225
226 let tcp_socket2 = socket2::Socket::from(tcp_std);
228 tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
229
230 let tcp_std = tcp_socket2.into();
231 ConnectionData {
232 connection: Connection::Tcp(tcp_std),
233 client_address: remote_address,
234 server_address: local_address,
235 }
236 };
237 #[cfg(feature = "runtime-tokio")]
238 let tcp_data = {
239 tcp.set_nodelay(true).unwrap_or_default();
240
241 ConnectionData {
242 connection: Connection::Tcp(tcp),
243 client_address: remote_address,
244 server_address: local_address,
245 }
246 };
247
248 let tcp_tx = tx.clone();
249 crate::runtime::spawn(async move {
250 tcp_tx.send(tcp_data).await.unwrap_or_default();
252 });
253 }
254}