1use std::{
4 cell::UnsafeCell,
5 io,
6 mem::ManuallyDrop,
7 os::unix::prelude::{AsRawFd, RawFd},
8 rc::Rc,
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use io_uring::{cqueue, opcode, types::Timespec, IoUring};
14use lifecycle::MaybeFdLifecycle;
15
16use super::{
17 op::{CompletionMeta, Op, OpAble},
18 util::timespec,
21 Driver,
22 Inner,
23 CURRENT,
24};
25use crate::utils::slab::Slab;
26
27mod lifecycle;
28#[cfg(feature = "sync")]
29mod waker;
30#[cfg(feature = "sync")]
31pub(crate) use waker::UnparkHandle;
32
// Reserved `user_data` values. The completion loop treats every value at or
// above `MIN_REVERSED_USERDATA` as driver-internal and never dispatches it to
// an operation slot; each constant sits one below the previous one.

/// `user_data` attached to `AsyncCancel` SQEs.
#[allow(unused)]
pub(crate) const CANCEL_USERDATA: u64 = u64::MAX;
/// `user_data` attached to the park `Timeout` SQE.
pub(crate) const TIMEOUT_USERDATA: u64 = CANCEL_USERDATA - 1;
/// `user_data` attached to the eventfd `Read` SQE.
#[allow(unused)]
pub(crate) const EVENTFD_USERDATA: u64 = TIMEOUT_USERDATA - 1;
/// `user_data` attached to the poller `PollAdd` SQE.
#[cfg(feature = "poll-io")]
pub(crate) const POLLER_USERDATA: u64 = EVENTFD_USERDATA - 1;

/// Smallest reserved `user_data`; everything below it is an operation index.
pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 3;
42
43pub struct IoUringDriver {
45 inner: Rc<UnsafeCell<UringInner>>,
46
47 timespec: *mut Timespec,
49
50 #[cfg(feature = "sync")]
52 eventfd_read_dst: *mut u8,
53
54 #[cfg(feature = "sync")]
56 thread_id: usize,
57}
58
59pub(crate) struct UringInner {
60 ops: Ops,
62
63 #[cfg(feature = "poll-io")]
64 poll: super::poll::Poll,
65 #[cfg(feature = "poll-io")]
66 poller_installed: bool,
67
68 uring: ManuallyDrop<IoUring>,
70
71 #[cfg(feature = "sync")]
73 shared_waker: std::sync::Arc<waker::EventWaker>,
74
75 #[cfg(feature = "sync")]
77 eventfd_installed: bool,
78
79 #[cfg(feature = "sync")]
81 waker_receiver: flume::Receiver<std::task::Waker>,
82
83 ext_arg: bool,
85}
86
87struct Ops {
90 slab: Slab<MaybeFdLifecycle>,
91}
92
93impl IoUringDriver {
94 const DEFAULT_ENTRIES: u32 = 1024;
95
96 pub(crate) fn new(b: &io_uring::Builder) -> io::Result<IoUringDriver> {
97 Self::new_with_entries(b, Self::DEFAULT_ENTRIES)
98 }
99
100 #[cfg(not(feature = "sync"))]
101 pub(crate) fn new_with_entries(
102 urb: &io_uring::Builder,
103 entries: u32,
104 ) -> io::Result<IoUringDriver> {
105 let uring = ManuallyDrop::new(urb.build(entries)?);
106
107 let inner = Rc::new(UnsafeCell::new(UringInner {
108 #[cfg(feature = "poll-io")]
109 poll: super::poll::Poll::with_capacity(entries as usize)?,
110 #[cfg(feature = "poll-io")]
111 poller_installed: false,
112 ops: Ops::new(),
113 ext_arg: uring.params().is_feature_ext_arg(),
114 uring,
115 }));
116
117 Ok(IoUringDriver {
118 inner,
119 timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
120 })
121 }
122
123 #[cfg(feature = "sync")]
124 pub(crate) fn new_with_entries(
125 urb: &io_uring::Builder,
126 entries: u32,
127 ) -> io::Result<IoUringDriver> {
128 let uring = ManuallyDrop::new(urb.build(entries)?);
129
130 let waker = {
132 let fd = crate::syscall!(eventfd@RAW(0, libc::EFD_CLOEXEC))?;
133 unsafe {
134 use std::os::unix::io::FromRawFd;
135 std::fs::File::from_raw_fd(fd)
136 }
137 };
138
139 let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
140
141 let inner = Rc::new(UnsafeCell::new(UringInner {
142 #[cfg(feature = "poll-io")]
143 poller_installed: false,
144 #[cfg(feature = "poll-io")]
145 poll: super::poll::Poll::with_capacity(entries as usize)?,
146 ops: Ops::new(),
147 ext_arg: uring.params().is_feature_ext_arg(),
148 uring,
149 shared_waker: std::sync::Arc::new(waker::EventWaker::new(waker)),
150 eventfd_installed: false,
151 waker_receiver,
152 }));
153
154 let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
155 let driver = IoUringDriver {
156 inner,
157 timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec,
158 eventfd_read_dst: Box::leak(Box::new([0_u8; 8])) as *mut u8,
159 thread_id,
160 };
161
162 super::thread::register_unpark_handle(thread_id, driver.unpark().into());
164 super::thread::register_waker_sender(thread_id, waker_sender);
165 Ok(driver)
166 }
167
168 #[allow(unused)]
169 fn num_operations(&self) -> usize {
170 let inner = self.inner.get();
171 unsafe { (*inner).ops.slab.len() }
172 }
173
174 fn flush_space(inner: &mut UringInner, need: usize) -> io::Result<()> {
176 let sq = inner.uring.submission();
177 debug_assert!(sq.capacity() >= need);
178 if sq.len() + need > sq.capacity() {
179 drop(sq);
180 inner.submit()?;
181 }
182 Ok(())
183 }
184
185 #[cfg(feature = "sync")]
186 fn install_eventfd(&self, inner: &mut UringInner, fd: RawFd) {
187 let entry = opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8)
188 .build()
189 .user_data(EVENTFD_USERDATA);
190
191 let mut sq = inner.uring.submission();
192 let _ = unsafe { sq.push(&entry) };
193 inner.eventfd_installed = true;
194 }
195
196 #[cfg(feature = "poll-io")]
197 fn install_poller(&self, inner: &mut UringInner, fd: RawFd) {
198 let entry = opcode::PollAdd::new(io_uring::types::Fd(fd), libc::POLLIN as _)
199 .build()
200 .user_data(POLLER_USERDATA);
201
202 let mut sq = inner.uring.submission();
203 let _ = unsafe { sq.push(&entry) };
204 inner.poller_installed = true;
205 }
206
207 fn install_timeout(&self, inner: &mut UringInner, duration: Duration) {
208 let timespec = timespec(duration);
209 unsafe {
210 std::ptr::replace(self.timespec, timespec);
211 }
212 let entry = opcode::Timeout::new(self.timespec as *const Timespec)
213 .build()
214 .user_data(TIMEOUT_USERDATA);
215
216 let mut sq = inner.uring.submission();
217 let _ = unsafe { sq.push(&entry) };
218 }
219
220 fn inner_park(&self, timeout: Option<Duration>) -> io::Result<()> {
221 let inner = unsafe { &mut *self.inner.get() };
222
223 #[allow(unused_mut)]
224 let mut need_wait = true;
225
226 #[cfg(feature = "sync")]
227 {
228 while let Ok(w) = inner.waker_receiver.try_recv() {
230 w.wake();
231 need_wait = false;
232 }
233
234 if need_wait {
236 inner
237 .shared_waker
238 .awake
239 .store(false, std::sync::atomic::Ordering::Release);
240 }
241
242 while let Ok(w) = inner.waker_receiver.try_recv() {
244 w.wake();
245 need_wait = false;
246 }
247 }
248
249 if need_wait {
250 let mut space = 0;
254 #[cfg(feature = "sync")]
255 if !inner.eventfd_installed {
256 space += 1;
257 }
258 #[cfg(feature = "poll-io")]
259 if !inner.poller_installed {
260 space += 1;
261 }
262 if timeout.is_some() {
263 space += 1;
264 }
265 if space != 0 {
266 Self::flush_space(inner, space)?;
267 }
268
269 #[cfg(feature = "poll-io")]
271 if !inner.poller_installed {
272 self.install_poller(inner, inner.poll.as_raw_fd());
273 }
274
275 #[cfg(feature = "sync")]
277 if !inner.eventfd_installed {
278 self.install_eventfd(inner, inner.shared_waker.as_raw_fd());
279 }
280
281 if let Some(duration) = timeout {
283 match inner.ext_arg {
284 false => {
287 self.install_timeout(inner, duration);
288 inner.uring.submit_and_wait(1)?;
289 }
290 true => {
293 let timespec = timespec(duration);
294 let args = io_uring::types::SubmitArgs::new().timespec(×pec);
295 if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) {
296 if e.raw_os_error() != Some(libc::ETIME) {
297 return Err(e);
298 }
299 }
300 }
301 }
302 } else {
303 inner.uring.submit_and_wait(1)?;
305 }
306 } else {
307 inner.uring.submit()?;
309 }
310
311 #[cfg(feature = "sync")]
313 inner
314 .shared_waker
315 .awake
316 .store(true, std::sync::atomic::Ordering::Release);
317
318 inner.tick()?;
320
321 Ok(())
322 }
323
324 #[cfg(feature = "poll-io")]
325 #[inline]
326 pub(crate) fn register_poll_io(
327 this: &Rc<UnsafeCell<UringInner>>,
328 source: &mut impl mio::event::Source,
329 interest: mio::Interest,
330 ) -> io::Result<usize> {
331 let inner = unsafe { &mut *this.get() };
332 inner.poll.register(source, interest)
333 }
334
335 #[cfg(feature = "poll-io")]
336 #[inline]
337 pub(crate) fn deregister_poll_io(
338 this: &Rc<UnsafeCell<UringInner>>,
339 source: &mut impl mio::event::Source,
340 token: usize,
341 ) -> io::Result<()> {
342 let inner = unsafe { &mut *this.get() };
343 inner.poll.deregister(source, token)
344 }
345}
346
347impl Driver for IoUringDriver {
348 fn with<R>(&self, f: impl FnOnce() -> R) -> R {
350 let inner = Inner::Uring(self.inner.clone());
352 CURRENT.set(&inner, f)
353 }
354
355 fn submit(&self) -> io::Result<()> {
356 let inner = unsafe { &mut *self.inner.get() };
357 inner.submit()?;
358 inner.tick()?;
359 Ok(())
360 }
361
362 fn park(&self) -> io::Result<()> {
363 self.inner_park(None)
364 }
365
366 fn park_timeout(&self, duration: Duration) -> io::Result<()> {
367 self.inner_park(Some(duration))
368 }
369
370 #[cfg(feature = "sync")]
371 type Unpark = waker::UnparkHandle;
372
373 #[cfg(feature = "sync")]
374 fn unpark(&self) -> Self::Unpark {
375 UringInner::unpark(&self.inner)
376 }
377}
378
379impl UringInner {
380 fn tick(&mut self) -> io::Result<()> {
381 let cq = self.uring.completion();
382
383 for cqe in cq {
384 let index = cqe.user_data();
385 match index {
386 #[cfg(feature = "sync")]
387 EVENTFD_USERDATA => self.eventfd_installed = false,
388 #[cfg(feature = "poll-io")]
389 POLLER_USERDATA => {
390 self.poller_installed = false;
391 self.poll.tick(Some(Duration::ZERO))?;
392 }
393 _ if index >= MIN_REVERSED_USERDATA => (),
394 _ => unsafe { self.ops.complete(index as _, resultify(&cqe), cqe.flags()) },
397 }
398 }
399 Ok(())
400 }
401
402 fn submit(&mut self) -> io::Result<()> {
403 loop {
404 match self.uring.submit() {
405 #[cfg(feature = "unstable")]
406 Err(ref e)
407 if matches!(e.kind(), io::ErrorKind::Other | io::ErrorKind::ResourceBusy) =>
408 {
409 self.tick()?;
410 }
411 #[cfg(not(feature = "unstable"))]
412 Err(ref e)
413 if matches!(e.raw_os_error(), Some(libc::EAGAIN) | Some(libc::EBUSY)) =>
414 {
415 self.tick()?;
420 }
421 e => return e.map(|_| ()),
422 }
423 }
424 }
425
426 fn new_op<T: OpAble>(data: T, inner: &mut UringInner, driver: Inner) -> Op<T> {
427 Op {
428 driver,
429 index: inner.ops.insert(T::RET_IS_FD),
430 data: Some(data),
431 }
432 }
433
434 pub(crate) fn submit_with_data<T>(
435 this: &Rc<UnsafeCell<UringInner>>,
436 data: T,
437 ) -> io::Result<Op<T>>
438 where
439 T: OpAble,
440 {
441 let inner = unsafe { &mut *this.get() };
442 if inner.uring.submission().is_full() {
444 inner.submit()?;
445 }
446
447 let mut op = Self::new_op(data, inner, Inner::Uring(this.clone()));
449
450 let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() };
452 let sqe = OpAble::uring_op(data_mut).user_data(op.index as _);
453
454 {
455 let mut sq = inner.uring.submission();
456
457 if unsafe { sq.push(&sqe).is_err() } {
459 unimplemented!("when is this hit?");
460 }
461 }
462
463 Ok(op)
473 }
474
475 pub(crate) fn poll_op(
476 this: &Rc<UnsafeCell<UringInner>>,
477 index: usize,
478 cx: &mut Context<'_>,
479 ) -> Poll<CompletionMeta> {
480 let inner = unsafe { &mut *this.get() };
481 let lifecycle = unsafe { inner.ops.slab.get(index).unwrap_unchecked() };
482 lifecycle.poll_op(cx)
483 }
484
485 #[cfg(feature = "poll-io")]
486 pub(crate) fn poll_legacy_op<T: OpAble>(
487 this: &Rc<UnsafeCell<Self>>,
488 data: &mut T,
489 cx: &mut Context<'_>,
490 ) -> Poll<CompletionMeta> {
491 let inner = unsafe { &mut *this.get() };
492 let (direction, index) = match data.legacy_interest() {
493 Some(x) => x,
494 None => {
495 return Poll::Ready(CompletionMeta {
498 result: OpAble::legacy_call(data),
499 flags: 0,
500 });
501 }
502 };
503
504 inner
506 .poll
507 .poll_syscall(cx, index, direction, || OpAble::legacy_call(data))
508 }
509
510 pub(crate) fn drop_op<T: 'static>(
511 this: &Rc<UnsafeCell<UringInner>>,
512 index: usize,
513 data: &mut Option<T>,
514 _skip_cancel: bool,
515 ) {
516 let inner = unsafe { &mut *this.get() };
517 if index == usize::MAX {
518 return;
520 }
521 if let Some(lifecycle) = inner.ops.slab.get(index) {
522 let _must_finished = lifecycle.drop_op(data);
523 #[cfg(feature = "async-cancel")]
524 if !_must_finished && !_skip_cancel {
525 unsafe {
526 let cancel = opcode::AsyncCancel::new(index as u64)
527 .build()
528 .user_data(u64::MAX);
529
530 if inner.uring.submission().push(&cancel).is_err() {
532 let _ = inner.submit();
533 let _ = inner.uring.submission().push(&cancel);
534 }
535 }
536 }
537 }
538 }
539
540 pub(crate) unsafe fn cancel_op(this: &Rc<UnsafeCell<UringInner>>, index: usize) {
541 let inner = &mut *this.get();
542 let cancel = opcode::AsyncCancel::new(index as u64)
543 .build()
544 .user_data(u64::MAX);
545 if inner.uring.submission().push(&cancel).is_err() {
546 let _ = inner.submit();
547 let _ = inner.uring.submission().push(&cancel);
548 }
549 }
550
551 #[cfg(feature = "sync")]
552 pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle {
553 let inner = unsafe { &*this.get() };
554 let weak = std::sync::Arc::downgrade(&inner.shared_waker);
555 waker::UnparkHandle(weak)
556 }
557}
558
559impl AsRawFd for IoUringDriver {
560 fn as_raw_fd(&self) -> RawFd {
561 unsafe { (*self.inner.get()).uring.as_raw_fd() }
562 }
563}
564
565impl Drop for IoUringDriver {
566 fn drop(&mut self) {
567 trace!("MONOIO DEBUG[IoUringDriver]: drop");
568
569 unsafe { std::ptr::drop_in_place(self.timespec) };
571
572 #[cfg(feature = "sync")]
573 unsafe {
574 std::ptr::drop_in_place(self.eventfd_read_dst)
575 };
576
577 #[cfg(feature = "sync")]
579 {
580 use crate::driver::thread::{unregister_unpark_handle, unregister_waker_sender};
581 unregister_unpark_handle(self.thread_id);
582 unregister_waker_sender(self.thread_id);
583 }
584 }
585}
586
587impl Drop for UringInner {
588 fn drop(&mut self) {
589 let _ = self.uring.submitter().submit();
591 unsafe {
592 ManuallyDrop::drop(&mut self.uring);
593 }
594 }
595}
596
597impl Ops {
598 const fn new() -> Self {
599 Ops { slab: Slab::new() }
600 }
601
602 #[inline]
604 pub(crate) fn insert(&mut self, is_fd: bool) -> usize {
605 self.slab.insert(MaybeFdLifecycle::new(is_fd))
606 }
607
608 #[inline]
612 unsafe fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
613 let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() };
614 lifecycle.complete(result, flags);
615 }
616}
617
618#[inline]
619fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
620 let res = cqe.result();
621
622 if res >= 0 {
623 Ok(res as u32)
624 } else {
625 Err(io::Error::from_raw_os_error(-res))
626 }
627}