monoio/driver/legacy/
mod.rs

1//! Monoio Legacy Driver.
2
3use std::{
4    cell::UnsafeCell,
5    io,
6    rc::Rc,
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use super::{
12    op::{CompletionMeta, Op, OpAble},
13    ready::{self, Ready},
14    scheduled_io::ScheduledIo,
15    Driver, Inner, CURRENT,
16};
17use crate::utils::slab::Slab;
18
19#[cfg(feature = "sync")]
20mod waker;
21#[cfg(feature = "sync")]
22pub(crate) use waker::UnparkHandle;
23
24pub(crate) struct LegacyInner {
25    pub(crate) io_dispatch: Slab<ScheduledIo>,
26    #[cfg(unix)]
27    events: mio::Events,
28    #[cfg(unix)]
29    poll: mio::Poll,
30    #[cfg(windows)]
31    events: crate::driver::iocp::Events,
32    #[cfg(windows)]
33    poll: crate::driver::iocp::Poller,
34
35    #[cfg(feature = "sync")]
36    shared_waker: std::sync::Arc<waker::EventWaker>,
37
38    // Waker receiver
39    #[cfg(feature = "sync")]
40    waker_receiver: flume::Receiver<std::task::Waker>,
41}
42
43/// Driver with Poll-like syscall.
44#[allow(unreachable_pub)]
45pub struct LegacyDriver {
46    inner: Rc<UnsafeCell<LegacyInner>>,
47
48    // Used for drop
49    #[cfg(feature = "sync")]
50    thread_id: usize,
51}
52
/// Token reserved for the cross-thread wakeup source.
///
/// Events carrying this token only interrupt the poll; they are never
/// dispatched to an entry of the I/O slab.
#[cfg(feature = "sync")]
const TOKEN_WAKEUP: mio::Token = mio::Token(1_usize << 31);
55
56#[allow(dead_code)]
57impl LegacyDriver {
58    const DEFAULT_ENTRIES: u32 = 1024;
59
60    pub(crate) fn new() -> io::Result<Self> {
61        Self::new_with_entries(Self::DEFAULT_ENTRIES)
62    }
63
64    pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
65        #[cfg(unix)]
66        let poll = mio::Poll::new()?;
67        #[cfg(windows)]
68        let poll = crate::driver::iocp::Poller::new()?;
69
70        #[cfg(all(unix, feature = "sync"))]
71        let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
72            poll.registry(),
73            TOKEN_WAKEUP,
74        )?));
75        #[cfg(all(windows, feature = "sync"))]
76        let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
77            crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
78        ));
79        #[cfg(feature = "sync")]
80        let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
81        #[cfg(feature = "sync")]
82        let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
83
84        let inner = LegacyInner {
85            io_dispatch: Slab::new(),
86            #[cfg(unix)]
87            events: mio::Events::with_capacity(entries as usize),
88            #[cfg(unix)]
89            poll,
90            #[cfg(windows)]
91            events: crate::driver::iocp::Events::with_capacity(entries as usize),
92            #[cfg(windows)]
93            poll,
94            #[cfg(feature = "sync")]
95            shared_waker,
96            #[cfg(feature = "sync")]
97            waker_receiver,
98        };
99        let driver = Self {
100            inner: Rc::new(UnsafeCell::new(inner)),
101            #[cfg(feature = "sync")]
102            thread_id,
103        };
104
105        // Register unpark handle
106        #[cfg(feature = "sync")]
107        {
108            let unpark = driver.unpark();
109            super::thread::register_unpark_handle(thread_id, unpark.into());
110            super::thread::register_waker_sender(thread_id, waker_sender);
111        }
112
113        Ok(driver)
114    }
115
116    fn inner_park(&self, mut timeout: Option<Duration>) -> io::Result<()> {
117        let inner = unsafe { &mut *self.inner.get() };
118
119        #[allow(unused_mut)]
120        let mut need_wait = true;
121        #[cfg(feature = "sync")]
122        {
123            // Process foreign wakers
124            while let Ok(w) = inner.waker_receiver.try_recv() {
125                w.wake();
126                need_wait = false;
127            }
128
129            // Set status as not awake if we are going to sleep
130            if need_wait {
131                inner
132                    .shared_waker
133                    .awake
134                    .store(false, std::sync::atomic::Ordering::Release);
135            }
136
137            // Process foreign wakers left
138            while let Ok(w) = inner.waker_receiver.try_recv() {
139                w.wake();
140                need_wait = false;
141            }
142        }
143
144        if !need_wait {
145            timeout = Some(Duration::ZERO);
146        }
147
148        // here we borrow 2 mut self, but its safe.
149        let events = unsafe { &mut (*self.inner.get()).events };
150        match inner.poll.poll(events, timeout) {
151            Ok(_) => {}
152            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
153            Err(e) => return Err(e),
154        }
155        #[cfg(unix)]
156        let iter = events.iter();
157        #[cfg(windows)]
158        let iter = events.events.iter();
159        for event in iter {
160            let token = event.token();
161
162            #[cfg(feature = "sync")]
163            if token != TOKEN_WAKEUP {
164                inner.dispatch(token, Ready::from_mio(event));
165            }
166
167            #[cfg(not(feature = "sync"))]
168            inner.dispatch(token, Ready::from_mio(event));
169        }
170        Ok(())
171    }
172
173    #[cfg(windows)]
174    pub(crate) fn register(
175        this: &Rc<UnsafeCell<LegacyInner>>,
176        state: &mut crate::driver::iocp::SocketState,
177        interest: mio::Interest,
178    ) -> io::Result<usize> {
179        let inner = unsafe { &mut *this.get() };
180        let io = ScheduledIo::new(state.inner.clone());
181        let token = inner.io_dispatch.insert(io);
182
183        match inner.poll.register(state, mio::Token(token), interest) {
184            Ok(_) => Ok(token),
185            Err(e) => {
186                inner.io_dispatch.remove(token);
187                Err(e)
188            }
189        }
190    }
191
192    #[cfg(windows)]
193    pub(crate) fn deregister(
194        this: &Rc<UnsafeCell<LegacyInner>>,
195        token: usize,
196        state: &mut crate::driver::iocp::SocketState,
197    ) -> io::Result<()> {
198        let inner = unsafe { &mut *this.get() };
199
200        // try to deregister fd first, on success we will remove it from slab.
201        match inner.poll.deregister(state) {
202            Ok(_) => {
203                inner.io_dispatch.remove(token);
204                Ok(())
205            }
206            Err(e) => Err(e),
207        }
208    }
209
210    #[cfg(unix)]
211    pub(crate) fn register(
212        this: &Rc<UnsafeCell<LegacyInner>>,
213        source: &mut impl mio::event::Source,
214        interest: mio::Interest,
215    ) -> io::Result<usize> {
216        let inner = unsafe { &mut *this.get() };
217        let token = inner.io_dispatch.insert(ScheduledIo::new());
218
219        let registry = inner.poll.registry();
220        match registry.register(source, mio::Token(token), interest) {
221            Ok(_) => Ok(token),
222            Err(e) => {
223                inner.io_dispatch.remove(token);
224                Err(e)
225            }
226        }
227    }
228
229    #[cfg(unix)]
230    pub(crate) fn deregister(
231        this: &Rc<UnsafeCell<LegacyInner>>,
232        token: usize,
233        source: &mut impl mio::event::Source,
234    ) -> io::Result<()> {
235        let inner = unsafe { &mut *this.get() };
236
237        // try to deregister fd first, on success we will remove it from slab.
238        match inner.poll.registry().deregister(source) {
239            Ok(_) => {
240                inner.io_dispatch.remove(token);
241                Ok(())
242            }
243            Err(e) => Err(e),
244        }
245    }
246}
247
248impl LegacyInner {
249    fn dispatch(&mut self, token: mio::Token, ready: Ready) {
250        let mut sio = match self.io_dispatch.get(token.0) {
251            Some(io) => io,
252            None => {
253                return;
254            }
255        };
256        let ref_mut = sio.as_mut();
257        ref_mut.set_readiness(|curr| curr | ready);
258        ref_mut.wake(ready);
259    }
260
261    pub(crate) fn poll_op<T: OpAble>(
262        this: &Rc<UnsafeCell<Self>>,
263        data: &mut T,
264        cx: &mut Context<'_>,
265    ) -> Poll<CompletionMeta> {
266        let inner = unsafe { &mut *this.get() };
267        let (direction, index) = match data.legacy_interest() {
268            Some(x) => x,
269            None => {
270                // if there is no index provided, it means the action does not rely on fd
271                // readiness. do syscall right now.
272                return Poll::Ready(CompletionMeta {
273                    result: OpAble::legacy_call(data),
274                    flags: 0,
275                });
276            }
277        };
278
279        // wait io ready and do syscall
280        let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost");
281        let ref_mut = scheduled_io.as_mut();
282
283        let readiness = ready!(ref_mut.poll_readiness(cx, direction));
284
285        // check if canceled
286        if readiness.is_canceled() {
287            // clear CANCELED part only
288            ref_mut.clear_readiness(readiness & Ready::CANCELED);
289            return Poll::Ready(CompletionMeta {
290                result: Err(io::Error::from_raw_os_error(125)),
291                flags: 0,
292            });
293        }
294
295        match OpAble::legacy_call(data) {
296            Ok(n) => Poll::Ready(CompletionMeta {
297                result: Ok(n),
298                flags: 0,
299            }),
300            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
301                #[cfg(windows)]
302                {
303                    if let Some((sock_state, token, interest)) = {
304                        let socket_state_lock = ref_mut.state.lock().unwrap();
305                        socket_state_lock.inner.clone().map(|inner| {
306                            (inner, socket_state_lock.token, socket_state_lock.interest)
307                        })
308                    } {
309                        if let Err(e) = inner.poll.reregister(sock_state, token, interest) {
310                            return Poll::Ready(CompletionMeta {
311                                result: Err(e),
312                                flags: 0,
313                            });
314                        }
315                    }
316                }
317
318                ref_mut.clear_readiness(direction.mask());
319                ref_mut.set_waker(cx, direction);
320                Poll::Pending
321            }
322            Err(e) => Poll::Ready(CompletionMeta {
323                result: Err(e),
324                flags: 0,
325            }),
326        }
327    }
328
329    pub(crate) fn cancel_op(
330        this: &Rc<UnsafeCell<LegacyInner>>,
331        index: usize,
332        direction: ready::Direction,
333    ) {
334        let inner = unsafe { &mut *this.get() };
335        let ready = match direction {
336            ready::Direction::Read => Ready::READ_CANCELED,
337            ready::Direction::Write => Ready::WRITE_CANCELED,
338        };
339        inner.dispatch(mio::Token(index), ready);
340    }
341
342    pub(crate) fn submit_with_data<T>(
343        this: &Rc<UnsafeCell<LegacyInner>>,
344        data: T,
345    ) -> io::Result<Op<T>>
346    where
347        T: OpAble,
348    {
349        Ok(Op {
350            driver: Inner::Legacy(this.clone()),
351            // useless for legacy
352            index: 0,
353            data: Some(data),
354        })
355    }
356
357    #[cfg(feature = "sync")]
358    pub(crate) fn unpark(this: &Rc<UnsafeCell<LegacyInner>>) -> waker::UnparkHandle {
359        let inner = unsafe { &*this.get() };
360        let weak = std::sync::Arc::downgrade(&inner.shared_waker);
361        waker::UnparkHandle(weak)
362    }
363}
364
365impl Driver for LegacyDriver {
366    fn with<R>(&self, f: impl FnOnce() -> R) -> R {
367        let inner = Inner::Legacy(self.inner.clone());
368        CURRENT.set(&inner, f)
369    }
370
371    fn submit(&self) -> io::Result<()> {
372        // wait with timeout = 0
373        self.park_timeout(Duration::ZERO)
374    }
375
376    fn park(&self) -> io::Result<()> {
377        self.inner_park(None)
378    }
379
380    fn park_timeout(&self, duration: Duration) -> io::Result<()> {
381        self.inner_park(Some(duration))
382    }
383
384    #[cfg(feature = "sync")]
385    type Unpark = waker::UnparkHandle;
386
387    #[cfg(feature = "sync")]
388    fn unpark(&self) -> Self::Unpark {
389        LegacyInner::unpark(&self.inner)
390    }
391}
392
393impl Drop for LegacyDriver {
394    fn drop(&mut self) {
395        // Deregister thread id
396        #[cfg(feature = "sync")]
397        {
398            use crate::driver::thread::{unregister_unpark_handle, unregister_waker_sender};
399            unregister_unpark_handle(self.thread_id);
400            unregister_waker_sender(self.thread_id);
401        }
402    }
403}