1#[cfg(unix)]
4use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd};
5#[cfg(windows)]
6use std::os::windows::prelude::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
7use std::{
8 io,
9 net::{SocketAddr, ToSocketAddrs},
10};
11
12use crate::{
13 buf::{IoBuf, IoBufMut},
14 driver::{op::Op, shared_fd::SharedFd},
15 io::{operation_canceled, CancelHandle, Split},
16};
17
18#[derive(Debug)]
28pub struct UdpSocket {
29 fd: SharedFd,
30}
31
32unsafe impl Split for UdpSocket {}
34
35impl UdpSocket {
36 pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
37 Self { fd }
38 }
39
40 #[cfg(feature = "legacy")]
41 fn set_non_blocking(_socket: &socket2::Socket) -> io::Result<()> {
42 crate::driver::CURRENT.with(|x| match x {
43 #[cfg(all(target_os = "linux", feature = "iouring"))]
45 crate::driver::Inner::Uring(_) => Ok(()),
46 crate::driver::Inner::Legacy(_) => _socket.set_nonblocking(true),
47 })
48 }
49
50 pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
52 let addr = addr
53 .to_socket_addrs()?
54 .next()
55 .ok_or_else(|| io::Error::other("empty address"))?;
56 let domain = if addr.is_ipv6() {
57 socket2::Domain::IPV6
58 } else {
59 socket2::Domain::IPV4
60 };
61 let socket =
62 socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
63 #[cfg(feature = "legacy")]
64 Self::set_non_blocking(&socket)?;
65
66 let addr = socket2::SockAddr::from(addr);
67 socket.bind(&addr)?;
68
69 #[cfg(unix)]
70 let fd = socket.into_raw_fd();
71 #[cfg(windows)]
72 let fd = socket.into_raw_socket();
73
74 Ok(Self::from_shared_fd(SharedFd::new::<false>(fd)?))
75 }
76
77 pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> {
80 let op = Op::recv_msg(self.fd.clone(), buf).unwrap();
81 op.wait().await
82 }
83
84 pub async fn send_to<T: IoBuf>(
87 &self,
88 buf: T,
89 socket_addr: SocketAddr,
90 ) -> crate::BufResult<usize, T> {
91 let op = Op::send_msg(self.fd.clone(), buf, Some(socket_addr)).unwrap();
92 op.wait().await
93 }
94
95 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
97 #[cfg(unix)]
98 let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
99 #[cfg(windows)]
100 let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
101 let addr = socket.peer_addr();
102 #[cfg(unix)]
103 let _ = socket.into_raw_fd();
104 #[cfg(windows)]
105 let _ = socket.into_raw_socket();
106 addr?
107 .as_socket()
108 .ok_or_else(|| io::ErrorKind::InvalidInput.into())
109 }
110
111 pub fn local_addr(&self) -> io::Result<SocketAddr> {
113 #[cfg(unix)]
114 let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
115 #[cfg(windows)]
116 let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
117 let addr = socket.local_addr();
118 #[cfg(unix)]
119 let _ = socket.into_raw_fd();
120 #[cfg(windows)]
121 let _ = socket.into_raw_socket();
122 addr?
123 .as_socket()
124 .ok_or_else(|| io::ErrorKind::InvalidInput.into())
125 }
126
127 pub async fn connect(&self, socket_addr: SocketAddr) -> io::Result<()> {
131 let op = Op::connect(self.fd.clone(), socket_addr, false)?;
132 let completion = op.await;
133 completion.meta.result?;
134 Ok(())
135 }
136
137 pub async fn send<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
139 let op = Op::send_msg(self.fd.clone(), buf, None).unwrap();
140 op.wait().await
141 }
142
143 pub async fn recv<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
146 let op = Op::recv(self.fd.clone(), buf).unwrap();
147 op.result().await
148 }
149
150 pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
152 #[cfg(unix)]
153 let fd = socket.as_raw_fd();
154 #[cfg(windows)]
155 let fd = socket.as_raw_socket();
156 match SharedFd::new::<false>(fd) {
157 Ok(shared) => {
158 #[cfg(unix)]
159 let _ = socket.into_raw_fd();
160 #[cfg(windows)]
161 let _ = socket.into_raw_socket();
162 Ok(Self::from_shared_fd(shared))
163 }
164 Err(e) => Err(e),
165 }
166 }
167
168 #[allow(unused_variables)]
170 pub fn set_reuse_address(&self, reuse: bool) -> io::Result<()> {
171 #[cfg(unix)]
172 let r = {
173 let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
174 let r = socket.set_reuse_address(reuse);
175 let _ = socket.into_raw_fd();
176 r
177 };
178 #[cfg(windows)]
179 let r = {
180 let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
181 let _ = socket.into_raw_socket();
182 Ok(())
183 };
184 r
185 }
186
187 #[allow(unused_variables)]
189 pub fn set_reuse_port(&self, reuse: bool) -> io::Result<()> {
190 #[cfg(unix)]
191 let r = {
192 let socket = unsafe { socket2::Socket::from_raw_fd(self.fd.as_raw_fd()) };
193 let r = socket.set_reuse_port(reuse);
194 let _ = socket.into_raw_fd();
195 r
196 };
197 #[cfg(windows)]
198 let r = {
199 let socket = unsafe { socket2::Socket::from_raw_socket(self.fd.as_raw_socket()) };
200 let _ = socket.into_raw_socket();
201 Ok(())
202 };
203 r
204 }
205
206 pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
217 let op = Op::poll_read(&self.fd, relaxed).unwrap();
218 op.wait().await
219 }
220
221 pub async fn writable(&self, relaxed: bool) -> io::Result<()> {
232 let op = Op::poll_write(&self.fd, relaxed).unwrap();
233 op.wait().await
234 }
235}
236
237#[cfg(unix)]
238impl AsRawFd for UdpSocket {
239 fn as_raw_fd(&self) -> std::os::fd::RawFd {
240 self.fd.raw_fd()
241 }
242}
243
244#[cfg(windows)]
245impl AsRawSocket for UdpSocket {
246 fn as_raw_socket(&self) -> RawSocket {
247 self.fd.raw_socket()
248 }
249}
250
251impl UdpSocket {
253 pub async fn cancelable_recv_from<T: IoBufMut>(
256 &self,
257 buf: T,
258 c: CancelHandle,
259 ) -> crate::BufResult<(usize, SocketAddr), T> {
260 if c.canceled() {
261 return (Err(operation_canceled()), buf);
262 }
263
264 let op = Op::recv_msg(self.fd.clone(), buf).unwrap();
265 let _guard = c.associate_op(op.op_canceller());
266 op.wait().await
267 }
268
269 pub async fn cancelable_send_to<T: IoBuf>(
272 &self,
273 buf: T,
274 socket_addr: SocketAddr,
275 c: CancelHandle,
276 ) -> crate::BufResult<usize, T> {
277 if c.canceled() {
278 return (Err(operation_canceled()), buf);
279 }
280
281 let op = Op::send_msg(self.fd.clone(), buf, Some(socket_addr)).unwrap();
282 let _guard = c.associate_op(op.op_canceller());
283 op.wait().await
284 }
285
286 pub async fn cancelable_send<T: IoBuf>(
288 &self,
289 buf: T,
290 c: CancelHandle,
291 ) -> crate::BufResult<usize, T> {
292 if c.canceled() {
293 return (Err(operation_canceled()), buf);
294 }
295
296 let op = Op::send_msg(self.fd.clone(), buf, None).unwrap();
297 let _guard = c.associate_op(op.op_canceller());
298 op.wait().await
299 }
300
301 pub async fn cancelable_recv<T: IoBufMut>(
304 &self,
305 buf: T,
306 c: CancelHandle,
307 ) -> crate::BufResult<usize, T> {
308 if c.canceled() {
309 return (Err(operation_canceled()), buf);
310 }
311
312 let op = Op::recv(self.fd.clone(), buf).unwrap();
313 let _guard = c.associate_op(op.op_canceller());
314 op.result().await
315 }
316}