1#[cfg(feature = "sync")]
4use std::sync::Arc;
5use std::{
6 cell::UnsafeCell,
7 io,
8 mem::ManuallyDrop,
9 os::unix::prelude::{AsRawFd, RawFd},
10 rc::Rc,
11 task::{Context, Poll},
12 time::Duration,
13};
14
15#[cfg(feature = "sync")]
16use crossbeam_queue::SegQueue;
17use io_uring::{cqueue, opcode, types::Timespec, IoUring};
18use lifecycle::MaybeFdLifecycle;
19
20use super::{
21 op::{CompletionMeta, Op, OpAble},
22 util::timespec,
23 Driver, Inner, CURRENT,
24};
25use crate::utils::slab::Slab;
26
27mod lifecycle;
28#[cfg(feature = "sync")]
29mod waker;
30#[cfg(feature = "sync")]
31pub(crate) use waker::UnparkHandle;
32
// Reserved `user_data` values. They occupy the top of the `u64` range; every
// other value is treated by the completion loop as an index into the op slab.

/// Tag for async-cancel entries (same value as the `u64::MAX` literal the
/// cancel paths attach to their entries).
#[allow(unused)]
pub(crate) const CANCEL_USERDATA: u64 = u64::MAX;
/// Tag for the timeout entry queued before a timed park.
pub(crate) const TIMEOUT_USERDATA: u64 = u64::MAX - 1;
/// Tag for the eventfd read entry (only queued with the `sync` feature).
#[allow(unused)]
pub(crate) const EVENTFD_USERDATA: u64 = u64::MAX - 2;
/// Tag for the poll entry that watches the legacy poller fd.
#[cfg(feature = "poll-io")]
pub(crate) const POLLER_USERDATA: u64 = u64::MAX - 3;

/// Smallest reserved value: completions whose `user_data` is at or above this
/// and not handled explicitly are ignored instead of being looked up in the
/// op slab. NOTE(review): the name presumably means "RESERVED" — confirm
/// before renaming, other modules may reference it.
pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 3;
42
/// Driver handle for an io_uring backed event loop.
///
/// Holds the shared ring state plus raw pointers to small heap buffers that
/// entries queued by the driver point at.
pub struct IoUringDriver {
    /// Shared ring state. The same `Rc` is cloned into `Inner::Uring` by
    /// `with` and into every `Op` created by `submit_with_data`.
    inner: Rc<UnsafeCell<UringInner>>,

    /// Heap `Timespec` leaked at construction; overwritten with the requested
    /// duration each time a timeout entry is queued.
    timespec: *mut Timespec,

    /// 8-byte heap buffer leaked at construction; destination of the eventfd
    /// read entry.
    #[cfg(feature = "sync")]
    eventfd_read_dst: *mut u8,

    /// Value of `BUILD_THREAD_ID` at construction; key under which the unpark
    /// handle and waker queue are registered and later unregistered.
    #[cfg(feature = "sync")]
    thread_id: usize,
}
58
/// Ring state shared (through `Rc<UnsafeCell<_>>`) between the driver and the
/// operations it has created.
pub(crate) struct UringInner {
    /// Lifecycle slots of submitted operations, indexed by the entry's
    /// `user_data`.
    ops: Ops,

    /// Legacy poller; its fd is watched through a poll entry on the ring.
    #[cfg(feature = "poll-io")]
    poll: super::poll::Poll,
    /// `true` while a `POLLER_USERDATA` entry is queued; cleared when its
    /// completion is seen.
    #[cfg(feature = "poll-io")]
    poller_installed: bool,

    /// The ring itself. Wrapped in `ManuallyDrop` because `Drop for
    /// UringInner` releases it explicitly after a final submit.
    uring: ManuallyDrop<IoUring>,

    /// Waker built around the eventfd; also carries the `awake` flag toggled
    /// around parking. `UnparkHandle`s hold a `Weak` to it.
    #[cfg(feature = "sync")]
    shared_waker: std::sync::Arc<waker::EventWaker>,

    /// `true` while an `EVENTFD_USERDATA` read entry is queued; cleared when
    /// its completion is seen.
    #[cfg(feature = "sync")]
    eventfd_installed: bool,

    /// Queue of wakers drained and woken at the start of every park; a clone
    /// is registered under the driver's thread id.
    #[cfg(feature = "sync")]
    waker_queue: Arc<SegQueue<std::task::Waker>>,

    /// Result of `is_feature_ext_arg()` on the ring parameters; selects how a
    /// timed park passes its timeout.
    ext_arg: bool,
}
86
/// Table of operation lifecycles kept by the ring state.
struct Ops {
    /// One `MaybeFdLifecycle` per inserted operation; the slab index is used
    /// as the entry's `user_data`.
    slab: Slab<MaybeFdLifecycle>,
}
92
93impl IoUringDriver {
94 const DEFAULT_ENTRIES: u32 = 1024;
95
96 pub(crate) fn new(b: &io_uring::Builder) -> io::Result<IoUringDriver> {
97 Self::new_with_entries(b, Self::DEFAULT_ENTRIES)
98 }
99
100 #[cfg(not(feature = "sync"))]
101 pub(crate) fn new_with_entries(
102 urb: &io_uring::Builder,
103 entries: u32,
104 ) -> io::Result<IoUringDriver> {
105 let uring = ManuallyDrop::new(urb.build(entries)?);
106
107 let inner = Rc::new(UnsafeCell::new(UringInner {
108 #[cfg(feature = "poll-io")]
109 poll: super::poll::Poll::with_capacity(entries as usize)?,
110 #[cfg(feature = "poll-io")]
111 poller_installed: false,
112 ops: Ops::new(),
113 ext_arg: uring.params().is_feature_ext_arg(),
114 uring,
115 }));
116
117 Ok(IoUringDriver {
118 inner,
119 timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
120 })
121 }
122
123 #[cfg(feature = "sync")]
124 pub(crate) fn new_with_entries(
125 urb: &io_uring::Builder,
126 entries: u32,
127 ) -> io::Result<IoUringDriver> {
128 use std::sync::Arc;
129
130 use crossbeam_queue::SegQueue;
131
132 let uring = ManuallyDrop::new(urb.build(entries)?);
133
134 let waker = {
136 let fd = crate::syscall!(eventfd@RAW(0, libc::EFD_CLOEXEC))?;
137 unsafe {
138 use std::os::unix::io::FromRawFd;
139 std::fs::File::from_raw_fd(fd)
140 }
141 };
142
143 let waker_queue = Arc::new(SegQueue::new());
144
145 let inner = Rc::new(UnsafeCell::new(UringInner {
146 #[cfg(feature = "poll-io")]
147 poller_installed: false,
148 #[cfg(feature = "poll-io")]
149 poll: super::poll::Poll::with_capacity(entries as usize)?,
150 ops: Ops::new(),
151 ext_arg: uring.params().is_feature_ext_arg(),
152 uring,
153 shared_waker: std::sync::Arc::new(waker::EventWaker::new(waker)),
154 eventfd_installed: false,
155 waker_queue: waker_queue.clone(),
156 }));
157
158 let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
159 let driver = IoUringDriver {
160 inner,
161 timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
162 eventfd_read_dst: Box::leak(Box::new([0_u8; 8])) as *mut u8,
163 thread_id,
164 };
165
166 super::thread::register_unpark_handle(thread_id, driver.unpark().into());
168 super::thread::register_waker_queue(thread_id, waker_queue);
169 Ok(driver)
170 }
171
172 #[allow(unused)]
173 fn num_operations(&self) -> usize {
174 let inner = self.inner.get();
175 unsafe { (*inner).ops.slab.len() }
176 }
177
178 fn flush_space(inner: &mut UringInner, need: usize) -> io::Result<()> {
180 let sq = inner.uring.submission();
181 debug_assert!(sq.capacity() >= need);
182 if sq.len() + need > sq.capacity() {
183 drop(sq);
184 inner.submit()?;
185 }
186 Ok(())
187 }
188
189 #[cfg(feature = "sync")]
190 fn install_eventfd(&self, inner: &mut UringInner, fd: RawFd) {
191 let entry = opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8)
192 .build()
193 .user_data(EVENTFD_USERDATA);
194
195 let mut sq = inner.uring.submission();
196 let _ = unsafe { sq.push(&entry) };
197 inner.eventfd_installed = true;
198 }
199
200 #[cfg(feature = "poll-io")]
201 fn install_poller(&self, inner: &mut UringInner, fd: RawFd) {
202 let entry = opcode::PollAdd::new(io_uring::types::Fd(fd), libc::POLLIN as _)
203 .build()
204 .user_data(POLLER_USERDATA);
205
206 let mut sq = inner.uring.submission();
207 let _ = unsafe { sq.push(&entry) };
208 inner.poller_installed = true;
209 }
210
211 fn install_timeout(&self, inner: &mut UringInner, duration: Duration) {
212 unsafe {
213 std::ptr::replace(self.timespec, timespec(duration));
214 }
215 let entry = opcode::Timeout::new(self.timespec)
216 .build()
217 .user_data(TIMEOUT_USERDATA);
218
219 let mut sq = inner.uring.submission();
220 let _ = unsafe { sq.push(&entry) };
221 }
222
223 fn inner_park(&self, timeout: Option<Duration>) -> io::Result<()> {
224 let inner = unsafe { &mut *self.inner.get() };
225
226 #[allow(unused_mut)]
227 let mut need_wait = true;
228
229 #[cfg(feature = "sync")]
230 {
231 while let Some(w) = inner.waker_queue.pop() {
233 w.wake();
234 need_wait = false;
235 }
236
237 if need_wait {
239 inner
240 .shared_waker
241 .awake
242 .store(false, std::sync::atomic::Ordering::Release);
243 }
244
245 while let Some(w) = inner.waker_queue.pop() {
247 w.wake();
248 need_wait = false;
249 }
250 }
251
252 if need_wait {
253 let mut space = 0;
257 #[cfg(feature = "sync")]
258 if !inner.eventfd_installed {
259 space += 1;
260 }
261 #[cfg(feature = "poll-io")]
262 if !inner.poller_installed {
263 space += 1;
264 }
265 if timeout.is_some() {
266 space += 1;
267 }
268 if space != 0 {
269 Self::flush_space(inner, space)?;
270 }
271
272 #[cfg(feature = "poll-io")]
274 if !inner.poller_installed {
275 self.install_poller(inner, inner.poll.as_raw_fd());
276 }
277
278 #[cfg(feature = "sync")]
280 if !inner.eventfd_installed {
281 self.install_eventfd(inner, inner.shared_waker.as_raw_fd());
282 }
283
284 if let Some(duration) = timeout {
286 match inner.ext_arg {
287 false => {
290 self.install_timeout(inner, duration);
291 inner.uring.submit_and_wait(1)?;
292 }
293 true => {
296 let timespec = timespec(duration);
297 let args = io_uring::types::SubmitArgs::new().timespec(×pec);
298 if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) {
299 if e.raw_os_error() != Some(libc::ETIME) {
300 return Err(e);
301 }
302 }
303 }
304 }
305 } else {
306 inner.uring.submit_and_wait(1)?;
308 }
309 } else {
310 inner.uring.submit()?;
312 }
313
314 #[cfg(feature = "sync")]
316 inner
317 .shared_waker
318 .awake
319 .store(true, std::sync::atomic::Ordering::Release);
320
321 inner.tick()?;
323
324 Ok(())
325 }
326
327 #[cfg(feature = "poll-io")]
328 #[inline]
329 pub(crate) fn register_poll_io(
330 this: &Rc<UnsafeCell<UringInner>>,
331 source: &mut impl mio::event::Source,
332 interest: mio::Interest,
333 ) -> io::Result<usize> {
334 let inner = unsafe { &mut *this.get() };
335 inner.poll.register(source, interest)
336 }
337
338 #[cfg(feature = "poll-io")]
339 #[inline]
340 pub(crate) fn deregister_poll_io(
341 this: &Rc<UnsafeCell<UringInner>>,
342 source: &mut impl mio::event::Source,
343 token: usize,
344 ) -> io::Result<()> {
345 let inner = unsafe { &mut *this.get() };
346 inner.poll.deregister(source, token)
347 }
348}
349
350impl Driver for IoUringDriver {
351 fn with<R>(&self, f: impl FnOnce() -> R) -> R {
353 let inner = Inner::Uring(self.inner.clone());
355 CURRENT.set(&inner, f)
356 }
357
358 fn submit(&self) -> io::Result<()> {
359 let inner = unsafe { &mut *self.inner.get() };
360 inner.submit()?;
361 inner.tick()?;
362 Ok(())
363 }
364
365 fn park(&self) -> io::Result<()> {
366 self.inner_park(None)
367 }
368
369 fn park_timeout(&self, duration: Duration) -> io::Result<()> {
370 self.inner_park(Some(duration))
371 }
372
373 #[cfg(feature = "sync")]
374 type Unpark = waker::UnparkHandle;
375
376 #[cfg(feature = "sync")]
377 fn unpark(&self) -> Self::Unpark {
378 UringInner::unpark(&self.inner)
379 }
380}
381
382impl UringInner {
383 fn tick(&mut self) -> io::Result<()> {
384 let cq = self.uring.completion();
385
386 for cqe in cq {
387 let index = cqe.user_data();
388 match index {
389 #[cfg(feature = "sync")]
390 EVENTFD_USERDATA => self.eventfd_installed = false,
391 #[cfg(feature = "poll-io")]
392 POLLER_USERDATA => {
393 self.poller_installed = false;
394 self.poll.tick(Some(Duration::ZERO))?;
395 }
396 _ if index >= MIN_REVERSED_USERDATA => (),
397 _ => unsafe { self.ops.complete(index as _, resultify(&cqe), cqe.flags()) },
400 }
401 }
402 Ok(())
403 }
404
405 fn submit(&mut self) -> io::Result<()> {
406 loop {
407 match self.uring.submit() {
408 #[cfg(feature = "unstable")]
409 Err(ref e)
410 if matches!(e.kind(), io::ErrorKind::Other | io::ErrorKind::ResourceBusy) =>
411 {
412 self.tick()?;
413 }
414 #[cfg(not(feature = "unstable"))]
415 Err(ref e)
416 if matches!(e.raw_os_error(), Some(libc::EAGAIN) | Some(libc::EBUSY)) =>
417 {
418 self.tick()?;
423 }
424 e => return e.map(|_| ()),
425 }
426 }
427 }
428
429 fn new_op<T: OpAble>(data: T, inner: &mut UringInner, driver: Inner) -> Op<T> {
430 Op {
431 driver,
432 index: inner.ops.insert(T::RET_IS_FD),
433 data: Some(data),
434 }
435 }
436
437 pub(crate) fn submit_with_data<T>(
438 this: &Rc<UnsafeCell<UringInner>>,
439 data: T,
440 ) -> io::Result<Op<T>>
441 where
442 T: OpAble,
443 {
444 let inner = unsafe { &mut *this.get() };
445 if inner.uring.submission().is_full() {
447 inner.submit()?;
448 }
449
450 let mut op = Self::new_op(data, inner, Inner::Uring(this.clone()));
452
453 let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() };
455 let sqe = OpAble::uring_op(data_mut).user_data(op.index as _);
456
457 {
458 let mut sq = inner.uring.submission();
459
460 if unsafe { sq.push(&sqe).is_err() } {
462 unimplemented!("when is this hit?");
463 }
464 }
465
466 Ok(op)
476 }
477
478 pub(crate) fn poll_op(
479 this: &Rc<UnsafeCell<UringInner>>,
480 index: usize,
481 cx: &mut Context<'_>,
482 ) -> Poll<CompletionMeta> {
483 let inner = unsafe { &mut *this.get() };
484 let lifecycle = unsafe { inner.ops.slab.get(index).unwrap_unchecked() };
485 lifecycle.poll_op(cx)
486 }
487
488 #[cfg(feature = "poll-io")]
489 pub(crate) fn poll_legacy_op<T: OpAble>(
490 this: &Rc<UnsafeCell<Self>>,
491 data: &mut T,
492 cx: &mut Context<'_>,
493 ) -> Poll<CompletionMeta> {
494 let inner = unsafe { &mut *this.get() };
495 let (direction, index) = match data.legacy_interest() {
496 Some(x) => x,
497 None => {
498 return Poll::Ready(CompletionMeta {
501 result: OpAble::legacy_call(data),
502 flags: 0,
503 });
504 }
505 };
506
507 inner
509 .poll
510 .poll_syscall(cx, index, direction, || OpAble::legacy_call(data))
511 }
512
513 pub(crate) fn drop_op<T: 'static>(
514 this: &Rc<UnsafeCell<UringInner>>,
515 index: usize,
516 data: &mut Option<T>,
517 _skip_cancel: bool,
518 ) {
519 let inner = unsafe { &mut *this.get() };
520 if index == usize::MAX {
521 return;
523 }
524 if let Some(lifecycle) = inner.ops.slab.get(index) {
525 let _must_finished = lifecycle.drop_op(data);
526 #[cfg(feature = "async-cancel")]
527 if !_must_finished && !_skip_cancel {
528 unsafe {
529 let cancel = opcode::AsyncCancel::new(index as u64)
530 .build()
531 .user_data(u64::MAX);
532
533 if inner.uring.submission().push(&cancel).is_err() {
535 let _ = inner.submit();
536 let _ = inner.uring.submission().push(&cancel);
537 }
538 }
539 }
540 }
541 }
542
543 pub(crate) unsafe fn cancel_op(this: &Rc<UnsafeCell<UringInner>>, index: usize) {
544 let inner = &mut *this.get();
545 let cancel = opcode::AsyncCancel::new(index as u64)
546 .build()
547 .user_data(u64::MAX);
548 if inner.uring.submission().push(&cancel).is_err() {
549 let _ = inner.submit();
550 let _ = inner.uring.submission().push(&cancel);
551 }
552 }
553
554 #[cfg(feature = "sync")]
555 pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle {
556 let inner = unsafe { &*this.get() };
557 let weak = std::sync::Arc::downgrade(&inner.shared_waker);
558 waker::UnparkHandle(weak)
559 }
560}
561
562impl AsRawFd for IoUringDriver {
563 fn as_raw_fd(&self) -> RawFd {
564 unsafe { (*self.inner.get()).uring.as_raw_fd() }
565 }
566}
567
568impl Drop for IoUringDriver {
569 fn drop(&mut self) {
570 trace!("MONOIO DEBUG[IoUringDriver]: drop");
571
572 unsafe { std::ptr::drop_in_place(self.timespec) };
574
575 #[cfg(feature = "sync")]
576 unsafe {
577 std::ptr::drop_in_place(self.eventfd_read_dst)
578 };
579
580 #[cfg(feature = "sync")]
582 {
583 use crate::driver::thread::{unregister_unpark_handle, unregister_waker_queue};
584 unregister_unpark_handle(self.thread_id);
585 unregister_waker_queue(self.thread_id);
586 }
587 }
588}
589
590impl Drop for UringInner {
591 fn drop(&mut self) {
592 let _ = self.uring.submitter().submit();
594 unsafe {
595 ManuallyDrop::drop(&mut self.uring);
596 }
597 }
598}
599
600impl Ops {
601 const fn new() -> Self {
602 Ops { slab: Slab::new() }
603 }
604
605 #[inline]
607 pub(crate) fn insert(&mut self, is_fd: bool) -> usize {
608 self.slab.insert(MaybeFdLifecycle::new(is_fd))
609 }
610
611 #[inline]
615 unsafe fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
616 let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() };
617 lifecycle.complete(result, flags);
618 }
619}
620
621#[inline]
622fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
623 let res = cqe.result();
624
625 if res >= 0 {
626 Ok(res as u32)
627 } else {
628 Err(io::Error::from_raw_os_error(-res))
629 }
630}