monoio/time/driver/mod.rs
1// Currently, rust warns when an unsafe fn contains an unsafe {} block. However,
2// in the future, this will change to the reverse. For now, suppress this
3// warning and generally stick with being explicit about unsafety.
4#![allow(unused_unsafe)]
5
6//! Time driver
7
8mod entry;
9use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
10
11mod handle;
12pub(crate) use self::handle::Handle;
13
14mod wheel;
15
16pub(super) mod sleep;
17
18use std::{cell::RefCell, fmt, io, num::NonZeroU64, ptr::NonNull, rc::Rc};
19
20use crate::{
21 driver::Driver,
22 time::{error::Error, Clock, Duration, Instant},
23};
24
25/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval],
26/// and [`Timeout`][timeout].
27///
28/// A `Driver` instance tracks the state necessary for managing time and
29/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
30///
31/// It is expected that a single instance manages many individual
32/// [`Sleep`][sleep] instances. The `Driver` implementation is thread-safe and,
33/// as such, is able to handle callers from across threads.
34///
35/// After creating the `Driver` instance, the caller must repeatedly call `park`
36/// or `park_timeout`. The time driver will perform no work unless `park` or
37/// `park_timeout` is called repeatedly.
38///
39/// The driver has a resolution of one millisecond. Any unit of time that falls
40/// between milliseconds are rounded up to the next millisecond.
41///
42/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that
43/// has not elapsed will be notified with an error. At this point, calling
44/// `poll` on the [`Sleep`][sleep] instance will result in panic.
45///
46/// # Implementation
47///
48/// The time driver is based on the [paper by Varghese and Lauck][paper].
49///
50/// A hashed timing wheel is a vector of slots, where each slot handles a time
51/// slice. As time progresses, the timer walks over the slot for the current
52/// instant, and processes each entry for that slot. When the timer reaches the
53/// end of the wheel, it starts again at the beginning.
54///
55/// The implementation maintains six wheels arranged in a set of levels. As the
56/// levels go up, the slots of the associated wheel represent larger intervals
57/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
58/// time equal to the wheel at the lower level. At level zero, each slot
59/// represents one millisecond of time.
60///
61/// The wheels are:
62///
63/// * Level 0: 64 x 1 millisecond slots.
64/// * Level 1: 64 x 64 millisecond slots.
65/// * Level 2: 64 x ~4 second slots.
66/// * Level 3: 64 x ~4 minute slots.
67/// * Level 4: 64 x ~4 hour slots.
68/// * Level 5: 64 x ~12 day slots.
69///
70/// When the timer processes entries at level zero, it will notify all the
71/// `Sleep` instances as their deadlines have been reached. For all higher
72/// levels, all entries will be redistributed across the wheel at the next level
73/// down. Eventually, as time progresses, entries with [`Sleep`][sleep]
74/// instances will either be canceled (dropped) or their associated entries will
75/// reach level zero and be notified.
76///
77/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
78/// [sleep]: crate::time::Sleep
79/// [timeout]: crate::time::Timeout
80/// [interval]: crate::time::Interval
81#[derive(Debug)]
82pub struct TimeDriver<D: 'static> {
83 /// Timing backend in use
84 time_source: ClockTime,
85
86 /// Shared state
87 pub(crate) handle: Handle,
88
89 /// Parker to delegate to
90 park: D,
91}
92
93/// A structure which handles conversion from Instants to u64 timestamps.
94#[derive(Debug, Clone)]
95struct ClockTime {
96 clock: super::clock::Clock,
97 start_time: Instant,
98}
99
100impl ClockTime {
101 pub(self) fn new(clock: Clock) -> Self {
102 Self {
103 start_time: clock.now(),
104 clock,
105 }
106 }
107
108 pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
109 // Round up to the end of a ms
110 self.instant_to_tick(t + Duration::from_nanos(999_999))
111 }
112
113 pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
114 // round up
115 let dur: Duration = t
116 .checked_duration_since(self.start_time)
117 .unwrap_or_else(|| Duration::from_secs(0));
118 let ms = dur.as_millis();
119
120 ms.try_into().expect("Duration too far into the future")
121 }
122
123 pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
124 Duration::from_millis(t)
125 }
126
127 pub(self) fn now(&self) -> u64 {
128 self.instant_to_tick(self.clock.now())
129 }
130}
131
132/// Timer state shared between `Driver`, `Handle`, and `Registration`.
133struct Inner {
134 // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
135 pub(super) state: RefCell<InnerState>,
136}
137
138/// Time state shared which must be protected by a `Mutex`
139struct InnerState {
140 /// Timing backend in use
141 time_source: ClockTime,
142
143 /// The last published timer `elapsed` value.
144 elapsed: u64,
145
146 /// The earliest time at which we promise to wake up without unparking
147 next_wake: Option<NonZeroU64>,
148
149 /// Timer wheel
150 wheel: wheel::Wheel,
151}
152
153// ===== impl Driver =====
154
155impl<D> TimeDriver<D>
156where
157 D: Driver + 'static,
158{
159 /// Creates a new `Driver` instance that uses `park` to block the current
160 /// thread and `time_source` to get the current time and convert to ticks.
161 ///
162 /// Specifying the source of time is useful when testing.
163 pub(crate) fn new(park: D, clock: Clock) -> TimeDriver<D> {
164 let time_source = ClockTime::new(clock);
165
166 let inner = Inner::new(time_source.clone());
167
168 TimeDriver {
169 time_source,
170 handle: Handle::new(Rc::new(inner)),
171 park,
172 }
173 }
174
175 fn park_internal(&self, limit: Option<Duration>) -> io::Result<()> {
176 let mut inner_state = self.handle.get().state.borrow_mut();
177
178 let next_wake = inner_state.wheel.next_expiration_time();
179 inner_state.next_wake =
180 next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
181 drop(inner_state);
182
183 match next_wake {
184 Some(when) => {
185 let now = self.time_source.now();
186 // Note that we effectively round up to 1ms here - this avoids
187 // very short-duration microsecond-resolution sleeps that the OS
188 // might treat as zero-length.
189 let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
190
191 if duration > Duration::from_millis(0) {
192 if let Some(limit) = limit {
193 duration = std::cmp::min(limit, duration);
194 }
195
196 self.park.park_timeout(duration)?;
197 } else {
198 self.park.park_timeout(Duration::from_secs(0))?;
199 }
200 }
201 None => {
202 if let Some(duration) = limit {
203 self.park.park_timeout(duration)?;
204 } else {
205 self.park.park()?;
206 }
207 }
208 }
209
210 // Process pending timers after waking up
211 self.handle.process();
212
213 Ok(())
214 }
215}
216
217impl Handle {
218 /// Runs timer related logic, and returns the next wakeup time
219 pub(self) fn process(&self) {
220 let now = self.time_source().now();
221
222 self.process_at_time(now)
223 }
224
225 pub(self) fn process_at_time(&self, mut now: u64) {
226 let mut state = self.get().state.borrow_mut();
227
228 if now < state.elapsed {
229 // Time went backwards! This normally shouldn't happen as the Rust language
230 // guarantees that an Instant is monotonic, but can happen when running
231 // Linux in a VM on a Windows host due to std incorrectly trusting the
232 // hardware clock to be monotonic.
233 //
234 // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
235 now = state.elapsed;
236 }
237 while let Some(entry) = state.wheel.poll(now) {
238 if let Some(waker) = unsafe { entry.fire(Ok(())) } {
239 waker.wake();
240 }
241 }
242 state.elapsed = state.wheel.elapsed();
243 state.next_wake = state
244 .wheel
245 .poll_at()
246 .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
247 }
248
249 /// Removes a registered timer from the driver.
250 ///
251 /// The timer will be moved to the cancelled state. Wakers will _not_ be
252 /// invoked. If the timer is already completed, this function is a no-op.
253 ///
254 /// This function always acquires the driver lock, even if the entry does
255 /// not appear to be registered.
256 ///
257 /// SAFETY: The timer must not be registered with some other driver, and
258 /// `add_entry` must not be called concurrently.
259 pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
260 unsafe {
261 let mut state = self.get().state.borrow_mut();
262 if entry.as_ref().might_be_registered() {
263 state.wheel.remove(entry);
264 }
265
266 entry.as_ref().handle().fire(Ok(()));
267 }
268 }
269
270 /// Removes and re-adds an entry to the driver.
271 ///
272 /// SAFETY: The timer must be either unregistered, or registered with this
273 /// driver. No other threads are allowed to concurrently manipulate the
274 /// timer at all (the current thread should hold an exclusive reference to
275 /// the `TimerEntry`)
276 pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
277 let waker = unsafe {
278 let mut state = self.get().state.borrow_mut();
279
280 // We may have raced with a firing/deregistration, so check before
281 // deregistering.
282 if unsafe { entry.as_ref().might_be_registered() } {
283 state.wheel.remove(entry);
284 }
285
286 // Now that we have exclusive control of this entry, mint a handle to reinsert
287 // it.
288 let entry = entry.as_ref().handle();
289
290 entry.set_expiration(new_tick);
291
292 // Note: We don't have to worry about racing with some other resetting
293 // thread, because add_entry and reregister require exclusive control of
294 // the timer entry.
295 match unsafe { state.wheel.insert(entry) } {
296 Ok(_) => None,
297 Err((entry, super::error::InsertError::Elapsed)) => unsafe { entry.fire(Ok(())) },
298 }
299 };
300
301 // The timer was fired synchronously as a result of the reregistration.
302 // Wake the waker; this is needed because we might reset _after_ a poll,
303 // and otherwise the task won't be awoken to poll again.
304 if let Some(waker) = waker {
305 waker.wake();
306 }
307 }
308}
309
310impl<D> Driver for TimeDriver<D>
311where
312 D: Driver + 'static,
313{
314 fn with<R>(&self, f: impl FnOnce() -> R) -> R {
315 self.park.with(f)
316 }
317
318 fn submit(&self) -> io::Result<()> {
319 self.park.submit()
320 }
321
322 fn park(&self) -> io::Result<()> {
323 self.park_internal(None)
324 }
325
326 #[cfg(feature = "sync")]
327 type Unpark = D::Unpark;
328
329 fn park_timeout(&self, duration: Duration) -> io::Result<()> {
330 self.park_internal(Some(duration))
331 }
332
333 #[cfg(feature = "sync")]
334 fn unpark(&self) -> Self::Unpark {
335 self.park.unpark()
336 }
337}
338
339impl<D> Drop for TimeDriver<D>
340where
341 D: 'static,
342{
343 fn drop(&mut self) {
344 // self.shutdown();
345 }
346}
347
348// ===== impl Inner =====
349
350impl Inner {
351 pub(self) fn new(time_source: ClockTime) -> Self {
352 Inner {
353 state: RefCell::new(InnerState {
354 time_source,
355 elapsed: 0,
356 next_wake: None,
357 wheel: wheel::Wheel::new(),
358 }),
359 }
360 }
361}
362
363impl fmt::Debug for Inner {
364 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
365 fmt.debug_struct("Inner").finish()
366 }
367}