monoio/driver/uring/
mod.rs

1//! Monoio Uring Driver.
2
3#[cfg(feature = "sync")]
4use std::sync::Arc;
5use std::{
6    cell::UnsafeCell,
7    io,
8    mem::ManuallyDrop,
9    os::unix::prelude::{AsRawFd, RawFd},
10    rc::Rc,
11    task::{Context, Poll},
12    time::Duration,
13};
14
15#[cfg(feature = "sync")]
16use crossbeam_queue::SegQueue;
17use io_uring::{cqueue, opcode, types::Timespec, IoUring};
18use lifecycle::MaybeFdLifecycle;
19
20use super::{
21    op::{CompletionMeta, Op, OpAble},
22    util::timespec,
23    Driver, Inner, CURRENT,
24};
25use crate::utils::slab::Slab;
26
27mod lifecycle;
28#[cfg(feature = "sync")]
29mod waker;
30#[cfg(feature = "sync")]
31pub(crate) use waker::UnparkHandle;
32
// Reserved `user_data` values at the top of the `u64` range. Completions
// carrying one of these belong to driver-internal SQEs, not to slab-indexed
// user operations (see `UringInner::tick` for the dispatch).
#[allow(unused)]
pub(crate) const CANCEL_USERDATA: u64 = u64::MAX;
// Completion of the timeout SQE pushed by `IoUringDriver::install_timeout`.
pub(crate) const TIMEOUT_USERDATA: u64 = u64::MAX - 1;
// Completion of the eventfd read pushed by `IoUringDriver::install_eventfd`.
#[allow(unused)]
pub(crate) const EVENTFD_USERDATA: u64 = u64::MAX - 2;
// Completion of the PollAdd on the poll-io reactor fd
// (`IoUringDriver::install_poller`).
#[cfg(feature = "poll-io")]
pub(crate) const POLLER_USERDATA: u64 = u64::MAX - 3;

// Smallest reserved user_data: any completion with user_data >= this value is
// driver-internal bookkeeping and carries no slab index.
// NOTE(review): "REVERSED" looks like a typo for "RESERVED"; kept as-is
// because the name is referenced from other call sites.
pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 3;
42
/// Driver with uring.
pub struct IoUringDriver {
    // Shared driver state; `Op`s hold clones of this `Rc` via `Inner::Uring`.
    inner: Rc<UnsafeCell<UringInner>>,

    // Used as timeout buffer
    // (leaked `Box<Timespec>` so a pending Timeout SQE can keep referencing
    // it; see `install_timeout` and `Drop`)
    timespec: *mut Timespec,

    // Used as read eventfd buffer
    // (leaked `Box<[u8; 8]>`; destination buffer of the eventfd read SQE)
    #[cfg(feature = "sync")]
    eventfd_read_dst: *mut u8,

    // Used for drop
    // (thread id captured at construction; used to unregister the unpark
    // handle and waker queue when the driver is dropped)
    #[cfg(feature = "sync")]
    thread_id: usize,
}
58
pub(crate) struct UringInner {
    /// In-flight operations
    ops: Ops,

    // Readiness-based reactor used by the "poll-io" feature; ticked when the
    // POLLER_USERDATA completion arrives.
    #[cfg(feature = "poll-io")]
    poll: super::poll::Poll,
    // Whether a PollAdd SQE for the poller fd is currently in the ring.
    #[cfg(feature = "poll-io")]
    poller_installed: bool,

    /// IoUring bindings
    // ManuallyDrop: dropped explicitly in `Drop for UringInner` after a final
    // best-effort submit.
    uring: ManuallyDrop<IoUring>,

    /// Shared waker
    // (eventfd-backed; written from other threads to interrupt a park)
    #[cfg(feature = "sync")]
    shared_waker: std::sync::Arc<waker::EventWaker>,

    // Mark if eventfd is in the ring
    // (the read SQE must be re-armed each time it completes)
    #[cfg(feature = "sync")]
    eventfd_installed: bool,

    // Waker receiver
    // (cross-thread queue of foreign wakers; drained in `inner_park`)
    #[cfg(feature = "sync")]
    waker_queue: Arc<SegQueue<std::task::Waker>>,

    // Uring support ext_arg
    // (kernel 5.11+ feature: submit_and_wait can take an inline timeout)
    ext_arg: bool,
}
86
// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
// NOTE(review): no `Drop` impl for `Ops` is visible in this file; the
// emptiness guarantee presumably lives in `Slab` itself — confirm.
struct Ops {
    slab: Slab<MaybeFdLifecycle>,
}
92
impl IoUringDriver {
    /// Default ring size (submission queue entries) used by `new`.
    const DEFAULT_ENTRIES: u32 = 1024;

    /// Build a driver with the default number of ring entries.
    pub(crate) fn new(b: &io_uring::Builder) -> io::Result<IoUringDriver> {
        Self::new_with_entries(b, Self::DEFAULT_ENTRIES)
    }

    /// Build a driver with `entries` ring entries (variant without the
    /// "sync" feature: no eventfd/cross-thread wakeup support is set up).
    #[cfg(not(feature = "sync"))]
    pub(crate) fn new_with_entries(
        urb: &io_uring::Builder,
        entries: u32,
    ) -> io::Result<IoUringDriver> {
        let uring = ManuallyDrop::new(urb.build(entries)?);

        let inner = Rc::new(UnsafeCell::new(UringInner {
            #[cfg(feature = "poll-io")]
            poll: super::poll::Poll::with_capacity(entries as usize)?,
            #[cfg(feature = "poll-io")]
            poller_installed: false,
            ops: Ops::new(),
            // ext_arg (kernel 5.11+) lets `inner_park` hand the timeout to
            // submit_with_args instead of pushing a Timeout SQE.
            ext_arg: uring.params().is_feature_ext_arg(),
            uring,
        }));

        Ok(IoUringDriver {
            inner,
            // Leaked so the pointer stays valid while the ring may reference
            // it; see `install_timeout` and `Drop`.
            timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
        })
    }

    /// Build a driver with `entries` ring entries (variant with the "sync"
    /// feature: additionally creates the eventfd-backed waker and registers
    /// this thread's unpark handle and waker queue).
    #[cfg(feature = "sync")]
    pub(crate) fn new_with_entries(
        urb: &io_uring::Builder,
        entries: u32,
    ) -> io::Result<IoUringDriver> {
        use std::sync::Arc;

        use crossbeam_queue::SegQueue;

        let uring = ManuallyDrop::new(urb.build(entries)?);

        // Create eventfd and register it to the ring.
        let waker = {
            let fd = crate::syscall!(eventfd@RAW(0, libc::EFD_CLOEXEC))?;
            // SAFETY: `fd` is a freshly created eventfd we exclusively own.
            unsafe {
                use std::os::unix::io::FromRawFd;
                std::fs::File::from_raw_fd(fd)
            }
        };

        let waker_queue = Arc::new(SegQueue::new());

        let inner = Rc::new(UnsafeCell::new(UringInner {
            #[cfg(feature = "poll-io")]
            poller_installed: false,
            #[cfg(feature = "poll-io")]
            poll: super::poll::Poll::with_capacity(entries as usize)?,
            ops: Ops::new(),
            ext_arg: uring.params().is_feature_ext_arg(),
            uring,
            shared_waker: std::sync::Arc::new(waker::EventWaker::new(waker)),
            eventfd_installed: false,
            waker_queue: waker_queue.clone(),
        }));

        let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
        let driver = IoUringDriver {
            inner,
            // Both buffers are leaked so their addresses remain stable while
            // SQEs reference them; see `Drop`.
            timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
            eventfd_read_dst: Box::leak(Box::new([0_u8; 8])) as *mut u8,
            thread_id,
        };

        // Register unpark handle
        super::thread::register_unpark_handle(thread_id, driver.unpark().into());
        super::thread::register_waker_queue(thread_id, waker_queue);
        Ok(driver)
    }

    /// Number of in-flight operations currently tracked by the slab.
    #[allow(unused)]
    fn num_operations(&self) -> usize {
        let inner = self.inner.get();
        unsafe { (*inner).ops.slab.len() }
    }

    // Flush to make enough space
    // (submits to the kernel if fewer than `need` SQ slots are free, so the
    // following pushes cannot fail for lack of room)
    fn flush_space(inner: &mut UringInner, need: usize) -> io::Result<()> {
        let sq = inner.uring.submission();
        debug_assert!(sq.capacity() >= need);
        if sq.len() + need > sq.capacity() {
            drop(sq);
            inner.submit()?;
        }
        Ok(())
    }

    /// Push an eventfd read SQE so a cross-thread unpark can wake the ring.
    /// Caller must have ensured SQ space (see `flush_space`).
    #[cfg(feature = "sync")]
    fn install_eventfd(&self, inner: &mut UringInner, fd: RawFd) {
        let entry = opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8)
            .build()
            .user_data(EVENTFD_USERDATA);

        let mut sq = inner.uring.submission();
        // Push cannot fail here: space was reserved by the caller.
        let _ = unsafe { sq.push(&entry) };
        inner.eventfd_installed = true;
    }

    /// Push a PollAdd SQE for the poll-io reactor fd so its readiness wakes
    /// the ring. Caller must have ensured SQ space.
    #[cfg(feature = "poll-io")]
    fn install_poller(&self, inner: &mut UringInner, fd: RawFd) {
        let entry = opcode::PollAdd::new(io_uring::types::Fd(fd), libc::POLLIN as _)
            .build()
            .user_data(POLLER_USERDATA);

        let mut sq = inner.uring.submission();
        let _ = unsafe { sq.push(&entry) };
        inner.poller_installed = true;
    }

    /// Write `duration` into the leaked timespec buffer and push a Timeout
    /// SQE referencing it. Caller must have ensured SQ space.
    fn install_timeout(&self, inner: &mut UringInner, duration: Duration) {
        // SAFETY: `self.timespec` points at the leaked, always-valid buffer.
        unsafe {
            std::ptr::replace(self.timespec, timespec(duration));
        }
        let entry = opcode::Timeout::new(self.timespec)
            .build()
            .user_data(TIMEOUT_USERDATA);

        let mut sq = inner.uring.submission();
        let _ = unsafe { sq.push(&entry) };
    }

    /// Park the thread: submit pending SQEs and wait for at least one
    /// completion (optionally bounded by `timeout`), then drain the CQ.
    fn inner_park(&self, timeout: Option<Duration>) -> io::Result<()> {
        let inner = unsafe { &mut *self.inner.get() };

        #[allow(unused_mut)]
        let mut need_wait = true;

        #[cfg(feature = "sync")]
        {
            // Process foreign wakers
            while let Some(w) = inner.waker_queue.pop() {
                w.wake();
                need_wait = false;
            }

            // Set status as not awake if we are going to sleep
            if need_wait {
                inner
                    .shared_waker
                    .awake
                    .store(false, std::sync::atomic::Ordering::Release);
            }

            // Process foreign wakers left
            // (drained again after publishing the not-awake state so wakers
            // enqueued concurrently with the store above are not lost)
            while let Some(w) = inner.waker_queue.pop() {
                w.wake();
                need_wait = false;
            }
        }

        if need_wait {
            // Install timeout and eventfd for unpark if sync is enabled

            // 1. alloc spaces
            let mut space = 0;
            #[cfg(feature = "sync")]
            if !inner.eventfd_installed {
                space += 1;
            }
            #[cfg(feature = "poll-io")]
            if !inner.poller_installed {
                space += 1;
            }
            if timeout.is_some() {
                space += 1;
            }
            if space != 0 {
                Self::flush_space(inner, space)?;
            }

            // 2.1 install poller
            #[cfg(feature = "poll-io")]
            if !inner.poller_installed {
                self.install_poller(inner, inner.poll.as_raw_fd());
            }

            // 2.2 install eventfd and timeout
            #[cfg(feature = "sync")]
            if !inner.eventfd_installed {
                self.install_eventfd(inner, inner.shared_waker.as_raw_fd());
            }

            // 2.3 install timeout and submit_and_wait with timeout
            if let Some(duration) = timeout {
                match inner.ext_arg {
                    // Submit and Wait with timeout in a TimeoutOp way.
                    // Better compatibility(5.4+).
                    false => {
                        self.install_timeout(inner, duration);
                        inner.uring.submit_and_wait(1)?;
                    }
                    // Submit and Wait with enter args.
                    // Better performance(5.11+).
                    true => {
                        let timespec = timespec(duration);
                        let args = io_uring::types::SubmitArgs::new().timespec(&timespec);
                        if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) {
                            // ETIME just means the wait timed out — not an
                            // error for a bounded park.
                            if e.raw_os_error() != Some(libc::ETIME) {
                                return Err(e);
                            }
                        }
                    }
                }
            } else {
                // Submit and Wait without timeout
                inner.uring.submit_and_wait(1)?;
            }
        } else {
            // Submit only
            inner.uring.submit()?;
        }

        // Set status as awake
        #[cfg(feature = "sync")]
        inner
            .shared_waker
            .awake
            .store(true, std::sync::atomic::Ordering::Release);

        // Process CQ
        inner.tick()?;

        Ok(())
    }

    /// Register a mio source with the poll-io reactor; returns its token.
    #[cfg(feature = "poll-io")]
    #[inline]
    pub(crate) fn register_poll_io(
        this: &Rc<UnsafeCell<UringInner>>,
        source: &mut impl mio::event::Source,
        interest: mio::Interest,
    ) -> io::Result<usize> {
        let inner = unsafe { &mut *this.get() };
        inner.poll.register(source, interest)
    }

    /// Deregister a mio source previously registered via `register_poll_io`.
    #[cfg(feature = "poll-io")]
    #[inline]
    pub(crate) fn deregister_poll_io(
        this: &Rc<UnsafeCell<UringInner>>,
        source: &mut impl mio::event::Source,
        token: usize,
    ) -> io::Result<()> {
        let inner = unsafe { &mut *this.get() };
        inner.poll.deregister(source, token)
    }
}
349
350impl Driver for IoUringDriver {
351    /// Enter the driver context. This enables using uring types.
352    fn with<R>(&self, f: impl FnOnce() -> R) -> R {
353        // TODO(ihciah): remove clone
354        let inner = Inner::Uring(self.inner.clone());
355        CURRENT.set(&inner, f)
356    }
357
358    fn submit(&self) -> io::Result<()> {
359        let inner = unsafe { &mut *self.inner.get() };
360        inner.submit()?;
361        inner.tick()?;
362        Ok(())
363    }
364
365    fn park(&self) -> io::Result<()> {
366        self.inner_park(None)
367    }
368
369    fn park_timeout(&self, duration: Duration) -> io::Result<()> {
370        self.inner_park(Some(duration))
371    }
372
373    #[cfg(feature = "sync")]
374    type Unpark = waker::UnparkHandle;
375
376    #[cfg(feature = "sync")]
377    fn unpark(&self) -> Self::Unpark {
378        UringInner::unpark(&self.inner)
379    }
380}
381
impl UringInner {
    /// Drain the completion queue, dispatching each CQE either to internal
    /// bookkeeping (reserved user_data values) or to its slab entry.
    fn tick(&mut self) -> io::Result<()> {
        let cq = self.uring.completion();

        for cqe in cq {
            let index = cqe.user_data();
            match index {
                // The eventfd read finished; it must be re-armed before the
                // next sleep.
                #[cfg(feature = "sync")]
                EVENTFD_USERDATA => self.eventfd_installed = false,
                #[cfg(feature = "poll-io")]
                POLLER_USERDATA => {
                    self.poller_installed = false;
                    // The poller fd became readable: run a non-blocking tick
                    // of the readiness reactor.
                    self.poll.tick(Some(Duration::ZERO))?;
                }
                // Other reserved values (e.g. cancel) need no processing.
                _ if index >= MIN_REVERSED_USERDATA => (),
                // # Safety
                // Here we can make sure the result is valid.
                _ => unsafe { self.ops.complete(index as _, resultify(&cqe), cqe.flags()) },
            }
        }
        Ok(())
    }

    /// Submit pending SQEs, retrying after draining the CQ when the kernel
    /// reports it is busy (EAGAIN/EBUSY).
    fn submit(&mut self) -> io::Result<()> {
        loop {
            match self.uring.submit() {
                #[cfg(feature = "unstable")]
                Err(ref e)
                    if matches!(e.kind(), io::ErrorKind::Other | io::ErrorKind::ResourceBusy) =>
                {
                    self.tick()?;
                }
                #[cfg(not(feature = "unstable"))]
                Err(ref e)
                    if matches!(e.raw_os_error(), Some(libc::EAGAIN) | Some(libc::EBUSY)) =>
                {
                    // This error is constructed with io::Error::last_os_error():
                    // https://github.com/tokio-rs/io-uring/blob/01c83bbce965d4aaf93ebfaa08c3aa8b7b0f5335/src/sys/mod.rs#L32
                    // So we can use https://doc.rust-lang.org/nightly/std/io/struct.Error.html#method.raw_os_error
                    // to get the raw error code.
                    self.tick()?;
                }
                // Success, or any other error: propagate as-is.
                e => return e.map(|_| ()),
            }
        }
    }

    /// Allocate a slab slot for `data` and wrap it into an `Op`.
    fn new_op<T: OpAble>(data: T, inner: &mut UringInner, driver: Inner) -> Op<T> {
        Op {
            driver,
            index: inner.ops.insert(T::RET_IS_FD),
            data: Some(data),
        }
    }

    /// Create an operation from `data`, push its SQE onto the ring and
    /// return the `Op`. The actual syscall is deferred to `park`/`submit`.
    pub(crate) fn submit_with_data<T>(
        this: &Rc<UnsafeCell<UringInner>>,
        data: T,
    ) -> io::Result<Op<T>>
    where
        T: OpAble,
    {
        let inner = unsafe { &mut *this.get() };
        // If the submission queue is full, flush it to the kernel
        if inner.uring.submission().is_full() {
            inner.submit()?;
        }

        // Create the operation
        let mut op = Self::new_op(data, inner, Inner::Uring(this.clone()));

        // Configure the SQE
        // SAFETY: `data` was set to `Some` just above in `new_op`.
        let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() };
        let sqe = OpAble::uring_op(data_mut).user_data(op.index as _);

        {
            let mut sq = inner.uring.submission();

            // Push the new operation
            if unsafe { sq.push(&sqe).is_err() } {
                unimplemented!("when is this hit?");
            }
        }

        // Submit the new operation. At this point, the operation has been
        // pushed onto the queue and the tail pointer has been updated, so
        // the submission entry is visible to the kernel. If there is an
        // error here (probably EAGAIN), we still return the operation. A
        // future `io_uring_enter` will fully submit the event.

        // CHIHAI: We are not going to do syscall now. If we are waiting
        // for IO, we will submit on `park`.
        // let _ = inner.submit();
        Ok(op)
    }

    /// Poll the completion state of the operation stored at `index`.
    pub(crate) fn poll_op(
        this: &Rc<UnsafeCell<UringInner>>,
        index: usize,
        cx: &mut Context<'_>,
    ) -> Poll<CompletionMeta> {
        let inner = unsafe { &mut *this.get() };
        // SAFETY(review): assumes `index` is a live slab entry owned by the
        // polling `Op` — confirm against the `Op` lifecycle.
        let lifecycle = unsafe { inner.ops.slab.get(index).unwrap_unchecked() };
        lifecycle.poll_op(cx)
    }

    /// Drive `data` through the readiness-based (legacy) path instead of a
    /// uring SQE; used by the "poll-io" feature.
    #[cfg(feature = "poll-io")]
    pub(crate) fn poll_legacy_op<T: OpAble>(
        this: &Rc<UnsafeCell<Self>>,
        data: &mut T,
        cx: &mut Context<'_>,
    ) -> Poll<CompletionMeta> {
        let inner = unsafe { &mut *this.get() };
        let (direction, index) = match data.legacy_interest() {
            Some(x) => x,
            None => {
                // if there is no index provided, it means the action does not rely on fd
                // readiness. do syscall right now.
                return Poll::Ready(CompletionMeta {
                    result: OpAble::legacy_call(data),
                    flags: 0,
                });
            }
        };

        // wait io ready and do syscall
        inner
            .poll
            .poll_syscall(cx, index, direction, || OpAble::legacy_call(data))
    }

    /// Called when an `Op` is dropped before consuming its completion.
    /// Marks the lifecycle dropped; with "async-cancel", also pushes an
    /// AsyncCancel SQE for the still-inflight operation.
    pub(crate) fn drop_op<T: 'static>(
        this: &Rc<UnsafeCell<UringInner>>,
        index: usize,
        data: &mut Option<T>,
        _skip_cancel: bool,
    ) {
        let inner = unsafe { &mut *this.get() };
        if index == usize::MAX {
            // already finished
            return;
        }
        if let Some(lifecycle) = inner.ops.slab.get(index) {
            let _must_finished = lifecycle.drop_op(data);
            #[cfg(feature = "async-cancel")]
            if !_must_finished && !_skip_cancel {
                unsafe {
                    let cancel = opcode::AsyncCancel::new(index as u64)
                        .build()
                        .user_data(u64::MAX);

                    // Try push cancel, if failed, will submit and re-push.
                    if inner.uring.submission().push(&cancel).is_err() {
                        let _ = inner.submit();
                        let _ = inner.uring.submission().push(&cancel);
                    }
                }
            }
        }
    }

    /// Push an AsyncCancel SQE targeting the op at `index`.
    ///
    /// # Safety
    /// Caller must ensure exclusive access to the inner state (no other live
    /// mutable borrow through the `UnsafeCell`).
    pub(crate) unsafe fn cancel_op(this: &Rc<UnsafeCell<UringInner>>, index: usize) {
        let inner = &mut *this.get();
        let cancel = opcode::AsyncCancel::new(index as u64)
            .build()
            .user_data(u64::MAX);
        // If the SQ is full, flush once and retry (best-effort).
        if inner.uring.submission().push(&cancel).is_err() {
            let _ = inner.submit();
            let _ = inner.uring.submission().push(&cancel);
        }
    }

    /// Build an unpark handle holding a weak reference to the shared waker,
    /// so a dead driver does not keep the eventfd alive.
    #[cfg(feature = "sync")]
    pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle {
        let inner = unsafe { &*this.get() };
        let weak = std::sync::Arc::downgrade(&inner.shared_waker);
        waker::UnparkHandle(weak)
    }
}
561
562impl AsRawFd for IoUringDriver {
563    fn as_raw_fd(&self) -> RawFd {
564        unsafe { (*self.inner.get()).uring.as_raw_fd() }
565    }
566}
567
impl Drop for IoUringDriver {
    fn drop(&mut self) {
        trace!("MONOIO DEBUG[IoUringDriver]: drop");

        // Dealloc leaked memory
        // NOTE(review): `drop_in_place` only runs the destructor (a no-op for
        // `Timespec`); it does NOT free the allocation produced by
        // `Box::leak`, so this buffer is actually leaked. Freeing it with
        // `Box::from_raw` could be unsound if a still-pending Timeout SQE
        // references the buffer while `UringInner` outlives this driver via
        // `Rc` clones held by in-flight `Op`s — confirm before changing.
        unsafe { std::ptr::drop_in_place(self.timespec) };

        // Same note applies to the 8-byte eventfd read buffer (and here
        // `drop_in_place` on a `*mut u8` only "drops" a single byte).
        #[cfg(feature = "sync")]
        unsafe {
            std::ptr::drop_in_place(self.eventfd_read_dst)
        };

        // Deregister thread id
        #[cfg(feature = "sync")]
        {
            use crate::driver::thread::{unregister_unpark_handle, unregister_waker_queue};
            unregister_unpark_handle(self.thread_id);
            unregister_waker_queue(self.thread_id);
        }
    }
}
589
impl Drop for UringInner {
    fn drop(&mut self) {
        // Best-effort flush of any still-pending SQEs before closing the ring.
        // No need to wait for completion, as the kernel will clean up the ring
        // asynchronously once its fd is closed.
        let _ = self.uring.submitter().submit();
        // SAFETY: `uring` is never touched again after this point; this is the
        // single place the ManuallyDrop is released.
        unsafe {
            ManuallyDrop::drop(&mut self.uring);
        }
    }
}
599
impl Ops {
    /// Create an empty operation table.
    const fn new() -> Self {
        Ops { slab: Slab::new() }
    }

    // Insert a new operation
    // Returns the slab index used as the SQE's user_data.
    #[inline]
    pub(crate) fn insert(&mut self, is_fd: bool) -> usize {
        self.slab.insert(MaybeFdLifecycle::new(is_fd))
    }

    // Complete an operation
    // # Safety
    // Caller must make sure the result is valid.
    // (also assumes `index` refers to a live slab entry: `unwrap_unchecked`
    // is UB otherwise — upheld because only `tick` calls this with a
    // user_data the driver itself allocated)
    #[inline]
    unsafe fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
        let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() };
        lifecycle.complete(result, flags);
    }
}
620
621#[inline]
622fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
623    let res = cqe.result();
624
625    if res >= 0 {
626        Ok(res as u32)
627    } else {
628        Err(io::Error::from_raw_os_error(-res))
629    }
630}