monoio/
builder.rs

1use std::{io, marker::PhantomData};
2
3#[cfg(all(target_os = "linux", feature = "iouring"))]
4use crate::driver::IoUringDriver;
5#[cfg(feature = "legacy")]
6use crate::driver::LegacyDriver;
7#[cfg(any(feature = "legacy", feature = "iouring"))]
8use crate::utils::thread_id::gen_id;
9use crate::{
10    driver::Driver,
11    time::{driver::TimeDriver, Clock},
12    Runtime,
13};
14
15// ===== basic builder structure definition =====
16
/// Builder that collects the settings needed to construct a [`Runtime`].
///
/// The type parameter `D` names the driver the runtime will be built on. It is a
/// compile-time marker only: no value of `D` is stored in the builder.
pub struct RuntimeBuilder<D> {
    /// Requested number of io_uring entries. When `None`, the driver's plain
    /// constructor is used instead of its `new_with_entries` variant.
    entries: Option<u32>,

    /// Settings handed to the io_uring driver when it is created.
    #[cfg(all(target_os = "linux", feature = "iouring"))]
    urb: io_uring::Builder,

    /// Blocking handle passed to the runtime context when it is created.
    #[cfg(feature = "sync")]
    blocking_handle: crate::blocking::BlockingHandle,

    /// Binds the builder to its driver type without holding a driver.
    _mark: PhantomData<D>,
}
31
32scoped_thread_local!(pub(crate) static BUILD_THREAD_ID: usize);
33
34impl<T> Default for RuntimeBuilder<T> {
35    /// Create a default runtime builder.
36    ///
37    /// # Note
38    ///
39    /// When the sync feature is enabled, the default behavior of
40    /// [monoio::blocking::BlockingStrategy] is to execute tasks on the local thread. In other
41    /// words, there is no thread pool involved—all blocking I/O operations and heavy computations
42    /// will block the current thread.
43    fn default() -> Self {
44        RuntimeBuilder::<T>::new()
45    }
46}
47
48impl<T> RuntimeBuilder<T> {
49    /// Create a default runtime builder.
50    ///
51    /// # Note
52    ///
53    /// When the sync feature is enabled, the default behavior of
54    /// [monoio::blocking::BlockingStrategy] is to execute tasks on the local thread. In other
55    /// words, there is no thread pool involved—all blocking I/O operations and heavy computations
56    /// will block the current thread.
57    #[must_use]
58    pub fn new() -> Self {
59        Self {
60            entries: None,
61
62            #[cfg(all(target_os = "linux", feature = "iouring"))]
63            urb: io_uring::IoUring::builder(),
64
65            #[cfg(feature = "sync")]
66            blocking_handle: crate::blocking::BlockingStrategy::ExecuteLocal.into(),
67            _mark: PhantomData,
68        }
69    }
70}
71
72// ===== buildable trait and forward methods =====
73
74/// Buildable trait.
75pub trait Buildable: Sized {
76    /// Build the runtime.
77    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<Self>>;
78}
79
// Generates an inherent `build` method on `RuntimeBuilder<$driver>` that forwards to the
// driver's `Buildable` implementation, so callers do not need the trait in scope.
//
// The macro is unused when neither driver feature is enabled, because every invocation
// below is cfg-gated; hence the `allow`.
#[allow(unused)]
macro_rules! direct_build {
    ($driver:ty) => {
        impl RuntimeBuilder<$driver> {
            /// Build the runtime.
            pub fn build(self) -> io::Result<Runtime<$driver>> {
                <$driver as Buildable>::build(self)
            }
        }
    };
}

// io_uring driver, bare and timer-wrapped.
#[cfg(all(target_os = "linux", feature = "iouring"))]
direct_build!(IoUringDriver);
#[cfg(all(target_os = "linux", feature = "iouring"))]
direct_build!(TimeDriver<IoUringDriver>);

// Legacy driver, bare and timer-wrapped.
#[cfg(feature = "legacy")]
direct_build!(LegacyDriver);
#[cfg(feature = "legacy")]
direct_build!(TimeDriver<LegacyDriver>);
100
101// ===== builder impl =====
102
#[cfg(feature = "legacy")]
impl Buildable for LegacyDriver {
    /// Creates a legacy driver and a runtime context, then combines them into a runtime.
    ///
    /// Construction runs while `BUILD_THREAD_ID` is set to a freshly generated thread id.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the `LegacyDriver` constructor.
    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<LegacyDriver>> {
        let thread_id = gen_id();
        let RuntimeBuilder {
            entries,
            #[cfg(feature = "sync")]
            blocking_handle,
            ..
        } = this;

        BUILD_THREAD_ID.set(&thread_id, || {
            // An explicit entry count selects the sized constructor.
            let driver = if let Some(count) = entries {
                LegacyDriver::new_with_entries(count)?
            } else {
                LegacyDriver::new()?
            };

            #[cfg(feature = "sync")]
            let context = crate::runtime::Context::new(blocking_handle);
            #[cfg(not(feature = "sync"))]
            let context = crate::runtime::Context::new();

            Ok(Runtime::new(context, driver))
        })
    }
}
123
#[cfg(all(target_os = "linux", feature = "iouring"))]
impl Buildable for IoUringDriver {
    /// Creates an io_uring driver from the builder's `io_uring::Builder` and a runtime
    /// context, then combines them into a runtime.
    ///
    /// Construction runs while `BUILD_THREAD_ID` is set to a freshly generated thread id.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the `IoUringDriver` constructor.
    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<IoUringDriver>> {
        let thread_id = gen_id();
        let RuntimeBuilder {
            entries,
            urb,
            #[cfg(feature = "sync")]
            blocking_handle,
            ..
        } = this;

        BUILD_THREAD_ID.set(&thread_id, || {
            // An explicit entry count selects the sized constructor.
            let driver = if let Some(count) = entries {
                IoUringDriver::new_with_entries(&urb, count)?
            } else {
                IoUringDriver::new(&urb)?
            };

            #[cfg(feature = "sync")]
            let context = crate::runtime::Context::new(blocking_handle);
            #[cfg(not(feature = "sync"))]
            let context = crate::runtime::Context::new();

            Ok(Runtime::new(context, driver))
        })
    }
}
144
145impl<D> RuntimeBuilder<D> {
146    const MIN_ENTRIES: u32 = 256;
147
148    /// Set io_uring entries, min size is 256 and the default size is 1024.
149    #[must_use]
150    pub fn with_entries(mut self, entries: u32) -> Self {
151        // If entries is less than 256, it will be 256.
152        if entries < Self::MIN_ENTRIES {
153            self.entries = Some(Self::MIN_ENTRIES);
154            return self;
155        }
156        self.entries = Some(entries);
157        self
158    }
159
160    /// Replaces the default [`io_uring::Builder`], which controls the settings for the
161    /// inner `io_uring` API.
162    ///
163    /// Refer to the [`io_uring::Builder`] documentation for all the supported methods.
164    #[cfg(all(target_os = "linux", feature = "iouring"))]
165    #[must_use]
166    pub fn uring_builder(mut self, urb: io_uring::Builder) -> Self {
167        self.urb = urb;
168        self
169    }
170}
171
172// ===== FusionDriver =====
173
/// Placeholder driver type used only to select a build path at compile time.
///
/// It is not a driver itself: `RuntimeBuilder<FusionDriver>::build` constructs a runtime
/// on `IoUringDriver` or `LegacyDriver`, depending on the enabled features.
#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
pub struct FusionDriver;
177
#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
impl RuntimeBuilder<FusionDriver> {
    /// Moves every setting of this builder into a builder for the concrete driver `D`.
    fn rebind_driver<D>(self) -> RuntimeBuilder<D> {
        RuntimeBuilder {
            entries: self.entries,
            #[cfg(all(target_os = "linux", feature = "iouring"))]
            urb: self.urb,
            #[cfg(feature = "sync")]
            blocking_handle: self.blocking_handle,
            _mark: PhantomData,
        }
    }

    /// Build the runtime.
    ///
    /// Both drivers are compiled in: the io_uring driver is used when
    /// `crate::utils::detect_uring()` returns `true`, otherwise the legacy driver.
    #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
    pub fn build(self) -> io::Result<crate::FusionRuntime<IoUringDriver, LegacyDriver>> {
        if crate::utils::detect_uring() {
            let uring_builder = self.rebind_driver::<IoUringDriver>();
            info!("io_uring driver built");
            let runtime = uring_builder.build()?;
            Ok(runtime.into())
        } else {
            let legacy_builder = self.rebind_driver::<LegacyDriver>();
            info!("legacy driver built");
            let runtime = legacy_builder.build()?;
            Ok(runtime.into())
        }
    }

    /// Build the runtime.
    ///
    /// The io_uring driver is not compiled in, so the legacy driver is always used.
    #[cfg(not(all(target_os = "linux", feature = "iouring")))]
    pub fn build(self) -> io::Result<crate::FusionRuntime<LegacyDriver>> {
        let runtime = self.rebind_driver::<LegacyDriver>().build()?;
        Ok(runtime.into())
    }

    /// Build the runtime.
    ///
    /// Only the io_uring driver is compiled in, so it is always used.
    #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
    pub fn build(self) -> io::Result<crate::FusionRuntime<IoUringDriver>> {
        let runtime = self.rebind_driver::<IoUringDriver>().build()?;
        Ok(runtime.into())
    }
}
231
#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
impl RuntimeBuilder<TimeDriver<FusionDriver>> {
    /// Moves every setting of this builder into a builder for the concrete driver `D`.
    fn rebind_timed_driver<D>(self) -> RuntimeBuilder<D> {
        RuntimeBuilder {
            entries: self.entries,
            #[cfg(all(target_os = "linux", feature = "iouring"))]
            urb: self.urb,
            #[cfg(feature = "sync")]
            blocking_handle: self.blocking_handle,
            _mark: PhantomData,
        }
    }

    /// Build the runtime.
    ///
    /// Both drivers are compiled in: the timer-wrapped io_uring driver is used when
    /// `crate::utils::detect_uring()` returns `true`, otherwise the timer-wrapped legacy
    /// driver.
    #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
    pub fn build(
        self,
    ) -> io::Result<crate::FusionRuntime<TimeDriver<IoUringDriver>, TimeDriver<LegacyDriver>>> {
        if crate::utils::detect_uring() {
            let uring_builder = self.rebind_timed_driver::<TimeDriver<IoUringDriver>>();
            info!("io_uring driver with timer built");
            let runtime = uring_builder.build()?;
            Ok(runtime.into())
        } else {
            let legacy_builder = self.rebind_timed_driver::<TimeDriver<LegacyDriver>>();
            info!("legacy driver with timer built");
            let runtime = legacy_builder.build()?;
            Ok(runtime.into())
        }
    }

    /// Build the runtime.
    ///
    /// The io_uring driver is not compiled in, so the timer-wrapped legacy driver is
    /// always used.
    #[cfg(not(all(target_os = "linux", feature = "iouring")))]
    pub fn build(self) -> io::Result<crate::FusionRuntime<TimeDriver<LegacyDriver>>> {
        let runtime = self
            .rebind_timed_driver::<TimeDriver<LegacyDriver>>()
            .build()?;
        Ok(runtime.into())
    }

    /// Build the runtime.
    ///
    /// Only the io_uring driver is compiled in, so its timer-wrapped form is always used.
    #[cfg(all(target_os = "linux", feature = "iouring", not(feature = "legacy")))]
    pub fn build(self) -> io::Result<crate::FusionRuntime<TimeDriver<IoUringDriver>>> {
        let runtime = self
            .rebind_timed_driver::<TimeDriver<IoUringDriver>>()
            .build()?;
        Ok(runtime.into())
    }
}
287
288// ===== enable_timer related =====
mod time_wrap {
    /// Marker for driver types that `RuntimeBuilder::enable_timer` accepts.
    ///
    /// The trait is `pub` but lives in a private module; presumably this keeps code
    /// outside the crate from implementing it (confirm against the crate's exports).
    pub trait TimeWrapable {}
}

// Drivers that may be wrapped in a `TimeDriver`, each gated on the feature providing it.
#[cfg(all(target_os = "linux", feature = "iouring"))]
impl time_wrap::TimeWrapable for IoUringDriver {}

#[cfg(feature = "legacy")]
impl time_wrap::TimeWrapable for LegacyDriver {}

#[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))]
impl time_wrap::TimeWrapable for FusionDriver {}
299
300impl<D: Driver> Buildable for TimeDriver<D>
301where
302    D: Buildable,
303{
304    /// Build the runtime
305    fn build(this: RuntimeBuilder<Self>) -> io::Result<Runtime<TimeDriver<D>>> {
306        let Runtime {
307            driver,
308            mut context,
309        } = Buildable::build(RuntimeBuilder::<D> {
310            entries: this.entries,
311            #[cfg(all(target_os = "linux", feature = "iouring"))]
312            urb: this.urb,
313            #[cfg(feature = "sync")]
314            blocking_handle: this.blocking_handle,
315            _mark: PhantomData,
316        })?;
317
318        let timer_driver = TimeDriver::new(driver, Clock::new());
319        context.time_handle = Some(timer_driver.handle.clone());
320        Ok(Runtime {
321            driver: timer_driver,
322            context,
323        })
324    }
325}
326
327impl<D: time_wrap::TimeWrapable> RuntimeBuilder<D> {
328    /// Enable all(currently only timer)
329    #[must_use]
330    pub fn enable_all(self) -> RuntimeBuilder<TimeDriver<D>> {
331        self.enable_timer()
332    }
333
334    /// Enable timer
335    #[must_use]
336    pub fn enable_timer(self) -> RuntimeBuilder<TimeDriver<D>> {
337        let Self {
338            entries,
339            #[cfg(all(target_os = "linux", feature = "iouring"))]
340            urb,
341            #[cfg(feature = "sync")]
342            blocking_handle,
343            ..
344        } = self;
345        RuntimeBuilder {
346            entries,
347            #[cfg(all(target_os = "linux", feature = "iouring"))]
348            urb,
349            #[cfg(feature = "sync")]
350            blocking_handle,
351            _mark: PhantomData,
352        }
353    }
354}
355
356impl<D> RuntimeBuilder<D> {
357    /// Attach thread pool, this will overwrite blocking strategy.
358    /// All `spawn_blocking` will be executed on given thread pool.
359    #[cfg(feature = "sync")]
360    #[must_use]
361    pub fn attach_thread_pool(
362        mut self,
363        tp: Box<dyn crate::blocking::ThreadPool + Send + 'static>,
364    ) -> Self {
365        self.blocking_handle = crate::blocking::BlockingHandle::Attached(tp);
366        self
367    }
368
369    /// Set blocking strategy, this will overwrite thread pool setting.
370    /// If `BlockingStrategy::Panic` is used, it will panic if `spawn_blocking` on this thread.
371    /// If `BlockingStrategy::ExecuteLocal` is used, it will execute with current thread, and may
372    /// cause tasks high latency.
373    /// Attaching a thread pool is recommended if `spawn_blocking` will be used.
374    #[cfg(feature = "sync")]
375    #[must_use]
376    pub fn with_blocking_strategy(mut self, strategy: crate::blocking::BlockingStrategy) -> Self {
377        self.blocking_handle = crate::blocking::BlockingHandle::Empty(strategy);
378        self
379    }
380}