// monoio/driver/uring/mod.rs

//! Monoio Uring Driver.

use std::{
    cell::UnsafeCell,
    io,
    mem::ManuallyDrop,
    os::unix::prelude::{AsRawFd, RawFd},
    rc::Rc,
    task::{Context, Poll},
    time::Duration,
};

use io_uring::{cqueue, opcode, types::Timespec, IoUring};
use lifecycle::MaybeFdLifecycle;

use super::{
    op::{CompletionMeta, Op, OpAble},
    // ready::Ready,
    // scheduled_io::ScheduledIo,
    util::timespec,
    Driver,
    Inner,
    CURRENT,
};
use crate::utils::slab::Slab;

mod lifecycle;
#[cfg(feature = "sync")]
mod waker;
#[cfg(feature = "sync")]
pub(crate) use waker::UnparkHandle;
32
// Reserved `user_data` values for driver-internal SQEs. Real operations use
// their slab index as user_data, which is always below
// `MIN_REVERSED_USERDATA`; `tick` relies on this to tell them apart.
#[allow(unused)]
pub(crate) const CANCEL_USERDATA: u64 = u64::MAX;
// Marks the Timeout SQE installed by `install_timeout` for `park_timeout`.
pub(crate) const TIMEOUT_USERDATA: u64 = u64::MAX - 1;
// Marks the eventfd Read SQE used for cross-thread unpark (`sync` feature).
#[allow(unused)]
pub(crate) const EVENTFD_USERDATA: u64 = u64::MAX - 2;
// Marks the PollAdd SQE watching the legacy poller fd (`poll-io` feature).
#[cfg(feature = "poll-io")]
pub(crate) const POLLER_USERDATA: u64 = u64::MAX - 3;

// Smallest reserved user_data value; indices below this belong to the ops
// slab. NOTE(review): "REVERSED" appears to be a typo for "RESERVED", but the
// name is crate-visible and is kept as-is.
pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 3;
42
/// Driver with uring.
pub struct IoUringDriver {
    inner: Rc<UnsafeCell<UringInner>>,

    // Used as timeout buffer. Leaked at construction so it stays valid while
    // a Timeout SQE referencing it is in flight; see `Drop` for reclamation.
    timespec: *mut Timespec,

    // Used as read eventfd buffer (points at a leaked 8-byte array the kernel
    // writes the eventfd counter into).
    #[cfg(feature = "sync")]
    eventfd_read_dst: *mut u8,

    // Used for drop: identifies this driver's entries in the global
    // unpark-handle / waker-sender registries so they can be removed.
    #[cfg(feature = "sync")]
    thread_id: usize,
}
58
pub(crate) struct UringInner {
    /// In-flight operations, indexed by the user_data of their SQEs
    ops: Ops,

    // Auxiliary readiness-based poller for ops that go through the legacy path
    #[cfg(feature = "poll-io")]
    poll: super::poll::Poll,
    // Whether a PollAdd SQE watching `poll`'s fd is currently queued/armed
    #[cfg(feature = "poll-io")]
    poller_installed: bool,

    /// IoUring bindings. Wrapped in ManuallyDrop so `Drop for UringInner`
    /// controls teardown order explicitly.
    uring: ManuallyDrop<IoUring>,

    /// Shared waker used by other threads to unpark this driver
    #[cfg(feature = "sync")]
    shared_waker: std::sync::Arc<waker::EventWaker>,

    // Mark if eventfd is in the ring (a Read SQE on it is armed)
    #[cfg(feature = "sync")]
    eventfd_installed: bool,

    // Waker receiver: wakers pushed from foreign threads, drained in park
    #[cfg(feature = "sync")]
    waker_receiver: flume::Receiver<std::task::Waker>,

    // Uring support ext_arg (submit_and_wait with timeout args, kernel 5.11+)
    ext_arg: bool,
}
86
// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
struct Ops {
    // One lifecycle slot per tracked operation; the slot index doubles as the
    // SQE user_data (see `submit_with_data`).
    slab: Slab<MaybeFdLifecycle>,
}
92
impl IoUringDriver {
    /// Default SQ/CQ entry count used by [`IoUringDriver::new`].
    const DEFAULT_ENTRIES: u32 = 1024;

    pub(crate) fn new(b: &io_uring::Builder) -> io::Result<IoUringDriver> {
        Self::new_with_entries(b, Self::DEFAULT_ENTRIES)
    }

    /// Build a driver with a ring of `entries` entries (build without the
    /// `sync` feature: no cross-thread wakeup machinery is set up).
    #[cfg(not(feature = "sync"))]
    pub(crate) fn new_with_entries(
        urb: &io_uring::Builder,
        entries: u32,
    ) -> io::Result<IoUringDriver> {
        let uring = ManuallyDrop::new(urb.build(entries)?);

        let inner = Rc::new(UnsafeCell::new(UringInner {
            #[cfg(feature = "poll-io")]
            poll: super::poll::Poll::with_capacity(entries as usize)?,
            #[cfg(feature = "poll-io")]
            poller_installed: false,
            ops: Ops::new(),
            // Record once whether the kernel supports EXT_ARG; `inner_park`
            // branches on this.
            ext_arg: uring.params().is_feature_ext_arg(),
            uring,
        }));

        Ok(IoUringDriver {
            inner,
            // Leaked so the buffer outlives any in-flight Timeout SQE that
            // points at it.
            timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
        })
    }

    /// Build a driver with a ring of `entries` entries and set up the
    /// cross-thread wakeup machinery (eventfd + waker channel), registering
    /// the unpark handle and waker sender for this thread.
    #[cfg(feature = "sync")]
    pub(crate) fn new_with_entries(
        urb: &io_uring::Builder,
        entries: u32,
    ) -> io::Result<IoUringDriver> {
        let uring = ManuallyDrop::new(urb.build(entries)?);

        // Create eventfd and register it to the ring.
        let waker = {
            let fd = crate::syscall!(eventfd@RAW(0, libc::EFD_CLOEXEC))?;
            // SAFETY: `fd` is a freshly created eventfd we exclusively own.
            unsafe {
                use std::os::unix::io::FromRawFd;
                std::fs::File::from_raw_fd(fd)
            }
        };

        let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();

        let inner = Rc::new(UnsafeCell::new(UringInner {
            #[cfg(feature = "poll-io")]
            poller_installed: false,
            #[cfg(feature = "poll-io")]
            poll: super::poll::Poll::with_capacity(entries as usize)?,
            ops: Ops::new(),
            // Record once whether the kernel supports EXT_ARG; `inner_park`
            // branches on this.
            ext_arg: uring.params().is_feature_ext_arg(),
            uring,
            shared_waker: std::sync::Arc::new(waker::EventWaker::new(waker)),
            eventfd_installed: false,
            waker_receiver,
        }));

        let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
        let driver = IoUringDriver {
            inner,
            // Both buffers are leaked so the kernel can safely write into them
            // while the corresponding SQEs are in flight.
            timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
            eventfd_read_dst: Box::leak(Box::new([0_u8; 8])) as *mut u8,
            thread_id,
        };

        // Register unpark handle
        super::thread::register_unpark_handle(thread_id, driver.unpark().into());
        super::thread::register_waker_sender(thread_id, waker_sender);
        Ok(driver)
    }

    /// Number of operations currently tracked in the slab.
    #[allow(unused)]
    fn num_operations(&self) -> usize {
        let inner = self.inner.get();
        // SAFETY: the driver is single-threaded; no aliasing mutable borrow
        // of `inner` exists while we read the slab length.
        unsafe { (*inner).ops.slab.len() }
    }

    // Flush to make enough space: ensure the SQ can take `need` more entries,
    // submitting queued SQEs to the kernel first if it cannot.
    fn flush_space(inner: &mut UringInner, need: usize) -> io::Result<()> {
        let sq = inner.uring.submission();
        debug_assert!(sq.capacity() >= need);
        if sq.len() + need > sq.capacity() {
            // Release the SQ borrow before re-borrowing `inner` to submit.
            drop(sq);
            inner.submit()?;
        }
        Ok(())
    }

    /// Arm a Read SQE on the shared-waker eventfd so a blocked
    /// `submit_and_wait` can be interrupted from another thread.
    /// Caller must have reserved SQ space (see `flush_space`).
    #[cfg(feature = "sync")]
    fn install_eventfd(&self, inner: &mut UringInner, fd: RawFd) {
        let entry = opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8)
            .build()
            .user_data(EVENTFD_USERDATA);

        let mut sq = inner.uring.submission();
        // Push cannot fail here: space was reserved by the caller.
        let _ = unsafe { sq.push(&entry) };
        inner.eventfd_installed = true;
    }

    /// Arm a PollAdd SQE watching the legacy poller's fd so readiness events
    /// on the poll-io path wake the ring. Caller must have reserved SQ space.
    #[cfg(feature = "poll-io")]
    fn install_poller(&self, inner: &mut UringInner, fd: RawFd) {
        let entry = opcode::PollAdd::new(io_uring::types::Fd(fd), libc::POLLIN as _)
            .build()
            .user_data(POLLER_USERDATA);

        let mut sq = inner.uring.submission();
        let _ = unsafe { sq.push(&entry) };
        inner.poller_installed = true;
    }

    /// Queue a Timeout SQE firing after `duration`. The timespec is written
    /// into the leaked long-lived buffer so the kernel can read it after this
    /// function returns.
    fn install_timeout(&self, inner: &mut UringInner, duration: Duration) {
        let timespec = timespec(duration);
        // SAFETY: `self.timespec` is a valid leaked buffer exclusively owned
        // by this driver.
        unsafe {
            std::ptr::replace(self.timespec, timespec);
        }
        let entry = opcode::Timeout::new(self.timespec as *const Timespec)
            .build()
            .user_data(TIMEOUT_USERDATA);

        let mut sq = inner.uring.submission();
        let _ = unsafe { sq.push(&entry) };
    }

    /// Park on the ring, optionally bounded by `timeout`: flush pending SQEs
    /// and, unless a foreign waker already demands attention, wait for at
    /// least one completion; finally drain the CQ via `tick`.
    fn inner_park(&self, timeout: Option<Duration>) -> io::Result<()> {
        let inner = unsafe { &mut *self.inner.get() };

        #[allow(unused_mut)]
        let mut need_wait = true;

        #[cfg(feature = "sync")]
        {
            // Process foreign wakers
            while let Ok(w) = inner.waker_receiver.try_recv() {
                w.wake();
                need_wait = false;
            }

            // Set status as not awake if we are going to sleep
            if need_wait {
                inner
                    .shared_waker
                    .awake
                    .store(false, std::sync::atomic::Ordering::Release);
            }

            // Process foreign wakers left. Re-draining after publishing the
            // "not awake" state avoids missing a waker enqueued concurrently
            // by a sender that still observed us as awake.
            while let Ok(w) = inner.waker_receiver.try_recv() {
                w.wake();
                need_wait = false;
            }
        }

        if need_wait {
            // Install timeout and eventfd for unpark if sync is enabled

            // 1. alloc spaces: count how many internal SQEs we are about to
            // push so `flush_space` can guarantee room for all of them.
            let mut space = 0;
            #[cfg(feature = "sync")]
            if !inner.eventfd_installed {
                space += 1;
            }
            #[cfg(feature = "poll-io")]
            if !inner.poller_installed {
                space += 1;
            }
            if timeout.is_some() {
                space += 1;
            }
            if space != 0 {
                Self::flush_space(inner, space)?;
            }

            // 2.1 install poller
            #[cfg(feature = "poll-io")]
            if !inner.poller_installed {
                self.install_poller(inner, inner.poll.as_raw_fd());
            }

            // 2.2 install eventfd and timeout
            #[cfg(feature = "sync")]
            if !inner.eventfd_installed {
                self.install_eventfd(inner, inner.shared_waker.as_raw_fd());
            }

            // 2.3 install timeout and submit_and_wait with timeout
            if let Some(duration) = timeout {
                match inner.ext_arg {
                    // Submit and Wait with timeout in an TimeoutOp way.
                    // Better compatibility(5.4+).
                    false => {
                        self.install_timeout(inner, duration);
                        inner.uring.submit_and_wait(1)?;
                    }
                    // Submit and Wait with enter args.
                    // Better performance(5.11+).
                    true => {
                        let timespec = timespec(duration);
                        let args = io_uring::types::SubmitArgs::new().timespec(&timespec);
                        if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) {
                            // ETIME just means the wait timed out — not an error.
                            if e.raw_os_error() != Some(libc::ETIME) {
                                return Err(e);
                            }
                        }
                    }
                }
            } else {
                // Submit and Wait without timeout
                inner.uring.submit_and_wait(1)?;
            }
        } else {
            // Submit only
            inner.uring.submit()?;
        }

        // Set status as awake
        #[cfg(feature = "sync")]
        inner
            .shared_waker
            .awake
            .store(true, std::sync::atomic::Ordering::Release);

        // Process CQ
        inner.tick()?;

        Ok(())
    }

    /// Register a mio source with the auxiliary poll-io poller; returns the
    /// token to use for deregistration.
    #[cfg(feature = "poll-io")]
    #[inline]
    pub(crate) fn register_poll_io(
        this: &Rc<UnsafeCell<UringInner>>,
        source: &mut impl mio::event::Source,
        interest: mio::Interest,
    ) -> io::Result<usize> {
        let inner = unsafe { &mut *this.get() };
        inner.poll.register(source, interest)
    }

    /// Remove a previously registered mio source from the poll-io poller.
    #[cfg(feature = "poll-io")]
    #[inline]
    pub(crate) fn deregister_poll_io(
        this: &Rc<UnsafeCell<UringInner>>,
        source: &mut impl mio::event::Source,
        token: usize,
    ) -> io::Result<()> {
        let inner = unsafe { &mut *this.get() };
        inner.poll.deregister(source, token)
    }
}
346
347impl Driver for IoUringDriver {
348    /// Enter the driver context. This enables using uring types.
349    fn with<R>(&self, f: impl FnOnce() -> R) -> R {
350        // TODO(ihciah): remove clone
351        let inner = Inner::Uring(self.inner.clone());
352        CURRENT.set(&inner, f)
353    }
354
355    fn submit(&self) -> io::Result<()> {
356        let inner = unsafe { &mut *self.inner.get() };
357        inner.submit()?;
358        inner.tick()?;
359        Ok(())
360    }
361
362    fn park(&self) -> io::Result<()> {
363        self.inner_park(None)
364    }
365
366    fn park_timeout(&self, duration: Duration) -> io::Result<()> {
367        self.inner_park(Some(duration))
368    }
369
370    #[cfg(feature = "sync")]
371    type Unpark = waker::UnparkHandle;
372
373    #[cfg(feature = "sync")]
374    fn unpark(&self) -> Self::Unpark {
375        UringInner::unpark(&self.inner)
376    }
377}
378
impl UringInner {
    /// Drain the completion queue, dispatching each CQE by its user_data:
    /// reserved values update driver bookkeeping, all other values complete
    /// the slab-tracked operation at that index.
    fn tick(&mut self) -> io::Result<()> {
        let cq = self.uring.completion();

        for cqe in cq {
            let index = cqe.user_data();
            match index {
                // The armed eventfd Read fired; it must be re-armed before the
                // next park.
                #[cfg(feature = "sync")]
                EVENTFD_USERDATA => self.eventfd_installed = false,
                #[cfg(feature = "poll-io")]
                POLLER_USERDATA => {
                    self.poller_installed = false;
                    // Dispatch legacy readiness events without blocking.
                    self.poll.tick(Some(Duration::ZERO))?;
                }
                // Remaining reserved values (timeout/cancel) need no action.
                _ if index >= MIN_REVERSED_USERDATA => (),
                // # Safety
                // Here we can make sure the result is valid.
                _ => unsafe { self.ops.complete(index as _, resultify(&cqe), cqe.flags()) },
            }
        }
        Ok(())
    }

    /// Submit queued SQEs to the kernel. If the kernel reports it is busy
    /// (typically a full CQ), drain completions with `tick` and retry.
    fn submit(&mut self) -> io::Result<()> {
        loop {
            match self.uring.submit() {
                #[cfg(feature = "unstable")]
                Err(ref e)
                    if matches!(e.kind(), io::ErrorKind::Other | io::ErrorKind::ResourceBusy) =>
                {
                    self.tick()?;
                }
                #[cfg(not(feature = "unstable"))]
                Err(ref e)
                    if matches!(e.raw_os_error(), Some(libc::EAGAIN) | Some(libc::EBUSY)) =>
                {
                    // This error is constructed with io::Error::last_os_error():
                    // https://github.com/tokio-rs/io-uring/blob/01c83bbce965d4aaf93ebfaa08c3aa8b7b0f5335/src/sys/mod.rs#L32
                    // So we can use https://doc.rust-lang.org/nightly/std/io/struct.Error.html#method.raw_os_error
                    // to get the raw error code.
                    self.tick()?;
                }
                // Success, or an error we do not handle: propagate as-is.
                e => return e.map(|_| ()),
            }
        }
    }

    /// Allocate a slab slot for `data` and wrap it into an `Op` bound to
    /// `driver`; the slot index becomes the SQE user_data.
    fn new_op<T: OpAble>(data: T, inner: &mut UringInner, driver: Inner) -> Op<T> {
        Op {
            driver,
            index: inner.ops.insert(T::RET_IS_FD),
            data: Some(data),
        }
    }

    /// Create an operation from `data` and push its SQE onto the submission
    /// queue. The SQE is made visible to the kernel but no syscall is issued
    /// here; actual submission happens on `park`/`submit`.
    pub(crate) fn submit_with_data<T>(
        this: &Rc<UnsafeCell<UringInner>>,
        data: T,
    ) -> io::Result<Op<T>>
    where
        T: OpAble,
    {
        let inner = unsafe { &mut *this.get() };
        // If the submission queue is full, flush it to the kernel
        if inner.uring.submission().is_full() {
            inner.submit()?;
        }

        // Create the operation
        let mut op = Self::new_op(data, inner, Inner::Uring(this.clone()));

        // Configure the SQE
        // SAFETY: `op.data` was set to `Some` just above in `new_op`.
        let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() };
        let sqe = OpAble::uring_op(data_mut).user_data(op.index as _);

        {
            let mut sq = inner.uring.submission();

            // Push the new operation. The queue was flushed above if full, so
            // this push is expected to succeed.
            if unsafe { sq.push(&sqe).is_err() } {
                unimplemented!("when is this hit?");
            }
        }

        // Submit the new operation. At this point, the operation has been
        // pushed onto the queue and the tail pointer has been updated, so
        // the submission entry is visible to the kernel. If there is an
        // error here (probably EAGAIN), we still return the operation. A
        // future `io_uring_enter` will fully submit the event.

        // CHIHAI: We are not going to do syscall now. If we are waiting
        // for IO, we will submit on `park`.
        // let _ = inner.submit();
        Ok(op)
    }

    /// Poll the completion state of the operation stored at `index`.
    pub(crate) fn poll_op(
        this: &Rc<UnsafeCell<UringInner>>,
        index: usize,
        cx: &mut Context<'_>,
    ) -> Poll<CompletionMeta> {
        let inner = unsafe { &mut *this.get() };
        // SAFETY: callers only poll with an index they obtained from
        // `submit_with_data`, so the slot is live.
        let lifecycle = unsafe { inner.ops.slab.get(index).unwrap_unchecked() };
        lifecycle.poll_op(cx)
    }

    /// Drive an operation through the legacy readiness-based path instead of
    /// uring: either run the syscall immediately (no fd readiness needed) or
    /// wait for readiness via the auxiliary poller first.
    #[cfg(feature = "poll-io")]
    pub(crate) fn poll_legacy_op<T: OpAble>(
        this: &Rc<UnsafeCell<Self>>,
        data: &mut T,
        cx: &mut Context<'_>,
    ) -> Poll<CompletionMeta> {
        let inner = unsafe { &mut *this.get() };
        let (direction, index) = match data.legacy_interest() {
            Some(x) => x,
            None => {
                // if there is no index provided, it means the action does not rely on fd
                // readiness. do syscall right now.
                return Poll::Ready(CompletionMeta {
                    result: OpAble::legacy_call(data),
                    flags: 0,
                });
            }
        };

        // wait io ready and do syscall
        inner
            .poll
            .poll_syscall(cx, index, direction, || OpAble::legacy_call(data))
    }

    /// Called when an `Op` is dropped: release (or hand over) its slab slot
    /// and, if the operation may still be in flight, queue an AsyncCancel.
    pub(crate) fn drop_op<T: 'static>(
        this: &Rc<UnsafeCell<UringInner>>,
        index: usize,
        data: &mut Option<T>,
        _skip_cancel: bool,
    ) {
        let inner = unsafe { &mut *this.get() };
        if index == usize::MAX {
            // already finished
            return;
        }
        if let Some(lifecycle) = inner.ops.slab.get(index) {
            let _must_finished = lifecycle.drop_op(data);
            #[cfg(feature = "async-cancel")]
            if !_must_finished && !_skip_cancel {
                unsafe {
                    let cancel = opcode::AsyncCancel::new(index as u64)
                        .build()
                        .user_data(u64::MAX);

                    // Try push cancel, if failed, will submit and re-push.
                    if inner.uring.submission().push(&cancel).is_err() {
                        let _ = inner.submit();
                        let _ = inner.uring.submission().push(&cancel);
                    }
                }
            }
        }
    }

    /// Queue an AsyncCancel for the operation at `index` (best effort: a
    /// failed push is retried once after flushing the SQ).
    pub(crate) unsafe fn cancel_op(this: &Rc<UnsafeCell<UringInner>>, index: usize) {
        let inner = &mut *this.get();
        let cancel = opcode::AsyncCancel::new(index as u64)
            .build()
            .user_data(u64::MAX);
        if inner.uring.submission().push(&cancel).is_err() {
            let _ = inner.submit();
            let _ = inner.uring.submission().push(&cancel);
        }
    }

    /// Create an unpark handle holding a weak reference to the shared waker,
    /// so the handle does not keep the driver alive.
    #[cfg(feature = "sync")]
    pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle {
        let inner = unsafe { &*this.get() };
        let weak = std::sync::Arc::downgrade(&inner.shared_waker);
        waker::UnparkHandle(weak)
    }
}
558
559impl AsRawFd for IoUringDriver {
560    fn as_raw_fd(&self) -> RawFd {
561        unsafe { (*self.inner.get()).uring.as_raw_fd() }
562    }
563}
564
impl Drop for IoUringDriver {
    fn drop(&mut self) {
        trace!("MONOIO DEBUG[IoUringDriver]: drop");

        // Dealloc leaked memory
        // NOTE(review): `drop_in_place` runs destructors but does NOT return
        // the `Box::leak`ed allocations to the allocator, so both buffers
        // remain leaked here (and `Timespec`/`u8` have trivial drops anyway).
        // This may be deliberate — the kernel could still hold in-flight SQEs
        // pointing at these buffers while `UringInner` (kept alive by Rc
        // clones) outlives this driver — confirm before "fixing" with
        // `Box::from_raw`.
        unsafe { std::ptr::drop_in_place(self.timespec) };

        #[cfg(feature = "sync")]
        unsafe {
            std::ptr::drop_in_place(self.eventfd_read_dst)
        };

        // Deregister thread id from the global unpark/waker registries so no
        // stale handle for this thread remains.
        #[cfg(feature = "sync")]
        {
            use crate::driver::thread::{unregister_unpark_handle, unregister_waker_sender};
            unregister_unpark_handle(self.thread_id);
            unregister_waker_sender(self.thread_id);
        }
    }
}
586
impl Drop for UringInner {
    fn drop(&mut self) {
        // Flush any still-queued SQEs; no need to wait for completion, as the
        // kernel will clean up the ring asynchronously once its fd is closed.
        let _ = self.uring.submitter().submit();
        // SAFETY: `uring` is dropped exactly once, here, and never used again.
        unsafe {
            ManuallyDrop::drop(&mut self.uring);
        }
    }
}
596
597impl Ops {
598    const fn new() -> Self {
599        Ops { slab: Slab::new() }
600    }
601
602    // Insert a new operation
603    #[inline]
604    pub(crate) fn insert(&mut self, is_fd: bool) -> usize {
605        self.slab.insert(MaybeFdLifecycle::new(is_fd))
606    }
607
608    // Complete an operation
609    // # Safety
610    // Caller must make sure the result is valid.
611    #[inline]
612    unsafe fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
613        let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() };
614        lifecycle.complete(result, flags);
615    }
616}
617
618#[inline]
619fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
620    let res = cqe.result();
621
622    if res >= 0 {
623        Ok(res as u32)
624    } else {
625        Err(io::Error::from_raw_os_error(-res))
626    }
627}