monoio/driver/
op.rs

1use std::{
2    future::Future,
3    io,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use crate::driver;
9
10pub(crate) mod close;
11pub(crate) mod read;
12pub(crate) mod write;
13
14mod accept;
15mod connect;
16mod fsync;
17mod open;
18mod poll;
19mod recv;
20mod send;
21#[cfg(unix)]
22mod statx;
23
24#[cfg(feature = "mkdirat")]
25mod mkdir;
26
27#[cfg(feature = "unlinkat")]
28mod unlink;
29
30#[cfg(feature = "renameat")]
31mod rename;
32
33#[cfg(all(unix, feature = "symlinkat"))]
34mod symlink;
35
36#[cfg(all(target_os = "linux", feature = "splice"))]
37mod splice;
38
39/// In-flight operation
40pub(crate) struct Op<T: 'static + OpAble> {
41    // Driver running the operation
42    pub(super) driver: driver::Inner,
43
44    // Operation index in the slab(useless for legacy)
45    pub(super) index: usize,
46
47    // Per-operation data
48    pub(super) data: Option<T>,
49}
50
51/// Operation completion. Returns stored state with the result of the operation.
52#[derive(Debug)]
53pub(crate) struct Completion<T> {
54    pub(crate) data: T,
55    pub(crate) meta: CompletionMeta,
56}
57
58/// Operation completion meta info.
59#[derive(Debug)]
60pub(crate) struct CompletionMeta {
61    pub(crate) result: io::Result<MaybeFd>,
62    #[allow(unused)]
63    pub(crate) flags: u32,
64}
65
66/// MaybeFd is a wrapper for fd or a normal number. If it is marked as fd, it will close the fd when
67/// dropped.
68/// Use `into_inner` to take the inner fd or number and skip the drop.
69///
70/// This wrapper is designed to be used in the syscall return value. It can prevent fd leak when the
71/// operation is cancelled.
72#[derive(Debug)]
73pub(crate) struct MaybeFd {
74    is_fd: bool,
75    fd: u32,
76}
77
78impl MaybeFd {
79    #[inline]
80    pub(crate) unsafe fn new_result(fdr: io::Result<u32>, is_fd: bool) -> io::Result<Self> {
81        fdr.map(|fd| Self { is_fd, fd })
82    }
83
84    #[inline]
85    pub(crate) unsafe fn new_fd_result(fdr: io::Result<u32>) -> io::Result<Self> {
86        fdr.map(|fd| Self { is_fd: true, fd })
87    }
88
89    #[inline]
90    pub(crate) fn new_non_fd_result(fdr: io::Result<u32>) -> io::Result<Self> {
91        fdr.map(|fd| Self { is_fd: false, fd })
92    }
93
94    #[inline]
95    pub(crate) const unsafe fn new_fd(fd: u32) -> Self {
96        Self { is_fd: true, fd }
97    }
98
99    #[inline]
100    pub(crate) const fn new_non_fd(fd: u32) -> Self {
101        Self { is_fd: false, fd }
102    }
103
104    #[inline]
105    pub(crate) const fn into_inner(self) -> u32 {
106        let fd = self.fd;
107        std::mem::forget(self);
108        fd
109    }
110
111    #[inline]
112    pub(crate) const fn zero() -> Self {
113        Self {
114            is_fd: false,
115            fd: 0,
116        }
117    }
118
119    #[inline]
120    pub(crate) fn fd(&self) -> u32 {
121        self.fd
122    }
123}
124
125impl Drop for MaybeFd {
126    fn drop(&mut self) {
127        // The fd close only executed when:
128        // 1. the operation is cancelled
129        // 2. the cancellation failed
130        // 3. the returned result is a fd
131        // So this is a relatively cold path. For simplicity, we just do a close syscall here
132        // instead of pushing close op.
133        if self.is_fd {
134            unsafe {
135                libc::close(self.fd as libc::c_int);
136            }
137        }
138    }
139}
140
141pub(crate) trait OpAble {
142    #[cfg(all(target_os = "linux", feature = "iouring"))]
143    const RET_IS_FD: bool = false;
144    #[cfg(all(target_os = "linux", feature = "iouring"))]
145    const SKIP_CANCEL: bool = false;
146    #[cfg(all(target_os = "linux", feature = "iouring"))]
147    fn uring_op(&mut self) -> io_uring::squeue::Entry;
148
149    #[cfg(any(feature = "legacy", feature = "poll-io"))]
150    fn legacy_interest(&self) -> Option<(super::ready::Direction, usize)>;
151    #[cfg(any(feature = "legacy", feature = "poll-io"))]
152    fn legacy_call(&mut self) -> io::Result<MaybeFd>;
153}
154
155/// If legacy is enabled and iouring is not, we can expose io interface in a poll-like way.
156/// This can provide better compatibility for crates programmed in poll-like way.
157#[allow(dead_code)]
158#[cfg(any(feature = "legacy", feature = "poll-io"))]
159pub(crate) trait PollLegacy {
160    #[cfg(feature = "legacy")]
161    fn poll_legacy(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta>;
162    #[cfg(feature = "poll-io")]
163    fn poll_io(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta>;
164}
165
166#[cfg(any(feature = "legacy", feature = "poll-io"))]
167impl<T: OpAble> PollLegacy for T {
168    #[cfg(feature = "legacy")]
169    #[inline]
170    fn poll_legacy(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta> {
171        #[cfg(all(feature = "iouring", feature = "tokio-compat"))]
172        unsafe {
173            extern "C" {
174                #[link_name = "tokio-compat can only be enabled when legacy feature is enabled and \
175                               iouring is not"]
176                fn trigger() -> !;
177            }
178            trigger()
179        }
180
181        #[cfg(not(all(feature = "iouring", feature = "tokio-compat")))]
182        driver::CURRENT.with(|this| this.poll_op(self, 0, _cx))
183    }
184
185    #[cfg(feature = "poll-io")]
186    #[inline]
187    fn poll_io(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<CompletionMeta> {
188        driver::CURRENT.with(|this| this.poll_legacy_op(self, cx))
189    }
190}
191
192impl<T: OpAble> Op<T> {
193    /// Submit an operation to uring.
194    ///
195    /// `state` is stored during the operation tracking any state submitted to
196    /// the kernel.
197    pub(super) fn submit_with(data: T) -> io::Result<Op<T>> {
198        driver::CURRENT.with(|this| this.submit_with(data))
199    }
200
201    /// Try submitting an operation to uring
202    #[allow(unused)]
203    pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>> {
204        if driver::CURRENT.is_set() {
205            Op::submit_with(data)
206        } else {
207            Err(io::ErrorKind::Other.into())
208        }
209    }
210
211    pub(crate) fn op_canceller(&self) -> OpCanceller {
212        #[cfg(feature = "legacy")]
213        if is_legacy() {
214            return if let Some((dir, id)) = self.data.as_ref().unwrap().legacy_interest() {
215                OpCanceller {
216                    index: id,
217                    direction: Some(dir),
218                }
219            } else {
220                OpCanceller {
221                    index: 0,
222                    direction: None,
223                }
224            };
225        }
226        OpCanceller {
227            index: self.index,
228            #[cfg(feature = "legacy")]
229            direction: None,
230        }
231    }
232}
233
234impl<T> Future for Op<T>
235where
236    T: Unpin + OpAble + 'static,
237{
238    type Output = Completion<T>;
239
240    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241        let me = &mut *self;
242        let data_mut = me.data.as_mut().expect("unexpected operation state");
243        let meta = ready!(me.driver.poll_op::<T>(data_mut, me.index, cx));
244
245        me.index = usize::MAX;
246        let data = me.data.take().expect("unexpected operation state");
247        Poll::Ready(Completion { data, meta })
248    }
249}
250
251#[cfg(all(target_os = "linux", feature = "iouring"))]
252impl<T: OpAble> Drop for Op<T> {
253    #[inline]
254    fn drop(&mut self) {
255        self.driver
256            .drop_op(self.index, &mut self.data, T::SKIP_CANCEL);
257    }
258}
259
260/// Check if current driver is legacy.
261#[allow(unused)]
262#[cfg(not(target_os = "linux"))]
263#[inline]
264pub const fn is_legacy() -> bool {
265    true
266}
267
268/// Check if current driver is legacy.
269#[cfg(target_os = "linux")]
270#[inline]
271pub fn is_legacy() -> bool {
272    super::CURRENT.with(|inner| inner.is_legacy())
273}
274
275#[derive(Debug, Eq, PartialEq, Clone, Hash)]
276pub(crate) struct OpCanceller {
277    pub(super) index: usize,
278    #[cfg(feature = "legacy")]
279    pub(super) direction: Option<super::ready::Direction>,
280}
281
282impl OpCanceller {
283    pub(crate) unsafe fn cancel(&self) {
284        super::CURRENT.with(|inner| inner.cancel_op(self))
285    }
286}