1#[cfg(feature = "sync")]
4use std::sync::Arc;
5use std::{
6 cell::UnsafeCell,
7 io,
8 mem::ManuallyDrop,
9 os::unix::prelude::{AsRawFd, RawFd},
10 rc::Rc,
11 task::{Context, Poll},
12 time::Duration,
13};
14
15#[cfg(feature = "sync")]
16use crossbeam_queue::SegQueue;
17use io_uring::{cqueue, opcode, types::Timespec, IoUring};
18use lifecycle::MaybeFdLifecycle;
19
20use super::{
21 op::{CompletionMeta, Op, OpAble},
22 util::timespec,
23 Driver, Inner, CURRENT,
24};
25use crate::utils::slab::Slab;
26
27mod lifecycle;
28#[cfg(feature = "sync")]
29mod waker;
30#[cfg(feature = "sync")]
31pub(crate) use waker::UnparkHandle;
32
// Reserved `user_data` values. They occupy the top of the `u64` range and are
// expressed relative to `CANCEL_USERDATA` so the block reads as one descending
// sequence.

/// `user_data` tag reserved for cancel submissions (`u64::MAX`).
#[allow(unused)]
pub(crate) const CANCEL_USERDATA: u64 = u64::MAX;
/// `user_data` tag attached to the timeout entry pushed before parking.
pub(crate) const TIMEOUT_USERDATA: u64 = CANCEL_USERDATA - 1;
/// `user_data` tag attached to the eventfd read entry.
#[allow(unused)]
pub(crate) const EVENTFD_USERDATA: u64 = CANCEL_USERDATA - 2;
/// `user_data` tag attached to the poller `PollAdd` entry.
#[cfg(feature = "poll-io")]
pub(crate) const POLLER_USERDATA: u64 = CANCEL_USERDATA - 3;

/// Smallest reserved `user_data` value; completions at or above it are not
/// forwarded to the operation table.
pub(crate) const MIN_REVERSED_USERDATA: u64 = CANCEL_USERDATA - 3;
42
/// Driver handle whose state lives in a reference-counted [`UringInner`].
pub struct IoUringDriver {
    /// Shared driver state. It is accessed through raw pointers obtained from
    /// the `UnsafeCell`, and clones of this `Rc` are handed out via
    /// `Inner::Uring`.
    inner: Rc<UnsafeCell<UringInner>>,

    /// Identifier read from `BUILD_THREAD_ID` at construction; used to
    /// register and later unregister the unpark handle and waker queue.
    #[cfg(feature = "sync")]
    thread_id: usize,
}
51
52pub(crate) struct UringInner {
53 ops: Ops,
55
56 timespec: Timespec,
58
59 #[cfg(feature = "sync")]
61 eventfd_read_dst: [u8; 8],
62
63 #[cfg(feature = "poll-io")]
64 poll: super::poll::Poll,
65 #[cfg(feature = "poll-io")]
66 poller_installed: bool,
67
68 uring: ManuallyDrop<IoUring>,
70
71 #[cfg(feature = "sync")]
73 shared_waker: std::sync::Arc<waker::EventWaker>,
74
75 #[cfg(feature = "sync")]
77 eventfd_installed: bool,
78
79 #[cfg(feature = "sync")]
81 waker_queue: Arc<SegQueue<std::task::Waker>>,
82
83 ext_arg: bool,
85}
86
/// Operation table: a slab holding one lifecycle entry per operation.
struct Ops {
    /// Lifecycle entries, indexed by the value stored in each SQE's
    /// `user_data`.
    slab: Slab<MaybeFdLifecycle>,
}
92
93impl IoUringDriver {
94 const DEFAULT_ENTRIES: u32 = 1024;
95
96 pub(crate) fn new(b: &io_uring::Builder) -> io::Result<IoUringDriver> {
97 Self::new_with_entries(b, Self::DEFAULT_ENTRIES)
98 }
99
100 #[cfg(not(feature = "sync"))]
101 pub(crate) fn new_with_entries(
102 urb: &io_uring::Builder,
103 entries: u32,
104 ) -> io::Result<IoUringDriver> {
105 let uring = ManuallyDrop::new(urb.build(entries)?);
106
107 let inner = Rc::new(UnsafeCell::new(UringInner {
108 #[cfg(feature = "poll-io")]
109 poll: super::poll::Poll::with_capacity(entries as usize)?,
110 #[cfg(feature = "poll-io")]
111 poller_installed: false,
112 ops: Ops::new(),
113 timespec: Timespec::new(),
114 ext_arg: uring.params().is_feature_ext_arg(),
115 uring,
116 }));
117
118 Ok(IoUringDriver { inner })
119 }
120
121 #[cfg(feature = "sync")]
122 pub(crate) fn new_with_entries(
123 urb: &io_uring::Builder,
124 entries: u32,
125 ) -> io::Result<IoUringDriver> {
126 use std::sync::Arc;
127
128 use crossbeam_queue::SegQueue;
129
130 let uring = ManuallyDrop::new(urb.build(entries)?);
131
132 let waker = {
134 let fd = crate::syscall!(eventfd@RAW(0, libc::EFD_CLOEXEC))?;
135 unsafe {
136 use std::os::unix::io::FromRawFd;
137 std::fs::File::from_raw_fd(fd)
138 }
139 };
140
141 let waker_queue = Arc::new(SegQueue::new());
142
143 let inner = Rc::new(UnsafeCell::new(UringInner {
144 #[cfg(feature = "poll-io")]
145 poller_installed: false,
146 #[cfg(feature = "poll-io")]
147 poll: super::poll::Poll::with_capacity(entries as usize)?,
148 ops: Ops::new(),
149 timespec: Timespec::new(),
150 ext_arg: uring.params().is_feature_ext_arg(),
151 uring,
152 shared_waker: std::sync::Arc::new(waker::EventWaker::new(waker)),
153 eventfd_installed: false,
154 eventfd_read_dst: [0_u8; 8],
155 waker_queue: waker_queue.clone(),
156 }));
157
158 let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
159 let driver = IoUringDriver { inner, thread_id };
160
161 super::thread::register_unpark_handle(thread_id, driver.unpark().into());
163 super::thread::register_waker_queue(thread_id, waker_queue);
164 Ok(driver)
165 }
166
167 #[allow(unused)]
168 fn num_operations(&self) -> usize {
169 let inner = self.inner.get();
170 unsafe { (*inner).ops.slab.len() }
171 }
172
173 fn flush_space(inner: &mut UringInner, need: usize) -> io::Result<()> {
175 let sq = inner.uring.submission();
176 debug_assert!(sq.capacity() >= need);
177 if sq.len() + need > sq.capacity() {
178 drop(sq);
179 inner.submit()?;
180 }
181 Ok(())
182 }
183
184 #[cfg(feature = "sync")]
185 fn install_eventfd(&self, inner: &mut UringInner, fd: RawFd) {
186 let entry = opcode::Read::new(
187 io_uring::types::Fd(fd),
188 inner.eventfd_read_dst.as_mut_ptr(),
189 8,
190 )
191 .build()
192 .user_data(EVENTFD_USERDATA);
193
194 let mut sq = inner.uring.submission();
195 let _ = unsafe { sq.push(&entry) };
196 inner.eventfd_installed = true;
197 }
198
199 #[cfg(feature = "poll-io")]
200 fn install_poller(&self, inner: &mut UringInner, fd: RawFd) {
201 let entry = opcode::PollAdd::new(io_uring::types::Fd(fd), libc::POLLIN as _)
202 .build()
203 .user_data(POLLER_USERDATA);
204
205 let mut sq = inner.uring.submission();
206 let _ = unsafe { sq.push(&entry) };
207 inner.poller_installed = true;
208 }
209
210 fn install_timeout(&self, inner: &mut UringInner, duration: Duration) {
211 inner.timespec = timespec(duration);
212 let entry = opcode::Timeout::new(&inner.timespec as *const Timespec)
213 .build()
214 .user_data(TIMEOUT_USERDATA);
215
216 let mut sq = inner.uring.submission();
217 let _ = unsafe { sq.push(&entry) };
218 }
219
220 fn inner_park(&self, timeout: Option<Duration>) -> io::Result<()> {
221 let inner = unsafe { &mut *self.inner.get() };
222
223 #[allow(unused_mut)]
224 let mut need_wait = true;
225
226 #[cfg(feature = "sync")]
227 {
228 while let Some(w) = inner.waker_queue.pop() {
230 w.wake();
231 need_wait = false;
232 }
233
234 if need_wait {
236 inner
237 .shared_waker
238 .awake
239 .store(false, std::sync::atomic::Ordering::Release);
240 }
241
242 while let Some(w) = inner.waker_queue.pop() {
244 w.wake();
245 need_wait = false;
246 }
247 }
248
249 if need_wait {
250 let mut space = 0;
254 #[cfg(feature = "sync")]
255 if !inner.eventfd_installed {
256 space += 1;
257 }
258 #[cfg(feature = "poll-io")]
259 if !inner.poller_installed {
260 space += 1;
261 }
262 if timeout.is_some() {
263 space += 1;
264 }
265 if space != 0 {
266 Self::flush_space(inner, space)?;
267 }
268
269 #[cfg(feature = "poll-io")]
271 if !inner.poller_installed {
272 self.install_poller(inner, inner.poll.as_raw_fd());
273 }
274
275 #[cfg(feature = "sync")]
277 if !inner.eventfd_installed {
278 self.install_eventfd(inner, inner.shared_waker.as_raw_fd());
279 }
280
281 if let Some(duration) = timeout {
283 match inner.ext_arg {
284 false => {
287 self.install_timeout(inner, duration);
288 inner.uring.submit_and_wait(1)?;
289 }
290 true => {
293 let timespec = timespec(duration);
294 let args = io_uring::types::SubmitArgs::new().timespec(×pec);
295 if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) {
296 if e.raw_os_error() != Some(libc::ETIME) {
297 return Err(e);
298 }
299 }
300 }
301 }
302 } else {
303 inner.uring.submit_and_wait(1)?;
305 }
306 } else {
307 inner.uring.submit()?;
309 }
310
311 #[cfg(feature = "sync")]
313 inner
314 .shared_waker
315 .awake
316 .store(true, std::sync::atomic::Ordering::Release);
317
318 inner.tick()?;
320
321 Ok(())
322 }
323
324 #[cfg(feature = "poll-io")]
325 #[inline]
326 pub(crate) fn register_poll_io(
327 this: &Rc<UnsafeCell<UringInner>>,
328 source: &mut impl mio::event::Source,
329 interest: mio::Interest,
330 ) -> io::Result<usize> {
331 let inner = unsafe { &mut *this.get() };
332 inner.poll.register(source, interest)
333 }
334
335 #[cfg(feature = "poll-io")]
336 #[inline]
337 pub(crate) fn deregister_poll_io(
338 this: &Rc<UnsafeCell<UringInner>>,
339 source: &mut impl mio::event::Source,
340 token: usize,
341 ) -> io::Result<()> {
342 let inner = unsafe { &mut *this.get() };
343 inner.poll.deregister(source, token)
344 }
345}
346
347impl Driver for IoUringDriver {
348 fn with<R>(&self, f: impl FnOnce() -> R) -> R {
350 let inner = Inner::Uring(self.inner.clone());
352 CURRENT.set(&inner, f)
353 }
354
355 fn submit(&self) -> io::Result<()> {
356 let inner = unsafe { &mut *self.inner.get() };
357 inner.submit()?;
358 inner.tick()?;
359 Ok(())
360 }
361
362 fn park(&self) -> io::Result<()> {
363 self.inner_park(None)
364 }
365
366 fn park_timeout(&self, duration: Duration) -> io::Result<()> {
367 self.inner_park(Some(duration))
368 }
369
370 #[cfg(feature = "sync")]
371 type Unpark = waker::UnparkHandle;
372
373 #[cfg(feature = "sync")]
374 fn unpark(&self) -> Self::Unpark {
375 UringInner::unpark(&self.inner)
376 }
377}
378
379impl UringInner {
380 fn tick(&mut self) -> io::Result<()> {
381 let cq = self.uring.completion();
382
383 for cqe in cq {
384 let index = cqe.user_data();
385 match index {
386 #[cfg(feature = "sync")]
387 EVENTFD_USERDATA => self.eventfd_installed = false,
388 #[cfg(feature = "poll-io")]
389 POLLER_USERDATA => {
390 self.poller_installed = false;
391 self.poll.tick(Some(Duration::ZERO))?;
392 }
393 _ if index >= MIN_REVERSED_USERDATA => (),
394 _ => unsafe { self.ops.complete(index as _, resultify(&cqe), cqe.flags()) },
397 }
398 }
399 Ok(())
400 }
401
402 fn submit(&mut self) -> io::Result<()> {
403 loop {
404 match self.uring.submit() {
405 #[cfg(feature = "unstable")]
406 Err(ref e)
407 if matches!(e.kind(), io::ErrorKind::Other | io::ErrorKind::ResourceBusy) =>
408 {
409 self.tick()?;
410 }
411 #[cfg(not(feature = "unstable"))]
412 Err(ref e)
413 if matches!(e.raw_os_error(), Some(libc::EAGAIN) | Some(libc::EBUSY)) =>
414 {
415 self.tick()?;
420 }
421 e => return e.map(|_| ()),
422 }
423 }
424 }
425
426 fn new_op<T: OpAble>(data: T, inner: &mut UringInner, driver: Inner) -> Op<T> {
427 Op {
428 driver,
429 index: inner.ops.insert(T::RET_IS_FD),
430 data: Some(data),
431 }
432 }
433
434 pub(crate) fn submit_with_data<T>(
435 this: &Rc<UnsafeCell<UringInner>>,
436 data: T,
437 ) -> io::Result<Op<T>>
438 where
439 T: OpAble,
440 {
441 let inner = unsafe { &mut *this.get() };
442 if inner.uring.submission().is_full() {
444 inner.submit()?;
445 }
446
447 let mut op = Self::new_op(data, inner, Inner::Uring(this.clone()));
449
450 let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() };
452 let sqe = OpAble::uring_op(data_mut).user_data(op.index as _);
453
454 {
455 let mut sq = inner.uring.submission();
456
457 if unsafe { sq.push(&sqe).is_err() } {
459 unimplemented!("when is this hit?");
460 }
461 }
462
463 Ok(op)
473 }
474
475 pub(crate) fn poll_op(
476 this: &Rc<UnsafeCell<UringInner>>,
477 index: usize,
478 cx: &mut Context<'_>,
479 ) -> Poll<CompletionMeta> {
480 let inner = unsafe { &mut *this.get() };
481 let lifecycle = unsafe { inner.ops.slab.get(index).unwrap_unchecked() };
482 lifecycle.poll_op(cx)
483 }
484
485 #[cfg(feature = "poll-io")]
486 pub(crate) fn poll_legacy_op<T: OpAble>(
487 this: &Rc<UnsafeCell<Self>>,
488 data: &mut T,
489 cx: &mut Context<'_>,
490 ) -> Poll<CompletionMeta> {
491 let inner = unsafe { &mut *this.get() };
492 let (direction, index) = match data.legacy_interest() {
493 Some(x) => x,
494 None => {
495 return Poll::Ready(CompletionMeta {
498 result: OpAble::legacy_call(data),
499 flags: 0,
500 });
501 }
502 };
503
504 inner
506 .poll
507 .poll_syscall(cx, index, direction, || OpAble::legacy_call(data))
508 }
509
510 pub(crate) fn drop_op<T: 'static>(
511 this: &Rc<UnsafeCell<UringInner>>,
512 index: usize,
513 data: &mut Option<T>,
514 _skip_cancel: bool,
515 ) {
516 let inner = unsafe { &mut *this.get() };
517 if index == usize::MAX {
518 return;
520 }
521 if let Some(lifecycle) = inner.ops.slab.get(index) {
522 let _must_finished = lifecycle.drop_op(data);
523 #[cfg(feature = "async-cancel")]
524 if !_must_finished && !_skip_cancel {
525 unsafe {
526 let cancel = opcode::AsyncCancel::new(index as u64)
527 .build()
528 .user_data(u64::MAX);
529
530 if inner.uring.submission().push(&cancel).is_err() {
532 let _ = inner.submit();
533 let _ = inner.uring.submission().push(&cancel);
534 }
535 }
536 }
537 }
538 }
539
540 pub(crate) unsafe fn cancel_op(this: &Rc<UnsafeCell<UringInner>>, index: usize) {
541 let inner = &mut *this.get();
542 let cancel = opcode::AsyncCancel::new(index as u64)
543 .build()
544 .user_data(u64::MAX);
545 if inner.uring.submission().push(&cancel).is_err() {
546 let _ = inner.submit();
547 let _ = inner.uring.submission().push(&cancel);
548 }
549 }
550
551 #[cfg(feature = "sync")]
552 pub(crate) fn unpark(this: &Rc<UnsafeCell<UringInner>>) -> waker::UnparkHandle {
553 let inner = unsafe { &*this.get() };
554 let weak = std::sync::Arc::downgrade(&inner.shared_waker);
555 waker::UnparkHandle(weak)
556 }
557}
558
559impl AsRawFd for IoUringDriver {
560 fn as_raw_fd(&self) -> RawFd {
561 unsafe { (*self.inner.get()).uring.as_raw_fd() }
562 }
563}
564
565impl Drop for IoUringDriver {
566 fn drop(&mut self) {
567 trace!("MONOIO DEBUG[IoUringDriver]: drop");
568
569 #[cfg(feature = "sync")]
571 {
572 use crate::driver::thread::{unregister_unpark_handle, unregister_waker_queue};
573 unregister_unpark_handle(self.thread_id);
574 unregister_waker_queue(self.thread_id);
575 }
576 }
577}
578
579impl Drop for UringInner {
580 fn drop(&mut self) {
581 let _ = self.uring.submitter().submit();
583 unsafe {
584 ManuallyDrop::drop(&mut self.uring);
585 }
586 }
587}
588
589impl Ops {
590 const fn new() -> Self {
591 Ops { slab: Slab::new() }
592 }
593
594 #[inline]
596 pub(crate) fn insert(&mut self, is_fd: bool) -> usize {
597 self.slab.insert(MaybeFdLifecycle::new(is_fd))
598 }
599
600 #[inline]
604 unsafe fn complete(&mut self, index: usize, result: io::Result<u32>, flags: u32) {
605 let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() };
606 lifecycle.complete(result, flags);
607 }
608}
609
610#[inline]
611fn resultify(cqe: &cqueue::Entry) -> io::Result<u32> {
612 let res = cqe.result();
613
614 if res >= 0 {
615 Ok(res as u32)
616 } else {
617 Err(io::Error::from_raw_os_error(-res))
618 }
619}