ferron/listeners/
tcp.rs

1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5use ferron_common::logging::LogMessage;
6#[cfg(feature = "runtime-monoio")]
7use monoio::net::TcpListener;
8#[cfg(feature = "runtime-tokio")]
9use tokio::net::TcpListener;
10use tokio_util::sync::CancellationToken;
11
12use crate::listener_handler_communication::{Connection, ConnectionData};
13
14/// Creates a TCP listener
15#[allow(clippy::too_many_arguments)]
16pub fn create_tcp_listener(
17  address: SocketAddr,
18  encrypted: bool,
19  tx: Sender<ConnectionData>,
20  enable_uring: Option<bool>,
21  logging_tx: Option<Sender<LogMessage>>,
22  first_startup: bool,
23  tcp_buffer_sizes: (Option<usize>, Option<usize>),
24  io_uring_disabled: Sender<Option<std::io::Error>>,
25) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
26  let shutdown_tx = CancellationToken::new();
27  let shutdown_rx = shutdown_tx.clone();
28  let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
29  std::thread::Builder::new()
30    .name(format!("TCP listener for {address}"))
31    .spawn(move || {
32      let mut rt = match crate::runtime::Runtime::new_runtime(enable_uring) {
33        Ok(rt) => rt,
34        Err(error) => {
35          listen_error_tx
36            .send_blocking(Some(
37              anyhow::anyhow!("Can't create async runtime: {error}").into_boxed_dyn_error(),
38            ))
39            .unwrap_or_default();
40          return;
41        }
42      };
43      io_uring_disabled
44        .send_blocking(rt.return_io_uring_error())
45        .unwrap_or_default();
46      rt.run(async move {
47        let tcp_listener_future = tcp_listener_fn(
48          address,
49          encrypted,
50          tx,
51          &listen_error_tx,
52          logging_tx,
53          first_startup,
54          tcp_buffer_sizes,
55        );
56        crate::runtime::select! {
57          result = tcp_listener_future => {
58            if let Some(error) = result.err() {
59                listen_error_tx.send(Some(error)).await.unwrap_or_default();
60            }
61          }
62          _ = shutdown_rx.cancelled() => {
63
64          }
65        }
66      });
67    })?;
68
69  if let Some(error) = listen_error_rx.recv_blocking()? {
70    Err(error)?;
71  }
72
73  Ok(shutdown_tx)
74}
75
76/// TCP listener function
77async fn tcp_listener_fn(
78  address: SocketAddr,
79  encrypted: bool,
80  tx: Sender<ConnectionData>,
81  listen_error_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
82  logging_tx: Option<Sender<LogMessage>>,
83  first_startup: bool,
84  tcp_buffer_sizes: (Option<usize>, Option<usize>),
85) -> Result<(), Box<dyn Error + Send + Sync>> {
86  let mut listener_result;
87  let mut tries: u64 = 0;
88  loop {
89    listener_result = (|| {
90      // Create a new socket
91      let listener_socket2 = socket2::Socket::new(
92        if address.is_ipv6() {
93          socket2::Domain::IPV6
94        } else {
95          socket2::Domain::IPV4
96        },
97        socket2::Type::STREAM,
98        Some(socket2::Protocol::TCP),
99      )?;
100
101      // Set socket options
102      listener_socket2.set_reuse_address(!cfg!(windows)).unwrap_or_default();
103      #[cfg(unix)]
104      listener_socket2.set_reuse_port(false).unwrap_or_default();
105      if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
106        listener_socket2
107          .set_send_buffer_size(tcp_send_buffer_size)
108          .unwrap_or_default();
109      }
110      if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
111        listener_socket2
112          .set_recv_buffer_size(tcp_recv_buffer_size)
113          .unwrap_or_default();
114      }
115      if address.is_ipv6() {
116        listener_socket2.set_only_v6(false).unwrap_or_default();
117      }
118
119      #[cfg(feature = "runtime-monoio")]
120      let is_poll_io = monoio::utils::is_legacy();
121      #[cfg(feature = "runtime-tokio")]
122      let is_poll_io = true;
123
124      if is_poll_io {
125        listener_socket2.set_nonblocking(true).unwrap_or_default();
126      }
127
128      // Bind the socket to the address
129      listener_socket2.bind(&address.into())?;
130      listener_socket2.listen(-1)?;
131
132      // Wrap the socket into a TcpListener
133      TcpListener::from_std(listener_socket2.into())
134    })();
135    if first_startup || listener_result.is_ok() {
136      break;
137    }
138    tries += 1;
139    if tries >= 10 {
140      if encrypted {
141        println!("HTTPS port is used at try #{tries}, skipping...");
142      } else {
143        println!("HTTP port is used at try #{tries}, skipping...");
144      }
145      listen_error_tx.send(None).await.unwrap_or_default();
146      break;
147    }
148    let duration = Duration::from_millis(1000);
149    if encrypted {
150      println!("HTTPS port is used at try #{tries}, retrying in {duration:?}...");
151    } else {
152      println!("HTTP port is used at try #{tries}, retrying in {duration:?}...");
153    }
154    crate::runtime::sleep(duration).await;
155  }
156  let listener = match listener_result {
157    Ok(listener) => listener,
158    Err(err) => {
159      if encrypted {
160        Err(anyhow::anyhow!(format!("Cannot listen to HTTPS port: {}", err)))?
161      } else {
162        Err(anyhow::anyhow!(format!("Cannot listen to HTTP port: {}", err)))?
163      }
164    }
165  };
166
167  if encrypted {
168    println!("HTTPS server is listening on {address}...");
169  } else {
170    println!("HTTP server is listening on {address}...");
171  }
172  listen_error_tx.send(None).await.unwrap_or_default();
173
174  loop {
175    let (tcp, remote_address) = match listener.accept().await {
176      Ok(data) => data,
177      Err(err) => {
178        if let Some(logging_tx) = &logging_tx {
179          logging_tx
180            .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
181            .await
182            .unwrap_or_default();
183        }
184        continue;
185      }
186    };
187    let local_address: SocketAddr = match tcp.local_addr() {
188      Ok(data) => data,
189      Err(err) => {
190        if let Some(logging_tx) = &logging_tx {
191          logging_tx
192            .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
193            .await
194            .unwrap_or_default();
195        }
196        continue;
197      }
198    };
199
200    #[cfg(feature = "runtime-monoio")]
201    let tcp_data = {
202      #[cfg(unix)]
203      let tcp_std = {
204        use std::os::fd::{FromRawFd, IntoRawFd};
205        let raw_fd = tcp.into_raw_fd();
206        // Safety: We just extracted the raw file descriptor from the Monoio TcpStream,
207        // and we are immediately wrapping it in a std::net::TcpStream. No other code
208        // has access to the raw_fd in the interim, so we uphold the invariant that
209        // the fd is owned by only one entity at a time.
210        unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
211      };
212      #[cfg(windows)]
213      let tcp_std = {
214        use std::os::windows::io::{FromRawSocket, IntoRawSocket};
215        let raw_fd = tcp.into_raw_socket();
216        // Safety: We extracted the raw socket from the Monoio TcpStream and are
217        // immediately converting it into a std::net::TcpStream. No duplication or
218        // other use of the raw socket occurs, so ownership and safety invariants are preserved.
219        unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
220      };
221
222      // Set TCP_NODELAY
223      let tcp_socket2 = socket2::Socket::from(tcp_std);
224      tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
225
226      let tcp_std = tcp_socket2.into();
227      ConnectionData {
228        connection: Connection::Tcp(tcp_std),
229        client_address: remote_address,
230        server_address: local_address,
231      }
232    };
233    #[cfg(feature = "runtime-tokio")]
234    let tcp_data = {
235      tcp.set_nodelay(true).unwrap_or_default();
236
237      ConnectionData {
238        connection: Connection::Tcp(tcp),
239        client_address: remote_address,
240        server_address: local_address,
241      }
242    };
243
244    let tcp_tx = tx.clone();
245    crate::runtime::spawn(async move {
246      // Send the `TcpStream` and socket addresses to the request handlers
247      tcp_tx.send(tcp_data).await.unwrap_or_default();
248    });
249  }
250}