ferron/listeners/
tcp.rs

1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5use ferron_common::logging::LogMessage;
6#[cfg(feature = "runtime-monoio")]
7use monoio::net::TcpListener;
8#[cfg(feature = "runtime-tokio")]
9use tokio::net::TcpListener;
10use tokio_util::sync::CancellationToken;
11#[cfg(feature = "runtime-vibeio")]
12use vibeio::net::TcpListener;
13
14use crate::listener_handler_communication::{Connection, ConnectionData};
15
16type ListenerError = Box<dyn Error + Send + Sync>;
17type ListenerResult = Result<TcpListener, std::io::Error>;
18
19#[inline]
20fn protocol_name(encrypted: bool) -> &'static str {
21  if encrypted {
22    "HTTPS"
23  } else {
24    "HTTP"
25  }
26}
27
28#[inline]
29fn listen_error_message(encrypted: bool, err: &std::io::Error) -> anyhow::Error {
30  anyhow::anyhow!("Cannot listen to {} port: {err}", protocol_name(encrypted))
31}
32
33#[inline]
34fn log_retry(encrypted: bool, tries: u64, duration: Duration) {
35  println!(
36    "{} port is used at try #{tries}, retrying in {duration:?}...",
37    protocol_name(encrypted)
38  );
39}
40
41#[inline]
42fn log_skip(encrypted: bool, tries: u64) {
43  println!("{} port is used at try #{tries}, skipping...", protocol_name(encrypted));
44}
45
46#[inline]
47fn log_listening(encrypted: bool, address: SocketAddr) {
48  println!("{} server is listening on {address}...", protocol_name(encrypted));
49}
50
51#[inline]
52fn build_tcp_listener(address: SocketAddr, tcp_buffer_sizes: (Option<usize>, Option<usize>)) -> ListenerResult {
53  // Create a new socket
54  let listener_socket2 = socket2::Socket::new(
55    if address.is_ipv6() {
56      socket2::Domain::IPV6
57    } else {
58      socket2::Domain::IPV4
59    },
60    socket2::Type::STREAM,
61    Some(socket2::Protocol::TCP),
62  )?;
63
64  // Set socket options
65  listener_socket2.set_reuse_address(!cfg!(windows)).unwrap_or_default();
66  #[cfg(unix)]
67  listener_socket2.set_reuse_port(false).unwrap_or_default();
68  if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
69    listener_socket2
70      .set_send_buffer_size(tcp_send_buffer_size)
71      .unwrap_or_default();
72  }
73  if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
74    listener_socket2
75      .set_recv_buffer_size(tcp_recv_buffer_size)
76      .unwrap_or_default();
77  }
78  if address.is_ipv6() {
79    listener_socket2.set_only_v6(false).unwrap_or_default();
80  }
81
82  #[cfg(feature = "runtime-monoio")]
83  let is_poll_io = monoio::utils::is_legacy();
84  #[cfg(feature = "runtime-vibeio")]
85  let is_poll_io = !vibeio::util::supports_completion();
86  #[cfg(feature = "runtime-tokio")]
87  let is_poll_io = true;
88
89  if is_poll_io {
90    listener_socket2.set_nonblocking(true).unwrap_or_default();
91  }
92
93  // Bind the socket to the address
94  listener_socket2.bind(&address.into())?;
95  listener_socket2.listen(-1)?;
96
97  // Wrap the socket into a TcpListener
98  TcpListener::from_std(listener_socket2.into())
99}
100
101#[inline]
102async fn log_accept_error(logging_tx: &Option<Sender<LogMessage>>, err: &std::io::Error) {
103  if let Some(logging_tx) = logging_tx {
104    logging_tx
105      .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
106      .await
107      .unwrap_or_default();
108  }
109}
110
111/// Creates a TCP listener
112#[allow(clippy::too_many_arguments)]
113pub fn create_tcp_listener(
114  address: SocketAddr,
115  encrypted: bool,
116  tx: Sender<ConnectionData>,
117  enable_uring: Option<bool>,
118  logging_tx: Option<Sender<LogMessage>>,
119  first_startup: bool,
120  tcp_buffer_sizes: (Option<usize>, Option<usize>),
121  io_uring_disabled: Sender<Option<std::io::Error>>,
122) -> Result<CancellationToken, ListenerError> {
123  let shutdown_tx = CancellationToken::new();
124  let shutdown_rx = shutdown_tx.clone();
125  let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
126  std::thread::Builder::new()
127    .name(format!("TCP listener for {address}"))
128    .spawn(move || {
129      let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
130        Ok(rt) => rt,
131        Err(error) => {
132          listen_error_tx
133            .send_blocking(Some(
134              anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
135            ))
136            .unwrap_or_default();
137          return;
138        }
139      };
140      io_uring_disabled
141        .send_blocking(rt.return_io_uring_error())
142        .unwrap_or_default();
143      rt.run(async move {
144        if let Err(error) = tcp_listener_fn(
145          address,
146          encrypted,
147          tx,
148          &listen_error_tx,
149          logging_tx,
150          first_startup,
151          tcp_buffer_sizes,
152          shutdown_rx,
153        )
154        .await
155        {
156          listen_error_tx.send(Some(error)).await.unwrap_or_default();
157        }
158      });
159    })?;
160
161  if let Some(error) = listen_error_rx.recv_blocking()? {
162    Err(error)?;
163  }
164
165  Ok(shutdown_tx)
166}
167
168/// TCP listener function
169#[allow(clippy::too_many_arguments)]
170async fn tcp_listener_fn(
171  address: SocketAddr,
172  encrypted: bool,
173  tx: Sender<ConnectionData>,
174  listen_error_tx: &Sender<Option<ListenerError>>,
175  logging_tx: Option<Sender<LogMessage>>,
176  first_startup: bool,
177  tcp_buffer_sizes: (Option<usize>, Option<usize>),
178  shutdown_rx: CancellationToken,
179) -> Result<(), ListenerError> {
180  let mut listener_result;
181  let mut tries: u64 = 0;
182  loop {
183    listener_result = build_tcp_listener(address, tcp_buffer_sizes);
184    if first_startup || listener_result.is_ok() {
185      break;
186    }
187    tries += 1;
188    if tries >= 10 {
189      log_skip(encrypted, tries);
190      listen_error_tx.send(None).await.unwrap_or_default();
191      break;
192    }
193    let duration = Duration::from_millis(1000);
194    log_retry(encrypted, tries, duration);
195    crate::runtime::sleep(duration).await;
196  }
197  let listener = match listener_result {
198    Ok(listener) => listener,
199    Err(err) => Err(listen_error_message(encrypted, &err))?,
200  };
201
202  log_listening(encrypted, address);
203  listen_error_tx.send(None).await.unwrap_or_default();
204
205  #[cfg(unix)]
206  let mut handle_exhaustion_backoff = Duration::from_millis(10);
207
208  loop {
209    let (tcp, remote_address) = match crate::runtime::select! {
210      result = listener.accept() => {
211        result
212      }
213      _ = shutdown_rx.cancelled() => {
214        return Ok(());
215      }
216    } {
217      Ok(data) => {
218        #[cfg(unix)]
219        {
220          handle_exhaustion_backoff = Duration::from_millis(10);
221        }
222        data
223      }
224      Err(err) => {
225        log_accept_error(&logging_tx, &err).await;
226
227        // 24 = EMFILE
228        #[cfg(unix)]
229        if err.raw_os_error() == Some(24) {
230          crate::runtime::sleep(handle_exhaustion_backoff).await;
231          handle_exhaustion_backoff *= 2;
232          if handle_exhaustion_backoff > Duration::from_secs(1) {
233            handle_exhaustion_backoff = Duration::from_secs(1);
234          }
235        }
236
237        continue;
238      }
239    };
240    let local_address: SocketAddr = match tcp.local_addr() {
241      Ok(data) => data,
242      Err(err) => {
243        log_accept_error(&logging_tx, &err).await;
244        continue;
245      }
246    };
247
248    #[cfg(any(feature = "runtime-vibeio", feature = "runtime-monoio"))]
249    let tcp_data = {
250      #[cfg(unix)]
251      let tcp_std = {
252        use std::os::fd::{FromRawFd, IntoRawFd};
253        let raw_fd = tcp.into_raw_fd();
254        // Safety: We just extracted the raw file descriptor from the Monoio TcpStream,
255        // and we are immediately wrapping it in a std::net::TcpStream. No other code
256        // has access to the raw_fd in the interim, so we uphold the invariant that
257        // the fd is owned by only one entity at a time.
258        unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
259      };
260      #[cfg(windows)]
261      let tcp_std = {
262        use std::os::windows::io::{FromRawSocket, IntoRawSocket};
263        let raw_fd = tcp.into_raw_socket();
264        // Safety: We extracted the raw socket from the Monoio TcpStream and are
265        // immediately converting it into a std::net::TcpStream. No duplication or
266        // other use of the raw socket occurs, so ownership and safety invariants are preserved.
267        unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
268      };
269
270      // Set TCP_NODELAY
271      let tcp_socket2 = socket2::Socket::from(tcp_std);
272      tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
273
274      let tcp_std = tcp_socket2.into();
275      ConnectionData {
276        connection: Connection::Tcp(tcp_std),
277        client_address: remote_address,
278        server_address: local_address,
279      }
280    };
281    #[cfg(feature = "runtime-tokio")]
282    let tcp_data = {
283      tcp.set_nodelay(true).unwrap_or_default();
284
285      ConnectionData {
286        connection: Connection::Tcp(tcp),
287        client_address: remote_address,
288        server_address: local_address,
289      }
290    };
291
292    let tcp_tx = tx.clone();
293    crate::runtime::spawn(async move {
294      // Send the `TcpStream` and socket addresses to the request handlers
295      tcp_tx.send(tcp_data).await.unwrap_or_default();
296    });
297  }
298}