monoio/net/unix/
listener.rs

1use std::{
2    io,
3    mem::{ManuallyDrop, MaybeUninit},
4    os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
5    path::Path,
6};
7
8use super::{socket_addr::SocketAddr, UnixStream};
9use crate::{
10    driver::{op::Op, shared_fd::SharedFd},
11    io::{stream::Stream, CancelHandle},
12    net::ListenerOpts,
13};
14
15/// UnixListener
16pub struct UnixListener {
17    fd: SharedFd,
18    sys_listener: Option<std::os::unix::net::UnixListener>,
19}
20
21impl UnixListener {
22    pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
23        let sys_listener = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd.raw_fd()) };
24        Self {
25            fd,
26            sys_listener: Some(sys_listener),
27        }
28    }
29
30    /// Creates a new `UnixListener` bound to the specified socket with custom
31    /// config.
32    pub fn bind_with_config<P: AsRef<Path>>(
33        path: P,
34        config: &ListenerOpts,
35    ) -> io::Result<UnixListener> {
36        let sys_listener =
37            socket2::Socket::new(socket2::Domain::UNIX, socket2::Type::STREAM, None)?;
38        let addr = socket2::SockAddr::unix(path)?;
39
40        if config.reuse_port {
41            // TODO: properly handle this. Warn?
42            // this seems to cause an error on current (>6.x) kernels:
43            // sys_listener.set_reuse_port(true)?;
44        }
45        if config.reuse_addr {
46            sys_listener.set_reuse_address(true)?;
47        }
48        if let Some(send_buf_size) = config.send_buf_size {
49            sys_listener.set_send_buffer_size(send_buf_size)?;
50        }
51        if let Some(recv_buf_size) = config.recv_buf_size {
52            sys_listener.set_recv_buffer_size(recv_buf_size)?;
53        }
54
55        sys_listener.bind(&addr)?;
56        sys_listener.listen(config.backlog)?;
57
58        let fd = SharedFd::new::<false>(sys_listener.into_raw_fd())?;
59
60        Ok(Self::from_shared_fd(fd))
61    }
62
63    /// Creates a new `UnixListener` bound to the specified socket with default
64    /// config.
65    pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
66        Self::bind_with_config(path, &ListenerOpts::default())
67    }
68
69    /// Accept
70    pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
71        let op = Op::accept(&self.fd)?;
72
73        // Await the completion of the event
74        let completion = op.await;
75
76        // Convert fd
77        let fd = completion.meta.result?;
78
79        // Construct stream
80        let stream = UnixStream::from_shared_fd(SharedFd::new::<false>(fd.into_inner() as _)?);
81
82        // Construct SocketAddr
83        let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
84        let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
85        let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
86        let raw_addr_len = completion.data.addr.1;
87
88        let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);
89
90        Ok((stream, addr))
91    }
92
93    /// Cancelable accept
94    pub async fn cancelable_accept(&self, c: CancelHandle) -> io::Result<(UnixStream, SocketAddr)> {
95        use crate::io::operation_canceled;
96
97        if c.canceled() {
98            return Err(operation_canceled());
99        }
100        let op = Op::accept(&self.fd)?;
101        let _guard = c.associate_op(op.op_canceller());
102
103        // Await the completion of the event
104        let completion = op.await;
105
106        // Convert fd
107        let fd = completion.meta.result?;
108
109        // Construct stream
110        let stream = UnixStream::from_shared_fd(SharedFd::new::<false>(fd.into_inner() as _)?);
111
112        // Construct SocketAddr
113        let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
114        let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
115        let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
116        let raw_addr_len = completion.data.addr.1;
117
118        let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);
119
120        Ok((stream, addr))
121    }
122
123    /// Wait for read readiness.
124    /// Note: Do not use it before every io. It is different from other runtimes!
125    ///
126    /// Everytime call to this method may pay a syscall cost.
127    /// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
128    /// inner readiness state; if !relaxed, it will call syscall poll after that.
129    ///
130    /// If relaxed, on legacy driver it may return false positive result.
131    /// If you want to do io by your own, you must maintain io readiness and wait
132    /// for io ready with relaxed=false.
133    pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
134        let op = Op::poll_read(&self.fd, relaxed).unwrap();
135        op.wait().await
136    }
137
138    /// Creates new `UnixListener` from a `std::os::unix::net::UnixListener`.
139    pub fn from_std(sys_listener: std::os::unix::net::UnixListener) -> io::Result<Self> {
140        match SharedFd::new::<false>(sys_listener.as_raw_fd()) {
141            Ok(shared) => Ok(Self {
142                fd: shared,
143                sys_listener: Some(sys_listener),
144            }),
145            Err(e) => Err(e),
146        }
147    }
148}
149
150impl Stream for UnixListener {
151    type Item = io::Result<(UnixStream, SocketAddr)>;
152
153    #[inline]
154    async fn next(&mut self) -> Option<Self::Item> {
155        Some(self.accept().await)
156    }
157}
158
159impl std::fmt::Debug for UnixListener {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        f.debug_struct("UnixListener")
162            .field("fd", &self.fd)
163            .finish()
164    }
165}
166
167impl IntoRawFd for UnixListener {
168    #[inline]
169    fn into_raw_fd(self) -> RawFd {
170        let mut this = ManuallyDrop::new(self);
171        #[allow(invalid_value)]
172        #[allow(clippy::uninit_assumed_init)]
173        let (mut fd, mut sys_listener) = unsafe {
174            (
175                MaybeUninit::uninit().assume_init(),
176                MaybeUninit::uninit().assume_init(),
177            )
178        };
179        std::mem::swap(&mut this.fd, &mut fd);
180        std::mem::swap(&mut this.sys_listener, &mut sys_listener);
181        let _ = sys_listener.take().unwrap().into_raw_fd();
182
183        fd.try_unwrap()
184            .expect("unexpected multiple reference to rawfd")
185    }
186}
187
188impl AsRawFd for UnixListener {
189    #[inline]
190    fn as_raw_fd(&self) -> RawFd {
191        self.fd.raw_fd()
192    }
193}
194
195impl Drop for UnixListener {
196    #[inline]
197    fn drop(&mut self) {
198        let _ = self.sys_listener.take().unwrap().into_raw_fd();
199    }
200}