monoio/net/
udp.rs

1//! UDP impl.
2
3#[cfg(unix)]
4use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd};
5#[cfg(windows)]
6use std::os::windows::prelude::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
7use std::{
8    io,
9    net::{SocketAddr, ToSocketAddrs},
10};
11
12use crate::{
13    buf::{IoBuf, IoBufMut},
14    driver::{op::Op, shared_fd::SharedFd},
15    io::{operation_canceled, CancelHandle, Split},
16};
17
18/// A UDP socket.
19///
20/// After creating a `UdpSocket` by [`bind`]ing it to a socket address, data can be
21/// [sent to] and [received from] any other socket address.
22///
23/// Although UDP is a connectionless protocol, this implementation provides an interface
24/// to set an address where data should be sent and received from. After setting a remote
25/// address with [`connect`], data can be sent to and received from that address with
26/// [`send`] and [`recv`].
27#[derive(Debug)]
28pub struct UdpSocket {
29    fd: SharedFd,
30}
31
/// `UdpSocket` is safe to split into two parts.
// NOTE(review): the exact invariants required by the unsafe `Split` trait are
// defined outside this file; confirm they hold for this type.
unsafe impl Split for UdpSocket {}
34
35impl UdpSocket {
36    pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
37        Self { fd }
38    }
39
40    #[cfg(feature = "legacy")]
41    fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> {
42        crate::driver::CURRENT.with(|x| match x {
43            // TODO: windows ioring support
44            #[cfg(all(target_os = "linux", feature = "iouring"))]
45            crate::driver::Inner::Uring(_) => Ok(()),
46            crate::driver::Inner::Legacy(_) => _socket.set_nonblocking(true),
47        })
48    }
49
50    /// Creates a UDP socket from the given address.
51    pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
52        let addr = addr
53            .to_socket_addrs()?
54            .next()
55            .ok_or_else(|| io::Error::other("empty address"))?;
56        let domain = if addr.is_ipv6() {
57            socket2::Domain::IPV6
58        } else {
59            socket2::Domain::IPV4
60        };
61        let socket =
62            socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
63        #[cfg(feature = "legacy")]
64        Self::set_non_blocking(&socket)?;
65
66        let addr = socket2::SockAddr::from(addr);
67        socket.bind(&addr)?;
68
69        #[cfg(unix)]
70        let fd = socket.into_raw_fd();
71        #[cfg(windows)]
72        let fd = socket.into_raw_socket();
73
74        Ok(Self::from_shared_fd(SharedFd::new::<false>(fd)?))
75    }
76
77    /// Receives a single datagram message on the socket. On success, returns the number
78    /// of bytes read and the origin.
79    pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> {
80        let op = Op::recv_msg(self.fd.clone(), buf).unwrap();
81        op.wait().await
82    }
83
84    /// Sends data on the socket to the given address. On success, returns the
85    /// number of bytes written.
86    pub async fn send_to<T: IoBuf>(
87        &self,
88        buf: T,
89        socket_addr: SocketAddr,
90    ) -> crate::BufResult<usize, T> {
91        let op = Op::send_msg(self.fd.clone(), buf, Some(socket_addr)).unwrap();
92        op.wait().await
93    }
94
95    /// Returns the socket address of the remote peer this socket was connected to.
96    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
97        #[cfg(unix)]
98        let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
99        #[cfg(windows)]
100        let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
101        let addr = socket.peer_addr();
102        #[cfg(unix)]
103        let _ = socket.into_raw_fd();
104        #[cfg(windows)]
105        let _ = socket.into_raw_socket();
106        addr?
107            .as_socket()
108            .ok_or_else(|| io::ErrorKind::InvalidInput.into())
109    }
110
111    /// Returns the socket address that this socket was created from.
112    pub fn local_addr(&self) -> io::Result<SocketAddr> {
113        #[cfg(unix)]
114        let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
115        #[cfg(windows)]
116        let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
117        let addr = socket.local_addr();
118        #[cfg(unix)]
119        let _ = socket.into_raw_fd();
120        #[cfg(windows)]
121        let _ = socket.into_raw_socket();
122        addr?
123            .as_socket()
124            .ok_or_else(|| io::ErrorKind::InvalidInput.into())
125    }
126
127    /// Connects this UDP socket to a remote address, allowing the `send` and
128    /// `recv` syscalls to be used to send data and also applies filters to only
129    /// receive data from the specified address.
130    pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> {
131        let op = Op::connect(self.fd.clone(), socket_addr, false)?;
132        let completion = op.await;
133        completion.meta.result?;
134        Ok(())
135    }
136
137    /// Sends data on the socket to the remote address to which it is connected.
138    pub async fn send<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
139        let op = Op::send_msg(self.fd.clone(), buf, None).unwrap();
140        op.wait().await
141    }
142
143    /// Receives a single datagram message on the socket from the remote address to
144    /// which it is connected. On success, returns the number of bytes read.
145    pub async fn recv<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
146        let op = Op::recv(self.fd.clone(), buf).unwrap();
147        op.result().await
148    }
149
150    /// Creates new `UdpSocket` from a `std::net::UdpSocket`.
151    pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
152        #[cfg(unix)]
153        let fd = socket.as_raw_fd();
154        #[cfg(windows)]
155        let fd = socket.as_raw_socket();
156        match SharedFd::new::<false>(fd) {
157            Ok(shared) => {
158                #[cfg(unix)]
159                let _ = socket.into_raw_fd();
160                #[cfg(windows)]
161                let _ = socket.into_raw_socket();
162                Ok(Self::from_shared_fd(shared))
163            }
164            Err(e) => Err(e),
165        }
166    }
167
168    /// Set value for the `SO_REUSEADDR` option on this socket.
169    #[allow(unused_variables)]
170    pub fn set_reuse_address(&self, reuse: bool) -> io::Result<()> {
171        #[cfg(unix)]
172        let r = {
173            let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
174            let r = socket.set_reuse_address(reuse);
175            let _ = socket.into_raw_fd();
176            r
177        };
178        #[cfg(windows)]
179        let r = {
180            let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
181            let _ = socket.into_raw_socket();
182            Ok(())
183        };
184        r
185    }
186
187    /// Set value for the `SO_REUSEPORT` option on this socket.
188    #[allow(unused_variables)]
189    pub fn set_reuse_port(&self, reuse: bool) -> io::Result<()> {
190        #[cfg(unix)]
191        let r = {
192            let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
193            let r = socket.set_reuse_port(reuse);
194            let _ = socket.into_raw_fd();
195            r
196        };
197        #[cfg(windows)]
198        let r = {
199            let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
200            let _ = socket.into_raw_socket();
201            Ok(())
202        };
203        r
204    }
205
206    /// Wait for read readiness.
207    /// Note: Do not use it before every io. It is different from other runtimes!
208    ///
209    /// Everytime call to this method may pay a syscall cost.
210    /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
211    /// inner readiness state; if !relaxed, it will call syscall poll after that.
212    ///
213    /// If relaxed, on legacy driver it may return false positive result.
214    /// If you want to do io by your own, you must maintain io readiness and wait
215    /// for io ready with relaxed=false.
216    pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
217        let op = Op::poll_read(&self.fd, relaxed).unwrap();
218        op.wait().await
219    }
220
221    /// Wait for write readiness.
222    /// Note: Do not use it before every io. It is different from other runtimes!
223    ///
224    /// Everytime call to this method may pay a syscall cost.
225    /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
226    /// inner readiness state; if !relaxed, it will call syscall poll after that.
227    ///
228    /// If relaxed, on legacy driver it may return false positive result.
229    /// If you want to do io by your own, you must maintain io readiness and wait
230    /// for io ready with relaxed=false.
231    pub async fn writable(&self, relaxed: bool) -> io::Result<()> {
232        let op = Op::poll_write(&self.fd, relaxed).unwrap();
233        op.wait().await
234    }
235}
236
/// Exposes the raw file descriptor of the socket on Unix platforms.
#[cfg(unix)]
impl AsRawFd for UdpSocket {
    /// Returns the descriptor reported by the inner `SharedFd`.
    fn as_raw_fd(&self) -> std::os::fd::RawFd {
        self.fd.raw_fd()
    }
}
243
/// Exposes the raw socket handle of the socket on Windows.
#[cfg(windows)]
impl AsRawSocket for UdpSocket {
    /// Returns the socket handle reported by the inner `SharedFd`.
    fn as_raw_socket(&self) -> RawSocket {
        self.fd.raw_socket()
    }
}
250
251/// Cancelable related methods
252impl UdpSocket {
253    /// Receives a single datagram message on the socket. On success, returns the number
254    /// of bytes read and the origin.
255    pub async fn cancelable_recv_from<T: IoBufMut>(
256        &self,
257        buf: T,
258        c: CancelHandle,
259    ) -> crate::BufResult<(usize, SocketAddr), T> {
260        if c.canceled() {
261            return (Err(operation_canceled()), buf);
262        }
263
264        let op = Op::recv_msg(self.fd.clone(), buf).unwrap();
265        let _guard = c.associate_op(op.op_canceller());
266        op.wait().await
267    }
268
269    /// Sends data on the socket to the given address. On success, returns the
270    /// number of bytes written.
271    pub async fn cancelable_send_to<T: IoBuf>(
272        &self,
273        buf: T,
274        socket_addr: SocketAddr,
275        c: CancelHandle,
276    ) -> crate::BufResult<usize, T> {
277        if c.canceled() {
278            return (Err(operation_canceled()), buf);
279        }
280
281        let op = Op::send_msg(self.fd.clone(), buf, Some(socket_addr)).unwrap();
282        let _guard = c.associate_op(op.op_canceller());
283        op.wait().await
284    }
285
286    /// Sends data on the socket to the remote address to which it is connected.
287    pub async fn cancelable_send<T: IoBuf>(
288        &self,
289        buf: T,
290        c: CancelHandle,
291    ) -> crate::BufResult<usize, T> {
292        if c.canceled() {
293            return (Err(operation_canceled()), buf);
294        }
295
296        let op = Op::send_msg(self.fd.clone(), buf, None).unwrap();
297        let _guard = c.associate_op(op.op_canceller());
298        op.wait().await
299    }
300
301    /// Receives a single datagram message on the socket from the remote address to
302    /// which it is connected. On success, returns the number of bytes read.
303    pub async fn cancelable_recv<T: IoBufMut>(
304        &self,
305        buf: T,
306        c: CancelHandle,
307    ) -> crate::BufResult<usize, T> {
308        if c.canceled() {
309            return (Err(operation_canceled()), buf);
310        }
311
312        let op = Op::recv(self.fd.clone(), buf).unwrap();
313        let _guard = c.associate_op(op.op_canceller());
314        op.result().await
315    }
316}