monoio/driver/
mod.rs

1/// Monoio Driver.
2#[allow(dead_code)]
3pub(crate) mod op;
4#[cfg(feature = "poll-io")]
5pub(crate) mod poll;
6#[cfg(any(feature = "legacy", feature = "poll-io"))]
7pub(crate) mod ready;
8#[cfg(any(feature = "legacy", feature = "poll-io"))]
9pub(crate) mod scheduled_io;
10#[allow(dead_code)]
11pub(crate) mod shared_fd;
12#[cfg(feature = "sync")]
13pub(crate) mod thread;
14
15#[cfg(feature = "legacy")]
16mod legacy;
17#[cfg(all(target_os = "linux", feature = "iouring"))]
18mod uring;
19
20#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
21#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
22pub(crate) mod iocp;
23
24mod util;
25
26use std::{
27    io,
28    task::{Context, Poll},
29    time::Duration,
30};
31
32#[allow(unreachable_pub)]
33#[cfg(feature = "legacy")]
34pub use self::legacy::LegacyDriver;
35#[cfg(feature = "legacy")]
36use self::legacy::LegacyInner;
37use self::op::{CompletionMeta, Op, OpAble};
38#[cfg(all(target_os = "linux", feature = "iouring"))]
39pub use self::uring::IoUringDriver;
40#[cfg(all(target_os = "linux", feature = "iouring"))]
41use self::uring::UringInner;
42
/// Support for waking up a runtime that is parked on another thread.
pub(crate) mod unpark {
    /// A handle able to wake a parked thread.
    ///
    /// Implementors must be `Send + Sync + 'static`, so a handle can be moved
    /// to, and shared between, threads.
    #[allow(unreachable_pub, dead_code)]
    pub trait Unpark: Send + Sync + 'static {
        /// Unblocks a thread that is blocked by the associated `Park` handle.
        ///
        /// The call atomically makes the unpark token available, unless the
        /// token is already available.
        ///
        /// # Errors
        ///
        /// Returns whatever `std::io::Error` the implementation reports.
        ///
        /// # Panics
        ///
        /// Implementations **should** not panic, but panicking is ultimately an
        /// implementation detail; consult the documentation of the specific
        /// `Unpark` implementation.
        fn unpark(&self) -> std::io::Result<()>;
    }
}
60
61impl unpark::Unpark for Box<dyn unpark::Unpark> {
62    fn unpark(&self) -> io::Result<()> {
63        (**self).unpark()
64    }
65}
66
67impl unpark::Unpark for std::sync::Arc<dyn unpark::Unpark> {
68    fn unpark(&self) -> io::Result<()> {
69        (**self).unpark()
70    }
71}
72
/// The core driver abstraction.
///
/// A driver submits operations to the kernel, waits for events and processes
/// the events that come back.
pub trait Driver {
    /// Handle type used to wake this driver's thread from another thread.
    ///
    /// Only present with the `sync` feature.
    #[cfg(feature = "sync")]
    type Unpark: unpark::Unpark;

    /// Runs `f` with the driver TLS and returns the value `f` produces.
    fn with<R>(&self, f: impl FnOnce() -> R) -> R;

    /// Submits pending ops to the kernel and processes the returned events.
    fn submit(&self) -> io::Result<()>;

    /// Waits without a time limit, then processes the returned events.
    fn park(&self) -> io::Result<()>;

    /// Waits for at most `duration`, then processes the returned events.
    fn park_timeout(&self, duration: Duration) -> io::Result<()>;

    /// Returns the [`Driver::Unpark`] handle for this driver.
    ///
    /// Only present with the `sync` feature.
    #[cfg(feature = "sync")]
    fn unpark(&self) -> Self::Unpark;
}
92
93scoped_thread_local!(pub(crate) static CURRENT: Inner);
94
95#[derive(Clone)]
96pub(crate) enum Inner {
97    #[cfg(all(target_os = "linux", feature = "iouring"))]
98    Uring(std::rc::Rc<std::cell::UnsafeCell<UringInner>>),
99    #[cfg(feature = "legacy")]
100    Legacy(std::rc::Rc<std::cell::UnsafeCell<LegacyInner>>),
101}
102
103impl Inner {
104    fn submit_with<T: OpAble>(&self, data: T) -> io::Result<Op<T>> {
105        match self {
106            #[cfg(all(target_os = "linux", feature = "iouring"))]
107            Inner::Uring(this) => UringInner::submit_with_data(this, data),
108            #[cfg(feature = "legacy")]
109            Inner::Legacy(this) => LegacyInner::submit_with_data(this, data),
110            #[cfg(all(
111                not(feature = "legacy"),
112                not(all(target_os = "linux", feature = "iouring"))
113            ))]
114            _ => {
115                util::feature_panic();
116            }
117        }
118    }
119
120    #[allow(unused)]
121    fn poll_op<T: OpAble>(
122        &self,
123        data: &mut T,
124        index: usize,
125        cx: &mut Context<'_>,
126    ) -> Poll<CompletionMeta> {
127        match self {
128            #[cfg(all(target_os = "linux", feature = "iouring"))]
129            Inner::Uring(this) => UringInner::poll_op(this, index, cx),
130            #[cfg(feature = "legacy")]
131            Inner::Legacy(this) => LegacyInner::poll_op::<T>(this, data, cx),
132            #[cfg(all(
133                not(feature = "legacy"),
134                not(all(target_os = "linux", feature = "iouring"))
135            ))]
136            _ => {
137                util::feature_panic();
138            }
139        }
140    }
141
142    #[cfg(feature = "poll-io")]
143    fn poll_legacy_op<T: OpAble>(
144        &self,
145        data: &mut T,
146        cx: &mut Context<'_>,
147    ) -> Poll<CompletionMeta> {
148        match self {
149            #[cfg(all(target_os = "linux", feature = "iouring"))]
150            Inner::Uring(this) => UringInner::poll_legacy_op(this, data, cx),
151            #[cfg(feature = "legacy")]
152            Inner::Legacy(this) => LegacyInner::poll_op::<T>(this, data, cx),
153            #[cfg(all(
154                not(feature = "legacy"),
155                not(all(target_os = "linux", feature = "iouring"))
156            ))]
157            _ => {
158                util::feature_panic();
159            }
160        }
161    }
162
163    #[cfg(all(target_os = "linux", feature = "iouring"))]
164    #[inline]
165    fn drop_op<T: 'static>(&self, index: usize, data: &mut Option<T>, skip_cancel: bool) {
166        match self {
167            Inner::Uring(this) => UringInner::drop_op(this, index, data, skip_cancel),
168            #[cfg(feature = "legacy")]
169            Inner::Legacy(_) => {}
170        }
171    }
172
173    #[allow(unused)]
174    pub(super) unsafe fn cancel_op(&self, op_canceller: &op::OpCanceller) {
175        match self {
176            #[cfg(all(target_os = "linux", feature = "iouring"))]
177            Inner::Uring(this) => UringInner::cancel_op(this, op_canceller.index),
178            #[cfg(feature = "legacy")]
179            Inner::Legacy(this) => {
180                if let Some(direction) = op_canceller.direction {
181                    LegacyInner::cancel_op(this, op_canceller.index, direction)
182                }
183            }
184            #[cfg(all(
185                not(feature = "legacy"),
186                not(all(target_os = "linux", feature = "iouring"))
187            ))]
188            _ => {
189                util::feature_panic();
190            }
191        }
192    }
193
194    #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
195    fn is_legacy(&self) -> bool {
196        matches!(self, Inner::Legacy(..))
197    }
198
199    #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
200    fn is_legacy(&self) -> bool {
201        false
202    }
203
204    #[allow(unused)]
205    #[cfg(not(all(target_os = "linux", feature = "iouring")))]
206    fn is_legacy(&self) -> bool {
207        true
208    }
209}
210
/// The unified unpark handle: wraps the unpark handle of whichever driver
/// backend is compiled in (`sync` feature only).
#[cfg(feature = "sync")]
#[derive(Clone)]
pub(crate) enum UnparkHandle {
    /// Unpark handle of the io_uring backend.
    #[cfg(all(target_os = "linux", feature = "iouring"))]
    Uring(self::uring::UnparkHandle),
    /// Unpark handle of the legacy backend.
    #[cfg(feature = "legacy")]
    Legacy(self::legacy::UnparkHandle),
}

#[cfg(feature = "sync")]
impl unpark::Unpark for UnparkHandle {
    /// Delegates the wake-up request to the wrapped backend handle.
    ///
    /// When no backend is compiled in, the only arm is the
    /// `util::feature_panic()` fallback.
    fn unpark(&self) -> io::Result<()> {
        match self {
            #[cfg(all(target_os = "linux", feature = "iouring"))]
            Self::Uring(handle) => handle.unpark(),
            #[cfg(feature = "legacy")]
            Self::Legacy(handle) => handle.unpark(),
            #[cfg(all(
                not(feature = "legacy"),
                not(all(target_os = "linux", feature = "iouring"))
            ))]
            _ => util::feature_panic(),
        }
    }
}
239
/// Wraps an io_uring unpark handle in the unified [`UnparkHandle`].
#[cfg(all(feature = "sync", target_os = "linux", feature = "iouring"))]
impl From<self::uring::UnparkHandle> for UnparkHandle {
    fn from(handle: self::uring::UnparkHandle) -> Self {
        Self::Uring(handle)
    }
}
246
/// Wraps a legacy-driver unpark handle in the unified [`UnparkHandle`].
#[cfg(all(feature = "sync", feature = "legacy"))]
impl From<self::legacy::UnparkHandle> for UnparkHandle {
    fn from(handle: self::legacy::UnparkHandle) -> Self {
        Self::Legacy(handle)
    }
}
253
#[cfg(feature = "sync")]
impl UnparkHandle {
    /// Builds an unpark handle for the driver held in the `CURRENT`
    /// thread-local.
    ///
    /// The handle is obtained from the active backend (`UringInner::unpark` or
    /// `LegacyInner::unpark`) and converted into the unified `UnparkHandle`
    /// through the matching `From` impl.
    ///
    /// When no driver backend is compiled in, `Inner` has no variants. A match
    /// on `&Inner` with no arms is rejected as non-exhaustive, so that
    /// configuration gets the same `util::feature_panic()` fallback arm as
    /// every other match over `Inner` in this module.
    #[allow(unused)]
    pub(crate) fn current() -> Self {
        CURRENT.with(|inner| match inner {
            #[cfg(all(target_os = "linux", feature = "iouring"))]
            Inner::Uring(this) => UringInner::unpark(this).into(),
            #[cfg(feature = "legacy")]
            Inner::Legacy(this) => LegacyInner::unpark(this).into(),
            #[cfg(all(
                not(feature = "legacy"),
                not(all(target_os = "linux", feature = "iouring"))
            ))]
            _ => util::feature_panic(),
        })
    }
}
265}