monoio/net/unix/
listener.rs1use std::{
2 io,
3 mem::{ManuallyDrop, MaybeUninit},
4 os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
5 path::Path,
6};
7
8use super::{socket_addr::SocketAddr, UnixStream};
9use crate::{
10 driver::{op::Op, shared_fd::SharedFd},
11 io::{stream::Stream, CancelHandle},
12 net::ListenerOpts,
13};
14
/// A Unix domain socket listener.
///
/// The same raw descriptor is tracked through two handles: `fd` is the
/// handle passed to the driver operations (`Op::accept`, `Op::poll_read`),
/// while `sys_listener` is a standard-library wrapper over that descriptor.
pub struct UnixListener {
    // Shared descriptor handle used for every driver operation.
    fd: SharedFd,
    // Standard-library view of the same descriptor. It is released with
    // `into_raw_fd` (not dropped while `Some`) by the `Drop` impl.
    sys_listener: Option<std::os::unix::net::UnixListener>,
}
20
21impl UnixListener {
22 pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
23 let sys_listener = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd.raw_fd()) };
24 Self {
25 fd,
26 sys_listener: Some(sys_listener),
27 }
28 }
29
30 pub fn bind_with_config<P: AsRef<Path>>(
33 path: P,
34 config: &ListenerOpts,
35 ) -> io::Result<UnixListener> {
36 let sys_listener =
37 socket2::Socket::new(socket2::Domain::UNIX, socket2::Type::STREAM, None)?;
38 let addr = socket2::SockAddr::unix(path)?;
39
40 if config.reuse_port {
41 }
45 if config.reuse_addr {
46 sys_listener.set_reuse_address(true)?;
47 }
48 if let Some(send_buf_size) = config.send_buf_size {
49 sys_listener.set_send_buffer_size(send_buf_size)?;
50 }
51 if let Some(recv_buf_size) = config.recv_buf_size {
52 sys_listener.set_recv_buffer_size(recv_buf_size)?;
53 }
54
55 sys_listener.bind(&addr)?;
56 sys_listener.listen(config.backlog)?;
57
58 let fd = SharedFd::new::<false>(sys_listener.into_raw_fd())?;
59
60 Ok(Self::from_shared_fd(fd))
61 }
62
63 pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
66 Self::bind_with_config(path, &ListenerOpts::default())
67 }
68
69 pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
71 let op = Op::accept(&self.fd)?;
72
73 let completion = op.await;
75
76 let fd = completion.meta.result?;
78
79 let stream = UnixStream::from_shared_fd(SharedFd::new::<false>(fd.into_inner() as _)?);
81
82 let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
84 let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
85 let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
86 let raw_addr_len = completion.data.addr.1;
87
88 let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);
89
90 Ok((stream, addr))
91 }
92
93 pub async fn cancelable_accept(&self, c: CancelHandle) -> io::Result<(UnixStream, SocketAddr)> {
95 use crate::io::operation_canceled;
96
97 if c.canceled() {
98 return Err(operation_canceled());
99 }
100 let op = Op::accept(&self.fd)?;
101 let _guard = c.associate_op(op.op_canceller());
102
103 let completion = op.await;
105
106 let fd = completion.meta.result?;
108
109 let stream = UnixStream::from_shared_fd(SharedFd::new::<false>(fd.into_inner() as _)?);
111
112 let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
114 let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
115 let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
116 let raw_addr_len = completion.data.addr.1;
117
118 let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);
119
120 Ok((stream, addr))
121 }
122
123 pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
134 let op = Op::poll_read(&self.fd, relaxed).unwrap();
135 op.wait().await
136 }
137
138 pub fn from_std(sys_listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
140 match SharedFd::new::<false>(sys_listener.as_raw_fd()) {
141 Ok(shared) => Ok(Self {
142 fd: shared,
143 sys_listener: Some(sys_listener),
144 }),
145 Err(e) => Err(e),
146 }
147 }
148}
149
150impl Stream for UnixListener {
151 type Item = io::Result<(UnixStream, SocketAddr)>;
152
153 #[inline]
154 async fn next(&mut self) -> Option<Self::Item> {
155 Some(self.accept().await)
156 }
157}
158
159impl std::fmt::Debug for UnixListener {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 f.debug_struct("UnixListener")
162 .field("fd", &self.fd)
163 .finish()
164 }
165}
166
167impl IntoRawFd for UnixListener {
168 #[inline]
169 fn into_raw_fd(self) -> RawFd {
170 let mut this = ManuallyDrop::new(self);
171 #[allow(invalid_value)]
172 #[allow(clippy::uninit_assumed_init)]
173 let (mut fd, mut sys_listener) = unsafe {
174 (
175 MaybeUninit::uninit().assume_init(),
176 MaybeUninit::uninit().assume_init(),
177 )
178 };
179 std::mem::swap(&mut this.fd, &mut fd);
180 std::mem::swap(&mut this.sys_listener, &mut sys_listener);
181 let _ = sys_listener.take().unwrap().into_raw_fd();
182
183 fd.try_unwrap()
184 .expect("unexpected multiple reference to rawfd")
185 }
186}
187
188impl AsRawFd for UnixListener {
189 #[inline]
190 fn as_raw_fd(&self) -> RawFd {
191 self.fd.raw_fd()
192 }
193}
194
195impl Drop for UnixListener {
196 #[inline]
197 fn drop(&mut self) {
198 let _ = self.sys_listener.take().unwrap().into_raw_fd();
199 }
200}