ferron/
listener_tcp.rs

1use std::net::SocketAddr;
2use std::{error::Error, time::Duration};
3
4use async_channel::Sender;
5#[cfg(feature = "runtime-monoio")]
6use monoio::net::{ListenerOpts, TcpListener};
7#[cfg(feature = "runtime-tokio")]
8use tokio::net::TcpListener;
9use tokio_util::sync::CancellationToken;
10
11use crate::listener_handler_communication::{Connection, ConnectionData};
12use crate::logging::LogMessage;
13
14/// Creates a TCP listener
15pub fn create_tcp_listener(
16  address: SocketAddr,
17  encrypted: bool,
18  tx: Sender<ConnectionData>,
19  enable_uring: bool,
20  logging_tx: Option<Sender<LogMessage>>,
21  first_startup: bool,
22  tcp_buffer_sizes: (Option<usize>, Option<usize>),
23) -> Result<CancellationToken, Box<dyn Error + Send + Sync>> {
24  let shutdown_tx = CancellationToken::new();
25  let shutdown_rx = shutdown_tx.clone();
26  let (listen_error_tx, listen_error_rx) = async_channel::unbounded();
27  std::thread::Builder::new()
28        .name(format!("TCP listener for {address}"))
29        .spawn(move || {
30            crate::runtime::new_runtime(async move {
31      crate::runtime::select! {
32      result = tcp_listener_fn(address, encrypted, tx, &listen_error_tx, logging_tx, first_startup, tcp_buffer_sizes) => {
33          if let Some(error) = result.err() {
34              listen_error_tx.send(Some(error)).await.unwrap_or_default();
35          }
36        }
37        _ = shutdown_rx.cancelled() => {
38
39        }
40      }
41    }, enable_uring).unwrap();
42        })?;
43
44  if let Some(error) = listen_error_rx.recv_blocking()? {
45    Err(error)?;
46  }
47
48  Ok(shutdown_tx)
49}
50
51/// TCP listener function
52async fn tcp_listener_fn(
53  address: SocketAddr,
54  encrypted: bool,
55  tx: Sender<ConnectionData>,
56  listen_error_tx: &Sender<Option<Box<dyn Error + Send + Sync>>>,
57  logging_tx: Option<Sender<LogMessage>>,
58  first_startup: bool,
59  tcp_buffer_sizes: (Option<usize>, Option<usize>),
60) -> Result<(), Box<dyn Error + Send + Sync>> {
61  let mut listener_result;
62  let mut tries: u64 = 0;
63  loop {
64    #[cfg(feature = "runtime-monoio")]
65    let listener_opts = {
66      let mut listener_opts = ListenerOpts::new()
67        .reuse_addr(!cfg!(windows))
68        .reuse_port(false)
69        .backlog(-1);
70      if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
71        listener_opts = listener_opts.send_buf_size(tcp_send_buffer_size);
72      }
73      if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
74        listener_opts = listener_opts.recv_buf_size(tcp_recv_buffer_size);
75      }
76      listener_opts
77    };
78    #[cfg(feature = "runtime-monoio")]
79    let listener_result2 = TcpListener::bind_with_config(address, &listener_opts);
80    #[cfg(feature = "runtime-tokio")]
81    let listener_result2 = (|| {
82      let listener = std::net::TcpListener::bind(address)?;
83      listener.set_nonblocking(true).unwrap_or_default();
84      let listener_socket2 = socket2::Socket::from(listener);
85      if let Some(tcp_send_buffer_size) = tcp_buffer_sizes.0 {
86        listener_socket2
87          .set_send_buffer_size(tcp_send_buffer_size)
88          .unwrap_or_default();
89      }
90      if let Some(tcp_recv_buffer_size) = tcp_buffer_sizes.1 {
91        listener_socket2
92          .set_recv_buffer_size(tcp_recv_buffer_size)
93          .unwrap_or_default();
94      }
95      TcpListener::from_std(listener_socket2.into())
96    })();
97    listener_result = listener_result2;
98    if first_startup || listener_result.is_ok() {
99      break;
100    }
101    tries += 1;
102    if tries >= 10 {
103      if encrypted {
104        println!("HTTPS port is used at try #{tries}, skipping...");
105      } else {
106        println!("HTTP port is used at try #{tries}, skipping...");
107      }
108      listen_error_tx.send(None).await.unwrap_or_default();
109      break;
110    }
111    let duration = Duration::from_millis(1000);
112    if encrypted {
113      println!("HTTPS port is used at try #{tries}, retrying in {duration:?}...");
114    } else {
115      println!("HTTP port is used at try #{tries}, retrying in {duration:?}...");
116    }
117    crate::runtime::sleep(duration).await;
118  }
119  let listener = match listener_result {
120    Ok(listener) => listener,
121    Err(err) => {
122      if encrypted {
123        Err(anyhow::anyhow!(format!("Cannot listen to HTTPS port: {}", err)))?
124      } else {
125        Err(anyhow::anyhow!(format!("Cannot listen to HTTP port: {}", err)))?
126      }
127    }
128  };
129
130  if encrypted {
131    println!("HTTPS server is listening on {address}...");
132  } else {
133    println!("HTTP server is listening on {address}...");
134  }
135  listen_error_tx.send(None).await.unwrap_or_default();
136
137  loop {
138    let (tcp, remote_address) = match listener.accept().await {
139      Ok(data) => data,
140      Err(err) => {
141        if let Some(logging_tx) = &logging_tx {
142          logging_tx
143            .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
144            .await
145            .unwrap_or_default();
146        }
147        continue;
148      }
149    };
150    let local_address: SocketAddr = match tcp.local_addr() {
151      Ok(data) => data,
152      Err(err) => {
153        if let Some(logging_tx) = &logging_tx {
154          logging_tx
155            .send(LogMessage::new(format!("Cannot accept a connection: {err}"), true))
156            .await
157            .unwrap_or_default();
158        }
159        continue;
160      }
161    };
162
163    #[cfg(feature = "runtime-monoio")]
164    let tcp_data = {
165      #[cfg(unix)]
166      let tcp_std = {
167        use std::os::fd::{FromRawFd, IntoRawFd};
168        let raw_fd = tcp.into_raw_fd();
169        // Safety: We just extracted the raw file descriptor from the Monoio TcpStream,
170        // and we are immediately wrapping it in a std::net::TcpStream. No other code
171        // has access to the raw_fd in the interim, so we uphold the invariant that
172        // the fd is owned by only one entity at a time.
173        unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }
174      };
175      #[cfg(windows)]
176      let tcp_std = {
177        use std::os::windows::io::{FromRawSocket, IntoRawSocket};
178        let raw_fd = tcp.into_raw_socket();
179        // Safety: We extracted the raw socket from the Monoio TcpStream and are
180        // immediately converting it into a std::net::TcpStream. No duplication or
181        // other use of the raw socket occurs, so ownership and safety invariants are preserved.
182        unsafe { std::net::TcpStream::from_raw_socket(raw_fd) }
183      };
184
185      // Set TCP_NODELAY
186      let tcp_socket2 = socket2::Socket::from(tcp_std);
187      tcp_socket2.set_tcp_nodelay(true).unwrap_or_default();
188
189      let tcp_std = tcp_socket2.into();
190      ConnectionData {
191        connection: Connection::Tcp(tcp_std),
192        client_address: remote_address,
193        server_address: local_address,
194      }
195    };
196    #[cfg(feature = "runtime-tokio")]
197    let tcp_data = {
198      tcp.set_nodelay(true).unwrap_or_default();
199
200      ConnectionData {
201        connection: Connection::Tcp(tcp),
202        client_address: remote_address,
203        server_address: local_address,
204      }
205    };
206
207    let tcp_tx = tx.clone();
208    crate::runtime::spawn(async move {
209      // Send the `TcpStream` and socket addresses to the request handlers
210      tcp_tx.send(tcp_data).await.unwrap_or_default();
211    });
212  }
213}