1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5use ferron_common::logging::LogMessage;
6#[cfg(feature = "runtime-monoio")]
7use monoio::net::TcpListener;
8#[cfg(feature = "runtime-tokio")]
9use tokio::net::TcpListener;
10use tokio_util::sync::CancellationToken;
11
12use crate::listener_handler_communication::{Connection, ConnectionData};
13
14#[allow(clippy::too_many_arguments)]
16pub fn create_tcp_listener(
17 address: SocketAddr,
18 encrypted: bool,
19 tx: Sender<ConnectionData>,
20 enable_uring: Option<bool>,
21 logging_tx: Option<Sender<LogMessage>>,
22 first_startup: bool,
23 tcp_buffer_sizes: (Option<usize>, Option<usize>),
24 io_uring_disabled: Sender<Option<std::io::Error>>,
25) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
26 let shutdown_tx = CancellationToken::new();
27 let shutdown_rx = shutdown_tx.clone();
28 let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
29 std::thread::Builder::new()
30 .name(format!("TCP listener for {address}"))
31 .spawn(move || {
32 let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
33 Ok(rt) => rt,
34 Err(error) => {
35 listen_error_tx
36 .send_blocking(Some(
37 anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
38 ))
39 .unwrap_or_default();
40 return;
41 }
42 };
43 io_uring_disabled
44 .send_blocking(rt.return_io_uring_error())
45 .unwrap_or_default();
46 rt.run(async move {
47 let tcp_listener_future = tcp_listener_fn(
48 address,
49 encrypted,
50 tx,
51 &listen_error_tx,
52 logging_tx,
53 first_startup,
54 tcp_buffer_sizes,
55 );
56 crate::runtime::select! {
57 result = tcp_listener_future => {
58 if let Some(error) = result.err() {
59 listen_error_tx.send(Some(error)).await.unwrap_or_default();
60 }
61 }
62 _ = shutdown_rx.cancelled() => {
63
64 }
65 }
66 });
67 })?;
68
69 if let Some(error) = listen_error_rx.recv_blocking()? {
70 Err(error)?;
71 }
72
73 Ok(shutdown_tx)
74}
75
76async fn tcp_listener_fn(
78 address: SocketAddr,
79 encrypted: bool,
80 tx: Sender<ConnectionData>,
81 listen_error_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
82 logging_tx: Option<Sender<LogMessage>>,
83 first_startup: bool,
84 tcp_buffer_sizes: (Option<usize>, Option<usize>),
85) -> Result<(), Box<dyn Error + Send + Sync>> {
86 let mut listener_result;
87 let mut tries: u64 = 0;
88 loop {
89 listener_result = (|| {
90 let listener_socket2 = socket2::Socket::new(
92 if address.is_ipv6() {
93 socket2::Domain::IPV6
94 } else {
95 socket2::Domain::IPV4
96 },
97 socket2::Type::STREAM,
98 Some(socket2::Protocol::TCP),
99 )?;
100
101 listener_socket2.set_reuse_address(!cfg!(windows)).unwrap_or_default();
103 #[cfg(unix)]
104 listener_socket2.set_reuse_port(false).unwrap_or_default();
105 if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
106 listener_socket2
107 .set_send_buffer_size(tcp_send_buffer_size)
108 .unwrap_or_default();
109 }
110 if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
111 listener_socket2
112 .set_recv_buffer_size(tcp_recv_buffer_size)
113 .unwrap_or_default();
114 }
115 if address.is_ipv6() {
116 listener_socket2.set_only_v6(false).unwrap_or_default();
117 }
118
119 #[cfg(feature = "runtime-monoio")]
120 let is_poll_io = monoio::utils::is_legacy();
121 #[cfg(feature = "runtime-tokio")]
122 let is_poll_io = true;
123
124 if is_poll_io {
125 listener_socket2.set_nonblocking(true).unwrap_or_default();
126 }
127
128 listener_socket2.bind(&address.into())?;
130 listener_socket2.listen(-1)?;
131
132 TcpListener::from_std(listener_socket2.into())
134 })();
135 if first_startup || listener_result.is_ok() {
136 break;
137 }
138 tries += 1;
139 if tries >= 10 {
140 if encrypted {
141 println!("HTTPS port is used at try #{tries}, skipping...");
142 } else {
143 println!("HTTP port is used at try #{tries}, skipping...");
144 }
145 listen_error_tx.send(None).await.unwrap_or_default();
146 break;
147 }
148 let duration = Duration::from_millis(1000);
149 if encrypted {
150 println!("HTTPS port is used at try #{tries}, retrying in {duration:?}...");
151 } else {
152 println!("HTTP port is used at try #{tries}, retrying in {duration:?}...");
153 }
154 crate::runtime::sleep(duration).await;
155 }
156 let listener = match listener_result {
157 Ok(listener) => listener,
158 Err(err) => {
159 if encrypted {
160 Err(anyhow::anyhow!(format!("Cannot listen to HTTPS port: {}", err)))?
161 } else {
162 Err(anyhow::anyhow!(format!("Cannot listen to HTTP port: {}", err)))?
163 }
164 }
165 };
166
167 if encrypted {
168 println!("HTTPS server is listening on {address}...");
169 } else {
170 println!("HTTP server is listening on {address}...");
171 }
172 listen_error_tx.send(None).await.unwrap_or_default();
173
174 loop {
175 let (tcp, remote_address) = match listener.accept().await {
176 Ok(data) => data,
177 Err(err) => {
178 if let Some(logging_tx) = &logging_tx {
179 logging_tx
180 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
181 .await
182 .unwrap_or_default();
183 }
184 continue;
185 }
186 };
187 let local_address: SocketAddr = match tcp.local_addr() {
188 Ok(data) => data,
189 Err(err) => {
190 if let Some(logging_tx) = &logging_tx {
191 logging_tx
192 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
193 .await
194 .unwrap_or_default();
195 }
196 continue;
197 }
198 };
199
200 #[cfg(feature = "runtime-monoio")]
201 let tcp_data = {
202 #[cfg(unix)]
203 let tcp_std = {
204 use std::os::fd::{FromRawFd, IntoRawFd};
205 let raw_fd = tcp.into_raw_fd();
206 unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
211 };
212 #[cfg(windows)]
213 let tcp_std = {
214 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
215 let raw_fd = tcp.into_raw_socket();
216 unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
220 };
221
222 let tcp_socket2 = socket2::Socket::from(tcp_std);
224 tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
225
226 let tcp_std = tcp_socket2.into();
227 ConnectionData {
228 connection: Connection::Tcp(tcp_std),
229 client_address: remote_address,
230 server_address: local_address,
231 }
232 };
233 #[cfg(feature = "runtime-tokio")]
234 let tcp_data = {
235 tcp.set_nodelay(true).unwrap_or_default();
236
237 ConnectionData {
238 connection: Connection::Tcp(tcp),
239 client_address: remote_address,
240 server_address: local_address,
241 }
242 };
243
244 let tcp_tx = tx.clone();
245 crate::runtime::spawn(async move {
246 tcp_tx.send(tcp_data).await.unwrap_or_default();
248 });
249 }
250}