1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5#[cfg(feature = "runtime-monoio")]
6use monoio::net::{ListenerOpts, TcpListener};
7#[cfg(feature = "runtime-tokio")]
8use tokio::net::TcpListener;
9use tokio_util::sync::CancellationToken;
10
11use crate::listener_handler_communication::{Connection, ConnectionData};
12use crate::logging::LogMessage;
13
14pub fn create_tcp_listener(
16 address: SocketAddr,
17 encrypted: bool,
18 tx: Sender<ConnectionData>,
19 enable_uring: bool,
20 logging_tx: Option<Sender<LogMessage>>,
21 first_startup: bool,
22 tcp_buffer_sizes: (Option<usize>, Option<usize>),
23) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
24 let shutdown_tx = CancellationToken::new();
25 let shutdown_rx = shutdown_tx.clone();
26 let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
27 std::thread::Builder::new()
28 .name(format!("TCP listener for {address}"))
29 .spawn(move || {
30 crate::runtime::new_runtime(async move {
31 crate::runtime::select! {
32 result = tcp_listener_fn(address, encrypted, tx, &listen_error_tx, logging_tx, first_startup, tcp_buffer_sizes) => {
33 if let Some(error) = result.err() {
34 listen_error_tx.send(Some(error)).await.unwrap_or_default();
35 }
36 }
37 _ = shutdown_rx.cancelled() => {
38
39 }
40 }
41 }, enable_uring).unwrap();
42 })?;
43
44 if let Some(error) = listen_error_rx.recv_blocking()? {
45 Err(error)?;
46 }
47
48 Ok(shutdown_tx)
49}
50
51async fn tcp_listener_fn(
53 address: SocketAddr,
54 encrypted: bool,
55 tx: Sender<ConnectionData>,
56 listen_error_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
57 logging_tx: Option<Sender<LogMessage>>,
58 first_startup: bool,
59 tcp_buffer_sizes: (Option<usize>, Option<usize>),
60) -> Result<(), Box<dyn Error + Send + Sync>> {
61 let mut listener_result;
62 let mut tries: u64 = 0;
63 loop {
64 #[cfg(feature = "runtime-monoio")]
65 let listener_opts = {
66 let mut listener_opts = ListenerOpts::new()
67 .reuse_addr(!cfg!(windows))
68 .reuse_port(false)
69 .backlog(-1);
70 if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
71 listener_opts = listener_opts.send_buf_size(tcp_send_buffer_size);
72 }
73 if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
74 listener_opts = listener_opts.recv_buf_size(tcp_recv_buffer_size);
75 }
76 listener_opts
77 };
78 #[cfg(feature = "runtime-monoio")]
79 let listener_result2 = TcpListener::bind_with_config(address, &listener_opts);
80 #[cfg(feature = "runtime-tokio")]
81 let listener_result2 = (|| {
82 let listener = std::net::TcpListener::bind(address)?;
83 listener.set_nonblocking(true).unwrap_or_default();
84 let listener_socket2 = socket2::Socket::from(listener);
85 if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
86 listener_socket2
87 .set_send_buffer_size(tcp_send_buffer_size)
88 .unwrap_or_default();
89 }
90 if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
91 listener_socket2
92 .set_recv_buffer_size(tcp_recv_buffer_size)
93 .unwrap_or_default();
94 }
95 TcpListener::from_std(listener_socket2.into())
96 })();
97 listener_result = listener_result2;
98 if first_startup || listener_result.is_ok() {
99 break;
100 }
101 tries += 1;
102 if tries >= 10 {
103 if encrypted {
104 println!("HTTPS port is used at try #{tries}, skipping...");
105 } else {
106 println!("HTTP port is used at try #{tries}, skipping...");
107 }
108 listen_error_tx.send(None).await.unwrap_or_default();
109 break;
110 }
111 let duration = Duration::from_millis(1000);
112 if encrypted {
113 println!("HTTPS port is used at try #{tries}, retrying in {duration:?}...");
114 } else {
115 println!("HTTP port is used at try #{tries}, retrying in {duration:?}...");
116 }
117 crate::runtime::sleep(duration).await;
118 }
119 let listener = match listener_result {
120 Ok(listener) => listener,
121 Err(err) => {
122 if encrypted {
123 Err(anyhow::anyhow!(format!("Cannot listen to HTTPS port: {}", err)))?
124 } else {
125 Err(anyhow::anyhow!(format!("Cannot listen to HTTP port: {}", err)))?
126 }
127 }
128 };
129
130 if encrypted {
131 println!("HTTPS server is listening on {address}...");
132 } else {
133 println!("HTTP server is listening on {address}...");
134 }
135 listen_error_tx.send(None).await.unwrap_or_default();
136
137 loop {
138 let (tcp, remote_address) = match listener.accept().await {
139 Ok(data) => data,
140 Err(err) => {
141 if let Some(logging_tx) = &logging_tx {
142 logging_tx
143 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
144 .await
145 .unwrap_or_default();
146 }
147 continue;
148 }
149 };
150 let local_address: SocketAddr = match tcp.local_addr() {
151 Ok(data) => data,
152 Err(err) => {
153 if let Some(logging_tx) = &logging_tx {
154 logging_tx
155 .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
156 .await
157 .unwrap_or_default();
158 }
159 continue;
160 }
161 };
162
163 #[cfg(feature = "runtime-monoio")]
164 let tcp_data = {
165 #[cfg(unix)]
166 let tcp_std = {
167 use std::os::fd::{FromRawFd, IntoRawFd};
168 let raw_fd = tcp.into_raw_fd();
169 unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
174 };
175 #[cfg(windows)]
176 let tcp_std = {
177 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
178 let raw_fd = tcp.into_raw_socket();
179 unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
183 };
184
185 let tcp_socket2 = socket2::Socket::from(tcp_std);
187 tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
188
189 let tcp_std = tcp_socket2.into();
190 ConnectionData {
191 connection: Connection::Tcp(tcp_std),
192 client_address: remote_address,
193 server_address: local_address,
194 }
195 };
196 #[cfg(feature = "runtime-tokio")]
197 let tcp_data = {
198 tcp.set_nodelay(true).unwrap_or_default();
199
200 ConnectionData {
201 connection: Connection::Tcp(tcp),
202 client_address: remote_address,
203 server_address: local_address,
204 }
205 };
206
207 let tcp_tx = tx.clone();
208 crate::runtime::spawn(async move {
209 tcp_tx.send(tcp_data).await.unwrap_or_default();
211 });
212 }
213}