1use std::{
4 cell::UnsafeCell,
5 io,
6 rc::Rc,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use super::{
12 op::{CompletionMeta, Op, OpAble},
13 ready::{self, Ready},
14 scheduled_io::ScheduledIo,
15 Driver, Inner, CURRENT,
16};
17use crate::utils::slab::Slab;
18
19#[cfg(feature = "sync")]
20mod waker;
21#[cfg(feature = "sync")]
22pub(crate) use waker::UnparkHandle;
23
/// State behind a [`LegacyDriver`]: the platform poller, its event buffer and
/// the table of registered I/O resources.
pub(crate) struct LegacyInner {
    /// Registered I/O resources. The slab key returned by `insert` is the
    /// value wrapped in `mio::Token` when the resource is registered with the
    /// poller, and `dispatch` looks entries up by `token.0`.
    pub(crate) io_dispatch: Slab<ScheduledIo>,
    /// Event buffer handed to the poller on every `inner_park` call (unix).
    #[cfg(unix)]
    events: mio::Events,
    /// Poller used for registration and waiting (unix: `mio::Poll`).
    #[cfg(unix)]
    poll: mio::Poll,
    /// Event buffer handed to the poller on every `inner_park` call (windows).
    #[cfg(windows)]
    events: crate::driver::iocp::Events,
    /// Poller used for registration and waiting (windows: the crate's IOCP poller).
    #[cfg(windows)]
    poll: crate::driver::iocp::Poller,

    /// Waker registered with the poller under `TOKEN_WAKEUP`. `unpark` hands
    /// out weak handles to it, and `inner_park` clears its `awake` flag before
    /// it is about to block.
    #[cfg(feature = "sync")]
    shared_waker: std::sync::Arc<waker::EventWaker>,

    /// Receiving half of the waker channel created in `new_with_entries`.
    /// `inner_park` drains it and calls `wake()` on every received waker.
    #[cfg(feature = "sync")]
    waker_receiver: flume::Receiver<std::task::Waker>,
}
42
/// Driver handle that owns the shared [`LegacyInner`] state.
// NOTE(review): the reason for `unreachable_pub` is not visible in this file;
// presumably the type is `pub` but not re-exported from every configuration —
// confirm before removing the allow.
#[allow(unreachable_pub)]
pub struct LegacyDriver {
    /// Shared driver state; clones of this `Rc` are handed out through
    /// `Inner::Legacy`.
    inner: Rc<UnsafeCell<LegacyInner>>,

    /// Id under which this driver's unpark handle and waker sender are
    /// registered in `new_with_entries` and unregistered again on drop.
    #[cfg(feature = "sync")]
    thread_id: usize,
}
52
/// Token the shared waker is registered under. Events carrying this token are
/// skipped by `inner_park` instead of being dispatched to `io_dispatch`.
// NOTE(review): `1 << 31` is presumably chosen to stay clear of slab indices
// used as tokens — confirm against the slab's maximum key.
#[cfg(feature = "sync")]
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
55
56#[allow(dead_code)]
57impl LegacyDriver {
58 const DEFAULT_ENTRIES: u32 = 1024;
59
60 pub(crate) fn new() -> io::Result<Self> {
61 Self::new_with_entries(Self::DEFAULT_ENTRIES)
62 }
63
64 pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
65 #[cfg(unix)]
66 let poll = mio::Poll::new()?;
67 #[cfg(windows)]
68 let poll = crate::driver::iocp::Poller::new()?;
69
70 #[cfg(all(unix, feature = "sync"))]
71 let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
72 poll.registry(),
73 TOKEN_WAKEUP,
74 )?));
75 #[cfg(all(windows, feature = "sync"))]
76 let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
77 crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
78 ));
79 #[cfg(feature = "sync")]
80 let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
81 #[cfg(feature = "sync")]
82 let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
83
84 let inner = LegacyInner {
85 io_dispatch: Slab::new(),
86 #[cfg(unix)]
87 events: mio::Events::with_capacity(entries as usize),
88 #[cfg(unix)]
89 poll,
90 #[cfg(windows)]
91 events: crate::driver::iocp::Events::with_capacity(entries as usize),
92 #[cfg(windows)]
93 poll,
94 #[cfg(feature = "sync")]
95 shared_waker,
96 #[cfg(feature = "sync")]
97 waker_receiver,
98 };
99 let driver = Self {
100 inner: Rc::new(UnsafeCell::new(inner)),
101 #[cfg(feature = "sync")]
102 thread_id,
103 };
104
105 #[cfg(feature = "sync")]
107 {
108 let unpark = driver.unpark();
109 super::thread::register_unpark_handle(thread_id, unpark.into());
110 super::thread::register_waker_sender(thread_id, waker_sender);
111 }
112
113 Ok(driver)
114 }
115
116 fn inner_park(&self, mut timeout: Option<Duration>) -> io::Result<()> {
117 let inner = unsafe { &mut *self.inner.get() };
118
119 #[allow(unused_mut)]
120 let mut need_wait = true;
121 #[cfg(feature = "sync")]
122 {
123 while let Ok(w) = inner.waker_receiver.try_recv() {
125 w.wake();
126 need_wait = false;
127 }
128
129 if need_wait {
131 inner
132 .shared_waker
133 .awake
134 .store(false, std::sync::atomic::Ordering::Release);
135 }
136
137 while let Ok(w) = inner.waker_receiver.try_recv() {
139 w.wake();
140 need_wait = false;
141 }
142 }
143
144 if !need_wait {
145 timeout = Some(Duration::ZERO);
146 }
147
148 let events = unsafe { &mut (*self.inner.get()).events };
150 match inner.poll.poll(events, timeout) {
151 Ok(_) => {}
152 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
153 Err(e) => return Err(e),
154 }
155 #[cfg(unix)]
156 let iter = events.iter();
157 #[cfg(windows)]
158 let iter = events.events.iter();
159 for event in iter {
160 let token = event.token();
161
162 #[cfg(feature = "sync")]
163 if token != TOKEN_WAKEUP {
164 inner.dispatch(token, Ready::from_mio(event));
165 }
166
167 #[cfg(not(feature = "sync"))]
168 inner.dispatch(token, Ready::from_mio(event));
169 }
170 Ok(())
171 }
172
173 #[cfg(windows)]
174 pub(crate) fn register(
175 this: &Rc<UnsafeCell<LegacyInner>>,
176 state: &mut crate::driver::iocp::SocketState,
177 interest: mio::Interest,
178 ) -> io::Result<usize> {
179 let inner = unsafe { &mut *this.get() };
180 let io = ScheduledIo::new(state.inner.clone());
181 let token = inner.io_dispatch.insert(io);
182
183 match inner.poll.register(state, mio::Token(token), interest) {
184 Ok(_) => Ok(token),
185 Err(e) => {
186 inner.io_dispatch.remove(token);
187 Err(e)
188 }
189 }
190 }
191
192 #[cfg(windows)]
193 pub(crate) fn deregister(
194 this: &Rc<UnsafeCell<LegacyInner>>,
195 token: usize,
196 state: &mut crate::driver::iocp::SocketState,
197 ) -> io::Result<()> {
198 let inner = unsafe { &mut *this.get() };
199
200 match inner.poll.deregister(state) {
202 Ok(_) => {
203 inner.io_dispatch.remove(token);
204 Ok(())
205 }
206 Err(e) => Err(e),
207 }
208 }
209
210 #[cfg(unix)]
211 pub(crate) fn register(
212 this: &Rc<UnsafeCell<LegacyInner>>,
213 source: &mut impl mio::event::Source,
214 interest: mio::Interest,
215 ) -> io::Result<usize> {
216 let inner = unsafe { &mut *this.get() };
217 let token = inner.io_dispatch.insert(ScheduledIo::new());
218
219 let registry = inner.poll.registry();
220 match registry.register(source, mio::Token(token), interest) {
221 Ok(_) => Ok(token),
222 Err(e) => {
223 inner.io_dispatch.remove(token);
224 Err(e)
225 }
226 }
227 }
228
229 #[cfg(unix)]
230 pub(crate) fn deregister(
231 this: &Rc<UnsafeCell<LegacyInner>>,
232 token: usize,
233 source: &mut impl mio::event::Source,
234 ) -> io::Result<()> {
235 let inner = unsafe { &mut *this.get() };
236
237 match inner.poll.registry().deregister(source) {
239 Ok(_) => {
240 inner.io_dispatch.remove(token);
241 Ok(())
242 }
243 Err(e) => Err(e),
244 }
245 }
246}
247
248impl LegacyInner {
249 fn dispatch(&mut self, token: mio::Token, ready: Ready) {
250 let mut sio = match self.io_dispatch.get(token.0) {
251 Some(io) => io,
252 None => {
253 return;
254 }
255 };
256 let ref_mut = sio.as_mut();
257 ref_mut.set_readiness(|curr| curr | ready);
258 ref_mut.wake(ready);
259 }
260
261 pub(crate) fn poll_op<T: OpAble>(
262 this: &Rc<UnsafeCell<Self>>,
263 data: &mut T,
264 cx: &mut Context<'_>,
265 ) -> Poll<CompletionMeta> {
266 let inner = unsafe { &mut *this.get() };
267 let (direction, index) = match data.legacy_interest() {
268 Some(x) => x,
269 None => {
270 return Poll::Ready(CompletionMeta {
273 result: OpAble::legacy_call(data),
274 flags: 0,
275 });
276 }
277 };
278
279 let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost");
281 let ref_mut = scheduled_io.as_mut();
282
283 let readiness = ready!(ref_mut.poll_readiness(cx, direction));
284
285 if readiness.is_canceled() {
287 ref_mut.clear_readiness(readiness & Ready::CANCELED);
289 return Poll::Ready(CompletionMeta {
290 result: Err(io::Error::from_raw_os_error(125)),
291 flags: 0,
292 });
293 }
294
295 match OpAble::legacy_call(data) {
296 Ok(n) => Poll::Ready(CompletionMeta {
297 result: Ok(n),
298 flags: 0,
299 }),
300 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
301 #[cfg(windows)]
302 {
303 if let Some((sock_state, token, interest)) = {
304 let socket_state_lock = ref_mut.state.lock().unwrap();
305 socket_state_lock.inner.clone().map(|inner| {
306 (inner, socket_state_lock.token, socket_state_lock.interest)
307 })
308 } {
309 if let Err(e) = inner.poll.reregister(sock_state, token, interest) {
310 return Poll::Ready(CompletionMeta {
311 result: Err(e),
312 flags: 0,
313 });
314 }
315 }
316 }
317
318 ref_mut.clear_readiness(direction.mask());
319 ref_mut.set_waker(cx, direction);
320 Poll::Pending
321 }
322 Err(e) => Poll::Ready(CompletionMeta {
323 result: Err(e),
324 flags: 0,
325 }),
326 }
327 }
328
329 pub(crate) fn cancel_op(
330 this: &Rc<UnsafeCell<LegacyInner>>,
331 index: usize,
332 direction: ready::Direction,
333 ) {
334 let inner = unsafe { &mut *this.get() };
335 let ready = match direction {
336 ready::Direction::Read => Ready::READ_CANCELED,
337 ready::Direction::Write => Ready::WRITE_CANCELED,
338 };
339 inner.dispatch(mio::Token(index), ready);
340 }
341
342 pub(crate) fn submit_with_data<T>(
343 this: &Rc<UnsafeCell<LegacyInner>>,
344 data: T,
345 ) -> io::Result<Op<T>>
346 where
347 T: OpAble,
348 {
349 Ok(Op {
350 driver: Inner::Legacy(this.clone()),
351 index: 0,
353 data: Some(data),
354 })
355 }
356
357 #[cfg(feature = "sync")]
358 pub(crate) fn unpark(this: &Rc<UnsafeCell<LegacyInner>>) -> waker::UnparkHandle {
359 let inner = unsafe { &*this.get() };
360 let weak = std::sync::Arc::downgrade(&inner.shared_waker);
361 waker::UnparkHandle(weak)
362 }
363}
364
365impl Driver for LegacyDriver {
366 fn with<R>(&self, f: impl FnOnce() -> R) -> R {
367 let inner = Inner::Legacy(self.inner.clone());
368 CURRENT.set(&inner, f)
369 }
370
371 fn submit(&self) -> io::Result<()> {
372 self.park_timeout(Duration::ZERO)
374 }
375
376 fn park(&self) -> io::Result<()> {
377 self.inner_park(None)
378 }
379
380 fn park_timeout(&self, duration: Duration) -> io::Result<()> {
381 self.inner_park(Some(duration))
382 }
383
384 #[cfg(feature = "sync")]
385 type Unpark = waker::UnparkHandle;
386
387 #[cfg(feature = "sync")]
388 fn unpark(&self) -> Self::Unpark {
389 LegacyInner::unpark(&self.inner)
390 }
391}
392
393impl Drop for LegacyDriver {
394 fn drop(&mut self) {
395 #[cfg(feature = "sync")]
397 {
398 use crate::driver::thread::{unregister_unpark_handle, unregister_waker_sender};
399 unregister_unpark_handle(self.thread_id);
400 unregister_waker_sender(self.thread_id);
401 }
402 }
403}