monoio/driver/legacy/
mod.rs

1//! Monoio Legacy Driver.
2
3#[cfg(feature = "sync")]
4use std::sync::Arc;
5use std::{
6    cell::UnsafeCell,
7    io,
8    rc::Rc,
9    task::{Context, Poll},
10    time::Duration,
11};
12
13use super::{
14    op::{CompletionMeta, Op, OpAble},
15    ready::{self, Ready},
16    scheduled_io::ScheduledIo,
17    Driver, Inner, CURRENT,
18};
19use crate::utils::slab::Slab;
20
21#[cfg(feature = "sync")]
22mod waker;
23#[cfg(feature = "sync")]
24use crossbeam_queue::SegQueue;
25#[cfg(feature = "sync")]
26pub(crate) use waker::UnparkHandle;
27
28pub(crate) struct LegacyInner {
29    pub(crate) io_dispatch: Slab<ScheduledIo>,
30    #[cfg(unix)]
31    events: mio::Events,
32    #[cfg(unix)]
33    poll: mio::Poll,
34    #[cfg(windows)]
35    events: crate::driver::iocp::Events,
36    #[cfg(windows)]
37    poll: crate::driver::iocp::Poller,
38
39    #[cfg(feature = "sync")]
40    shared_waker: std::sync::Arc<waker::EventWaker>,
41
42    // Waker receiver
43    #[cfg(feature = "sync")]
44    waker_queue: Arc<SegQueue<std::task::Waker>>,
45}
46
47/// Driver with Poll-like syscall.
48#[allow(unreachable_pub)]
49pub struct LegacyDriver {
50    inner: Rc<UnsafeCell<LegacyInner>>,
51
52    // Used for drop
53    #[cfg(feature = "sync")]
54    thread_id: usize,
55}
56
57#[cfg(feature = "sync")]
58const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
59
60#[allow(dead_code)]
61impl LegacyDriver {
62    const DEFAULT_ENTRIES: u32 = 1024;
63
64    pub(crate) fn new() -> io::Result<Self> {
65        Self::new_with_entries(Self::DEFAULT_ENTRIES)
66    }
67
68    pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
69        #[cfg(unix)]
70        let poll = mio::Poll::new()?;
71        #[cfg(windows)]
72        let poll = crate::driver::iocp::Poller::new()?;
73
74        #[cfg(all(unix, feature = "sync"))]
75        let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
76            poll.registry(),
77            TOKEN_WAKEUP,
78        )?));
79        #[cfg(all(windows, feature = "sync"))]
80        let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
81            crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
82        ));
83        #[cfg(feature = "sync")]
84        let waker_queue = Arc::new(SegQueue::new());
85        #[cfg(feature = "sync")]
86        let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
87
88        let inner = LegacyInner {
89            io_dispatch: Slab::new(),
90            #[cfg(unix)]
91            events: mio::Events::with_capacity(entries as usize),
92            #[cfg(unix)]
93            poll,
94            #[cfg(windows)]
95            events: crate::driver::iocp::Events::with_capacity(entries as usize),
96            #[cfg(windows)]
97            poll,
98            #[cfg(feature = "sync")]
99            shared_waker,
100            #[cfg(feature = "sync")]
101            waker_queue: waker_queue.clone(),
102        };
103        let driver = Self {
104            inner: Rc::new(UnsafeCell::new(inner)),
105            #[cfg(feature = "sync")]
106            thread_id,
107        };
108
109        // Register unpark handle
110        #[cfg(feature = "sync")]
111        {
112            let unpark = driver.unpark();
113            super::thread::register_unpark_handle(thread_id, unpark.into());
114            super::thread::register_waker_queue(thread_id, waker_queue);
115        }
116
117        Ok(driver)
118    }
119
120    fn inner_park(&self, mut timeout: Option<Duration>) -> io::Result<()> {
121        let inner = unsafe { &mut *self.inner.get() };
122
123        #[allow(unused_mut)]
124        let mut need_wait = true;
125        #[cfg(feature = "sync")]
126        {
127            // Process foreign wakers
128            while let Some(w) = inner.waker_queue.pop() {
129                w.wake();
130                need_wait = false;
131            }
132
133            // Set status as not awake if we are going to sleep
134            if need_wait {
135                inner
136                    .shared_waker
137                    .awake
138                    .store(false, std::sync::atomic::Ordering::Release);
139            }
140
141            // Process foreign wakers left
142            while let Some(w) = inner.waker_queue.pop() {
143                w.wake();
144                need_wait = false;
145            }
146        }
147
148        if !need_wait {
149            timeout = Some(Duration::ZERO);
150        }
151
152        // here we borrow 2 mut self, but its safe.
153        let events = unsafe { &mut (*self.inner.get()).events };
154        match inner.poll.poll(events, timeout) {
155            Ok(_) => {}
156            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
157            Err(e) => return Err(e),
158        }
159        #[cfg(unix)]
160        let iter = events.iter();
161        #[cfg(windows)]
162        let iter = events.events.iter();
163        for event in iter {
164            let token = event.token();
165
166            #[cfg(feature = "sync")]
167            if token != TOKEN_WAKEUP {
168                inner.dispatch(token, Ready::from_mio(event));
169            }
170
171            #[cfg(not(feature = "sync"))]
172            inner.dispatch(token, Ready::from_mio(event));
173        }
174        Ok(())
175    }
176
177    #[cfg(windows)]
178    pub(crate) fn register(
179        this: &Rc<UnsafeCell<LegacyInner>>,
180        state: &mut crate::driver::iocp::SocketState,
181        interest: mio::Interest,
182    ) -> io::Result<usize> {
183        let inner = unsafe { &mut *this.get() };
184        let io = ScheduledIo::new(state.inner.clone());
185        let token = inner.io_dispatch.insert(io);
186
187        match inner.poll.register(state, mio::Token(token), interest) {
188            Ok(_) => Ok(token),
189            Err(e) => {
190                inner.io_dispatch.remove(token);
191                Err(e)
192            }
193        }
194    }
195
196    #[cfg(windows)]
197    pub(crate) fn deregister(
198        this: &Rc<UnsafeCell<LegacyInner>>,
199        token: usize,
200        state: &mut crate::driver::iocp::SocketState,
201    ) -> io::Result<()> {
202        let inner = unsafe { &mut *this.get() };
203
204        // try to deregister fd first, on success we will remove it from slab.
205        match inner.poll.deregister(state) {
206            Ok(_) => {
207                inner.io_dispatch.remove(token);
208                Ok(())
209            }
210            Err(e) => Err(e),
211        }
212    }
213
214    #[cfg(unix)]
215    pub(crate) fn register(
216        this: &Rc<UnsafeCell<LegacyInner>>,
217        source: &mut impl mio::event::Source,
218        interest: mio::Interest,
219    ) -> io::Result<usize> {
220        let inner = unsafe { &mut *this.get() };
221        let token = inner.io_dispatch.insert(ScheduledIo::new());
222
223        let registry = inner.poll.registry();
224        match registry.register(source, mio::Token(token), interest) {
225            Ok(_) => Ok(token),
226            Err(e) => {
227                inner.io_dispatch.remove(token);
228                Err(e)
229            }
230        }
231    }
232
233    #[cfg(unix)]
234    pub(crate) fn deregister(
235        this: &Rc<UnsafeCell<LegacyInner>>,
236        token: usize,
237        source: &mut impl mio::event::Source,
238    ) -> io::Result<()> {
239        let inner = unsafe { &mut *this.get() };
240
241        // try to deregister fd first, on success we will remove it from slab.
242        match inner.poll.registry().deregister(source) {
243            Ok(_) => {
244                inner.io_dispatch.remove(token);
245                Ok(())
246            }
247            Err(e) => Err(e),
248        }
249    }
250}
251
252impl LegacyInner {
253    fn dispatch(&mut self, token: mio::Token, ready: Ready) {
254        let mut sio = match self.io_dispatch.get(token.0) {
255            Some(io) => io,
256            None => {
257                return;
258            }
259        };
260        let ref_mut = sio.as_mut();
261        ref_mut.set_readiness(|curr| curr | ready);
262        ref_mut.wake(ready);
263    }
264
265    pub(crate) fn poll_op<T: OpAble>(
266        this: &Rc<UnsafeCell<Self>>,
267        data: &mut T,
268        cx: &mut Context<'_>,
269    ) -> Poll<CompletionMeta> {
270        let inner = unsafe { &mut *this.get() };
271        let (direction, index) = match data.legacy_interest() {
272            Some(x) => x,
273            None => {
274                // if there is no index provided, it means the action does not rely on fd
275                // readiness. do syscall right now.
276                return Poll::Ready(CompletionMeta {
277                    result: OpAble::legacy_call(data),
278                    flags: 0,
279                });
280            }
281        };
282
283        // wait io ready and do syscall
284        // Note: scheduled_io may be missing if the fd was deregistered
285        // concurrently with task polling. Treat as cancellation.
286        let Some(mut scheduled_io) = inner.io_dispatch.get(index) else {
287            return Poll::Ready(CompletionMeta {
288                result: Err(io::Error::from_raw_os_error(125)),
289                flags: 0,
290            });
291        };
292        let ref_mut = scheduled_io.as_mut();
293
294        let readiness = ready!(ref_mut.poll_readiness(cx, direction));
295
296        // check if canceled
297        if readiness.is_canceled() {
298            // clear CANCELED part only
299            ref_mut.clear_readiness(readiness & Ready::CANCELED);
300            return Poll::Ready(CompletionMeta {
301                result: Err(io::Error::from_raw_os_error(125)),
302                flags: 0,
303            });
304        }
305
306        match OpAble::legacy_call(data) {
307            Ok(n) => Poll::Ready(CompletionMeta {
308                result: Ok(n),
309                flags: 0,
310            }),
311            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
312                #[cfg(windows)]
313                {
314                    if let Some((sock_state, token, interest)) = {
315                        let socket_state_lock = ref_mut.state.lock().unwrap();
316                        socket_state_lock.inner.clone().map(|inner| {
317                            (inner, socket_state_lock.token, socket_state_lock.interest)
318                        })
319                    } {
320                        if let Err(e) = inner.poll.reregister(sock_state, token, interest) {
321                            return Poll::Ready(CompletionMeta {
322                                result: Err(e),
323                                flags: 0,
324                            });
325                        }
326                    }
327                }
328
329                ref_mut.clear_readiness(direction.mask());
330                ref_mut.set_waker(cx, direction);
331                Poll::Pending
332            }
333            Err(e) => Poll::Ready(CompletionMeta {
334                result: Err(e),
335                flags: 0,
336            }),
337        }
338    }
339
340    pub(crate) fn cancel_op(
341        this: &Rc<UnsafeCell<LegacyInner>>,
342        index: usize,
343        direction: ready::Direction,
344    ) {
345        let inner = unsafe { &mut *this.get() };
346        let ready = match direction {
347            ready::Direction::Read => Ready::READ_CANCELED,
348            ready::Direction::Write => Ready::WRITE_CANCELED,
349        };
350        inner.dispatch(mio::Token(index), ready);
351    }
352
353    pub(crate) fn submit_with_data<T>(
354        this: &Rc<UnsafeCell<LegacyInner>>,
355        data: T,
356    ) -> io::Result<Op<T>>
357    where
358        T: OpAble,
359    {
360        Ok(Op {
361            driver: Inner::Legacy(this.clone()),
362            // useless for legacy
363            index: 0,
364            data: Some(data),
365        })
366    }
367
368    #[cfg(feature = "sync")]
369    pub(crate) fn unpark(this: &Rc<UnsafeCell<LegacyInner>>) -> waker::UnparkHandle {
370        let inner = unsafe { &*this.get() };
371        let weak = std::sync::Arc::downgrade(&inner.shared_waker);
372        waker::UnparkHandle(weak)
373    }
374}
375
376impl Driver for LegacyDriver {
377    fn with<R>(&self, f: impl FnOnce() -> R) -> R {
378        let inner = Inner::Legacy(self.inner.clone());
379        CURRENT.set(&inner, f)
380    }
381
382    fn submit(&self) -> io::Result<()> {
383        // wait with timeout = 0
384        self.park_timeout(Duration::ZERO)
385    }
386
387    fn park(&self) -> io::Result<()> {
388        self.inner_park(None)
389    }
390
391    fn park_timeout(&self, duration: Duration) -> io::Result<()> {
392        self.inner_park(Some(duration))
393    }
394
395    #[cfg(feature = "sync")]
396    type Unpark = waker::UnparkHandle;
397
398    #[cfg(feature = "sync")]
399    fn unpark(&self) -> Self::Unpark {
400        LegacyInner::unpark(&self.inner)
401    }
402}
403
404impl Drop for LegacyDriver {
405    fn drop(&mut self) {
406        // Deregister thread id
407        #[cfg(feature = "sync")]
408        {
409            use crate::driver::thread::{unregister_unpark_handle, unregister_waker_queue};
410            unregister_unpark_handle(self.thread_id);
411            unregister_waker_queue(self.thread_id);
412        }
413    }
414}