1#[cfg(feature = "sync")]
4use std::sync::Arc;
5use std::{
6 cell::UnsafeCell,
7 io,
8 rc::Rc,
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use super::{
14 op::{CompletionMeta, Op, OpAble},
15 ready::{self, Ready},
16 scheduled_io::ScheduledIo,
17 Driver, Inner, CURRENT,
18};
19use crate::utils::slab::Slab;
20
21#[cfg(feature = "sync")]
22mod waker;
23#[cfg(feature = "sync")]
24use crossbeam_queue::SegQueue;
25#[cfg(feature = "sync")]
26pub(crate) use waker::UnparkHandle;
27
28pub(crate) struct LegacyInner {
29 pub(crate) io_dispatch: Slab<ScheduledIo>,
30 #[cfg(unix)]
31 events: mio::Events,
32 #[cfg(unix)]
33 poll: mio::Poll,
34 #[cfg(windows)]
35 events: crate::driver::iocp::Events,
36 #[cfg(windows)]
37 poll: crate::driver::iocp::Poller,
38
39 #[cfg(feature = "sync")]
40 shared_waker: std::sync::Arc<waker::EventWaker>,
41
42 #[cfg(feature = "sync")]
44 waker_queue: Arc<SegQueue<std::task::Waker>>,
45}
46
47#[allow(unreachable_pub)]
49pub struct LegacyDriver {
50 inner: Rc<UnsafeCell<LegacyInner>>,
51
52 #[cfg(feature = "sync")]
54 thread_id: usize,
55}
56
/// Token reserved for the driver's own wakeup source (value `1 << 31`). Events that
/// carry it are skipped instead of being dispatched to a registered resource.
#[cfg(feature = "sync")]
const TOKEN_WAKEUP: mio::Token = mio::Token(0x8000_0000);
59
60#[allow(dead_code)]
61impl LegacyDriver {
62 const DEFAULT_ENTRIES: u32 = 1024;
63
64 pub(crate) fn new() -> io::Result<Self> {
65 Self::new_with_entries(Self::DEFAULT_ENTRIES)
66 }
67
68 pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
69 #[cfg(unix)]
70 let poll = mio::Poll::new()?;
71 #[cfg(windows)]
72 let poll = crate::driver::iocp::Poller::new()?;
73
74 #[cfg(all(unix, feature = "sync"))]
75 let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
76 poll.registry(),
77 TOKEN_WAKEUP,
78 )?));
79 #[cfg(all(windows, feature = "sync"))]
80 let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
81 crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
82 ));
83 #[cfg(feature = "sync")]
84 let waker_queue = Arc::new(SegQueue::new());
85 #[cfg(feature = "sync")]
86 let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);
87
88 let inner = LegacyInner {
89 io_dispatch: Slab::new(),
90 #[cfg(unix)]
91 events: mio::Events::with_capacity(entries as usize),
92 #[cfg(unix)]
93 poll,
94 #[cfg(windows)]
95 events: crate::driver::iocp::Events::with_capacity(entries as usize),
96 #[cfg(windows)]
97 poll,
98 #[cfg(feature = "sync")]
99 shared_waker,
100 #[cfg(feature = "sync")]
101 waker_queue: waker_queue.clone(),
102 };
103 let driver = Self {
104 inner: Rc::new(UnsafeCell::new(inner)),
105 #[cfg(feature = "sync")]
106 thread_id,
107 };
108
109 #[cfg(feature = "sync")]
111 {
112 let unpark = driver.unpark();
113 super::thread::register_unpark_handle(thread_id, unpark.into());
114 super::thread::register_waker_queue(thread_id, waker_queue);
115 }
116
117 Ok(driver)
118 }
119
120 fn inner_park(&self, mut timeout: Option<Duration>) -> io::Result<()> {
121 let inner = unsafe { &mut *self.inner.get() };
122
123 #[allow(unused_mut)]
124 let mut need_wait = true;
125 #[cfg(feature = "sync")]
126 {
127 while let Some(w) = inner.waker_queue.pop() {
129 w.wake();
130 need_wait = false;
131 }
132
133 if need_wait {
135 inner
136 .shared_waker
137 .awake
138 .store(false, std::sync::atomic::Ordering::Release);
139 }
140
141 while let Some(w) = inner.waker_queue.pop() {
143 w.wake();
144 need_wait = false;
145 }
146 }
147
148 if !need_wait {
149 timeout = Some(Duration::ZERO);
150 }
151
152 let events = unsafe { &mut (*self.inner.get()).events };
154 match inner.poll.poll(events, timeout) {
155 Ok(_) => {}
156 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
157 Err(e) => return Err(e),
158 }
159 #[cfg(unix)]
160 let iter = events.iter();
161 #[cfg(windows)]
162 let iter = events.events.iter();
163 for event in iter {
164 let token = event.token();
165
166 #[cfg(feature = "sync")]
167 if token != TOKEN_WAKEUP {
168 inner.dispatch(token, Ready::from_mio(event));
169 }
170
171 #[cfg(not(feature = "sync"))]
172 inner.dispatch(token, Ready::from_mio(event));
173 }
174 Ok(())
175 }
176
177 #[cfg(windows)]
178 pub(crate) fn register(
179 this: &Rc<UnsafeCell<LegacyInner>>,
180 state: &mut crate::driver::iocp::SocketState,
181 interest: mio::Interest,
182 ) -> io::Result<usize> {
183 let inner = unsafe { &mut *this.get() };
184 let io = ScheduledIo::new(state.inner.clone());
185 let token = inner.io_dispatch.insert(io);
186
187 match inner.poll.register(state, mio::Token(token), interest) {
188 Ok(_) => Ok(token),
189 Err(e) => {
190 inner.io_dispatch.remove(token);
191 Err(e)
192 }
193 }
194 }
195
196 #[cfg(windows)]
197 pub(crate) fn deregister(
198 this: &Rc<UnsafeCell<LegacyInner>>,
199 token: usize,
200 state: &mut crate::driver::iocp::SocketState,
201 ) -> io::Result<()> {
202 let inner = unsafe { &mut *this.get() };
203
204 match inner.poll.deregister(state) {
206 Ok(_) => {
207 inner.io_dispatch.remove(token);
208 Ok(())
209 }
210 Err(e) => Err(e),
211 }
212 }
213
214 #[cfg(unix)]
215 pub(crate) fn register(
216 this: &Rc<UnsafeCell<LegacyInner>>,
217 source: &mut impl mio::event::Source,
218 interest: mio::Interest,
219 ) -> io::Result<usize> {
220 let inner = unsafe { &mut *this.get() };
221 let token = inner.io_dispatch.insert(ScheduledIo::new());
222
223 let registry = inner.poll.registry();
224 match registry.register(source, mio::Token(token), interest) {
225 Ok(_) => Ok(token),
226 Err(e) => {
227 inner.io_dispatch.remove(token);
228 Err(e)
229 }
230 }
231 }
232
233 #[cfg(unix)]
234 pub(crate) fn deregister(
235 this: &Rc<UnsafeCell<LegacyInner>>,
236 token: usize,
237 source: &mut impl mio::event::Source,
238 ) -> io::Result<()> {
239 let inner = unsafe { &mut *this.get() };
240
241 match inner.poll.registry().deregister(source) {
243 Ok(_) => {
244 inner.io_dispatch.remove(token);
245 Ok(())
246 }
247 Err(e) => Err(e),
248 }
249 }
250}
251
252impl LegacyInner {
253 fn dispatch(&mut self, token: mio::Token, ready: Ready) {
254 let mut sio = match self.io_dispatch.get(token.0) {
255 Some(io) => io,
256 None => {
257 return;
258 }
259 };
260 let ref_mut = sio.as_mut();
261 ref_mut.set_readiness(|curr| curr | ready);
262 ref_mut.wake(ready);
263 }
264
265 pub(crate) fn poll_op<T: OpAble>(
266 this: &Rc<UnsafeCell<Self>>,
267 data: &mut T,
268 cx: &mut Context<'_>,
269 ) -> Poll<CompletionMeta> {
270 let inner = unsafe { &mut *this.get() };
271 let (direction, index) = match data.legacy_interest() {
272 Some(x) => x,
273 None => {
274 return Poll::Ready(CompletionMeta {
277 result: OpAble::legacy_call(data),
278 flags: 0,
279 });
280 }
281 };
282
283 let Some(mut scheduled_io) = inner.io_dispatch.get(index) else {
287 return Poll::Ready(CompletionMeta {
288 result: Err(io::Error::from_raw_os_error(125)),
289 flags: 0,
290 });
291 };
292 let ref_mut = scheduled_io.as_mut();
293
294 let readiness = ready!(ref_mut.poll_readiness(cx, direction));
295
296 if readiness.is_canceled() {
298 ref_mut.clear_readiness(readiness & Ready::CANCELED);
300 return Poll::Ready(CompletionMeta {
301 result: Err(io::Error::from_raw_os_error(125)),
302 flags: 0,
303 });
304 }
305
306 match OpAble::legacy_call(data) {
307 Ok(n) => Poll::Ready(CompletionMeta {
308 result: Ok(n),
309 flags: 0,
310 }),
311 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
312 #[cfg(windows)]
313 {
314 if let Some((sock_state, token, interest)) = {
315 let socket_state_lock = ref_mut.state.lock().unwrap();
316 socket_state_lock.inner.clone().map(|inner| {
317 (inner, socket_state_lock.token, socket_state_lock.interest)
318 })
319 } {
320 if let Err(e) = inner.poll.reregister(sock_state, token, interest) {
321 return Poll::Ready(CompletionMeta {
322 result: Err(e),
323 flags: 0,
324 });
325 }
326 }
327 }
328
329 ref_mut.clear_readiness(direction.mask());
330 ref_mut.set_waker(cx, direction);
331 Poll::Pending
332 }
333 Err(e) => Poll::Ready(CompletionMeta {
334 result: Err(e),
335 flags: 0,
336 }),
337 }
338 }
339
340 pub(crate) fn cancel_op(
341 this: &Rc<UnsafeCell<LegacyInner>>,
342 index: usize,
343 direction: ready::Direction,
344 ) {
345 let inner = unsafe { &mut *this.get() };
346 let ready = match direction {
347 ready::Direction::Read => Ready::READ_CANCELED,
348 ready::Direction::Write => Ready::WRITE_CANCELED,
349 };
350 inner.dispatch(mio::Token(index), ready);
351 }
352
353 pub(crate) fn submit_with_data<T>(
354 this: &Rc<UnsafeCell<LegacyInner>>,
355 data: T,
356 ) -> io::Result<Op<T>>
357 where
358 T: OpAble,
359 {
360 Ok(Op {
361 driver: Inner::Legacy(this.clone()),
362 index: 0,
364 data: Some(data),
365 })
366 }
367
368 #[cfg(feature = "sync")]
369 pub(crate) fn unpark(this: &Rc<UnsafeCell<LegacyInner>>) -> waker::UnparkHandle {
370 let inner = unsafe { &*this.get() };
371 let weak = std::sync::Arc::downgrade(&inner.shared_waker);
372 waker::UnparkHandle(weak)
373 }
374}
375
376impl Driver for LegacyDriver {
377 fn with<R>(&self, f: impl FnOnce() -> R) -> R {
378 let inner = Inner::Legacy(self.inner.clone());
379 CURRENT.set(&inner, f)
380 }
381
382 fn submit(&self) -> io::Result<()> {
383 self.park_timeout(Duration::ZERO)
385 }
386
387 fn park(&self) -> io::Result<()> {
388 self.inner_park(None)
389 }
390
391 fn park_timeout(&self, duration: Duration) -> io::Result<()> {
392 self.inner_park(Some(duration))
393 }
394
395 #[cfg(feature = "sync")]
396 type Unpark = waker::UnparkHandle;
397
398 #[cfg(feature = "sync")]
399 fn unpark(&self) -> Self::Unpark {
400 LegacyInner::unpark(&self.inner)
401 }
402}
403
404impl Drop for LegacyDriver {
405 fn drop(&mut self) {
406 #[cfg(feature = "sync")]
408 {
409 use crate::driver::thread::{unregister_unpark_handle, unregister_waker_queue};
410 unregister_unpark_handle(self.thread_id);
411 unregister_waker_queue(self.thread_id);
412 }
413 }
414}