ferron/listeners/
tcp.rs

1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5use ferron_common::logging::LogMessage;
6#[cfg(feature = "runtime-monoio")]
7use monoio::net::TcpListener;
8#[cfg(feature = "runtime-tokio")]
9use tokio::net::TcpListener;
10use tokio_util::sync::CancellationToken;
11
12use crate::listener_handler_communication::{Connection, ConnectionData};
13
14/// Creates a TCP listener
15#[allow(clippy::too_many_arguments)]
16pub fn create_tcp_listener(
17  address: SocketAddr,
18  encrypted: bool,
19  tx: Sender<ConnectionData>,
20  enable_uring: Option<bool>,
21  logging_tx: Option<Sender<LogMessage>>,
22  first_startup: bool,
23  tcp_buffer_sizes: (Option<usize>, Option<usize>),
24  io_uring_disabled: Sender<Option<std::io::Error>>,
25) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
26  let shutdown_tx = CancellationToken::new();
27  let shutdown_rx = shutdown_tx.clone();
28  let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
29  std::thread::Builder::new()
30    .name(format!("TCP listener for {address}"))
31    .spawn(move || {
32      let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
33        Ok(rt) => rt,
34        Err(error) => {
35          listen_error_tx
36            .send_blocking(Some(
37              anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
38            ))
39            .unwrap_or_default();
40          return;
41        }
42      };
43      io_uring_disabled
44        .send_blocking(rt.return_io_uring_error())
45        .unwrap_or_default();
46      rt.run(async move {
47        if let Err(error) = tcp_listener_fn(
48          address,
49          encrypted,
50          tx,
51          &listen_error_tx,
52          logging_tx,
53          first_startup,
54          tcp_buffer_sizes,
55          shutdown_rx,
56        )
57        .await
58        {
59          listen_error_tx.send(Some(error)).await.unwrap_or_default();
60        }
61      });
62    })?;
63
64  if let Some(error) = listen_error_rx.recv_blocking()? {
65    Err(error)?;
66  }
67
68  Ok(shutdown_tx)
69}
70
71/// TCP listener function
72#[allow(clippy::too_many_arguments)]
73async fn tcp_listener_fn(
74  address: SocketAddr,
75  encrypted: bool,
76  tx: Sender<ConnectionData>,
77  listen_error_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
78  logging_tx: Option<Sender<LogMessage>>,
79  first_startup: bool,
80  tcp_buffer_sizes: (Option<usize>, Option<usize>),
81  shutdown_rx: CancellationToken,
82) -> Result<(), Box<dyn Error + Send + Sync>> {
83  let mut listener_result;
84  let mut tries: u64 = 0;
85  loop {
86    listener_result = (|| {
87      // Create a new socket
88      let listener_socket2 = socket2::Socket::new(
89        if address.is_ipv6() {
90          socket2::Domain::IPV6
91        } else {
92          socket2::Domain::IPV4
93        },
94        socket2::Type::STREAM,
95        Some(socket2::Protocol::TCP),
96      )?;
97
98      // Set socket options
99      listener_socket2.set_reuse_address(!cfg!(windows)).unwrap_or_default();
100      #[cfg(unix)]
101      listener_socket2.set_reuse_port(false).unwrap_or_default();
102      if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
103        listener_socket2
104          .set_send_buffer_size(tcp_send_buffer_size)
105          .unwrap_or_default();
106      }
107      if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
108        listener_socket2
109          .set_recv_buffer_size(tcp_recv_buffer_size)
110          .unwrap_or_default();
111      }
112      if address.is_ipv6() {
113        listener_socket2.set_only_v6(false).unwrap_or_default();
114      }
115
116      #[cfg(feature = "runtime-monoio")]
117      let is_poll_io = monoio::utils::is_legacy();
118      #[cfg(feature = "runtime-tokio")]
119      let is_poll_io = true;
120
121      if is_poll_io {
122        listener_socket2.set_nonblocking(true).unwrap_or_default();
123      }
124
125      // Bind the socket to the address
126      listener_socket2.bind(&address.into())?;
127      listener_socket2.listen(-1)?;
128
129      // Wrap the socket into a TcpListener
130      TcpListener::from_std(listener_socket2.into())
131    })();
132    if first_startup || listener_result.is_ok() {
133      break;
134    }
135    tries += 1;
136    if tries >= 10 {
137      if encrypted {
138        println!("HTTPS port is used at try #{tries}, skipping...");
139      } else {
140        println!("HTTP port is used at try #{tries}, skipping...");
141      }
142      listen_error_tx.send(None).await.unwrap_or_default();
143      break;
144    }
145    let duration = Duration::from_millis(1000);
146    if encrypted {
147      println!("HTTPS port is used at try #{tries}, retrying in {duration:?}...");
148    } else {
149      println!("HTTP port is used at try #{tries}, retrying in {duration:?}...");
150    }
151    crate::runtime::sleep(duration).await;
152  }
153  let listener = match listener_result {
154    Ok(listener) => listener,
155    Err(err) => {
156      if encrypted {
157        Err(anyhow::anyhow!(format!("Cannot listen to HTTPS port: {}", err)))?
158      } else {
159        Err(anyhow::anyhow!(format!("Cannot listen to HTTP port: {}", err)))?
160      }
161    }
162  };
163
164  if encrypted {
165    println!("HTTPS server is listening on {address}...");
166  } else {
167    println!("HTTP server is listening on {address}...");
168  }
169  listen_error_tx.send(None).await.unwrap_or_default();
170
171  loop {
172    let (tcp, remote_address) = match crate::runtime::select! {
173      result = listener.accept() => {
174        result
175      }
176      _ = shutdown_rx.cancelled() => {
177        return Ok(());
178      }
179    } {
180      Ok(data) => data,
181      Err(err) => {
182        if let Some(logging_tx) = &logging_tx {
183          logging_tx
184            .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
185            .await
186            .unwrap_or_default();
187        }
188        continue;
189      }
190    };
191    let local_address: SocketAddr = match tcp.local_addr() {
192      Ok(data) => data,
193      Err(err) => {
194        if let Some(logging_tx) = &logging_tx {
195          logging_tx
196            .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
197            .await
198            .unwrap_or_default();
199        }
200        continue;
201      }
202    };
203
204    #[cfg(feature = "runtime-monoio")]
205    let tcp_data = {
206      #[cfg(unix)]
207      let tcp_std = {
208        use std::os::fd::{FromRawFd, IntoRawFd};
209        let raw_fd = tcp.into_raw_fd();
210        // Safety: We just extracted the raw file descriptor from the Monoio TcpStream,
211        // and we are immediately wrapping it in a std::net::TcpStream. No other code
212        // has access to the raw_fd in the interim, so we uphold the invariant that
213        // the fd is owned by only one entity at a time.
214        unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
215      };
216      #[cfg(windows)]
217      let tcp_std = {
218        use std::os::windows::io::{FromRawSocket, IntoRawSocket};
219        let raw_fd = tcp.into_raw_socket();
220        // Safety: We extracted the raw socket from the Monoio TcpStream and are
221        // immediately converting it into a std::net::TcpStream. No duplication or
222        // other use of the raw socket occurs, so ownership and safety invariants are preserved.
223        unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
224      };
225
226      // Set TCP_NODELAY
227      let tcp_socket2 = socket2::Socket::from(tcp_std);
228      tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
229
230      let tcp_std = tcp_socket2.into();
231      ConnectionData {
232        connection: Connection::Tcp(tcp_std),
233        client_address: remote_address,
234        server_address: local_address,
235      }
236    };
237    #[cfg(feature = "runtime-tokio")]
238    let tcp_data = {
239      tcp.set_nodelay(true).unwrap_or_default();
240
241      ConnectionData {
242        connection: Connection::Tcp(tcp),
243        client_address: remote_address,
244        server_address: local_address,
245      }
246    };
247
248    let tcp_tx = tx.clone();
249    crate::runtime::spawn(async move {
250      // Send the `TcpStream` and socket addresses to the request handlers
251      tcp_tx.send(tcp_data).await.unwrap_or_default();
252    });
253  }
254}