ferron/
runtime.rs

1use std::future::Future;
2#[cfg(feature = "runtime-vibeio")]
3use std::sync::LazyLock;
4
5/// A representation of an asynchronous runtime
6pub struct Runtime {
7  inner: RuntimeInner,
8  io_uring_enable_configured: Option<i32>,
9}
10
11enum RuntimeInner {
12  #[cfg(all(feature = "runtime-monoio", target_os = "linux"))]
13  MonoioIouring(monoio::Runtime<monoio::time::TimeDriver<monoio::IoUringDriver>>),
14  #[cfg(feature = "runtime-monoio")]
15  MonoioLegacy(monoio::Runtime<monoio::time::TimeDriver<monoio::LegacyDriver>>),
16  #[cfg(feature = "runtime-vibeio")]
17  Custom(vibeio::Runtime),
18  #[cfg(feature = "runtime-tokio")]
19  Tokio(tokio::runtime::Runtime),
20  TokioOnly(tokio::runtime::Runtime),
21}
22
23impl Runtime {
24  /// Creates a new asynchronous runtime
25  pub fn new_runtime(enable_uring: Option<bool>) -> Result<Self, std::io::Error> {
26    #[allow(unused_mut)]
27    let mut io_uring_enable_configured = None;
28
29    #[cfg(all(feature = "runtime-monoio", target_os = "linux"))]
30    if enable_uring.is_none_or(|x| x) && monoio::utils::detect_uring() {
31      match monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
32        .enable_all()
33        .attach_thread_pool(Box::new(BlockingThreadPool))
34        .build()
35      {
36        Ok(rt) => {
37          return Ok(Self {
38            inner: RuntimeInner::MonoioIouring(rt),
39            io_uring_enable_configured,
40          });
41        }
42        Err(e) => {
43          if enable_uring.is_some() {
44            Err(e)?;
45          } else {
46            io_uring_enable_configured = e.raw_os_error();
47          }
48        }
49      }
50    }
51    #[cfg(all(feature = "runtime-vibeio", target_os = "linux"))]
52    if enable_uring.is_none_or(|x| x) && vibeio::util::supports_io_uring() {
53      match vibeio::RuntimeBuilder::new()
54        .driver(vibeio::DriverKind::IoUring)
55        .enable_timer(true)
56        .blocking_pool(Box::new(BlockingThreadPool))
57        .build()
58      {
59        Ok(rt) => {
60          return Ok(Self {
61            inner: RuntimeInner::Custom(rt),
62            io_uring_enable_configured,
63          });
64        }
65        Err(e) => {
66          if enable_uring.is_some() {
67            Err(e)?;
68          } else {
69            io_uring_enable_configured = e.raw_os_error();
70          }
71        }
72      }
73    }
74    #[cfg(not(all(any(feature = "runtime-monoio", feature = "runtime-vibeio"), target_os = "linux")))]
75    let _ = enable_uring;
76
77    // `io_uring` is either disabled or not supported
78    #[cfg(feature = "runtime-monoio")]
79    let rt_inner = RuntimeInner::MonoioLegacy(
80      monoio::RuntimeBuilder::<monoio::LegacyDriver>::new()
81        .enable_all()
82        .attach_thread_pool(Box::new(BlockingThreadPool))
83        .build()?,
84    );
85    #[cfg(feature = "runtime-tokio")]
86    let rt_inner = RuntimeInner::Tokio(tokio::runtime::Builder::new_current_thread().enable_all().build()?);
87    #[cfg(feature = "runtime-vibeio")]
88    let rt_inner = {
89      #[cfg(unix)]
90      let driver_kind = vibeio::DriverKind::Mio;
91      #[cfg(windows)]
92      let driver_kind = vibeio::DriverKind::Iocp;
93      RuntimeInner::Custom(
94        vibeio::RuntimeBuilder::new()
95          .driver(driver_kind)
96          .enable_timer(true)
97          .blocking_pool(Box::new(BlockingThreadPool))
98          .build()?,
99      )
100    };
101
102    Ok(Self {
103      inner: rt_inner,
104      io_uring_enable_configured,
105    })
106  }
107
108  /// Creates a new asynchronous runtime using only Tokio
109  pub fn new_runtime_tokio_only() -> Result<Self, std::io::Error> {
110    Ok(Self {
111      inner: RuntimeInner::TokioOnly(tokio::runtime::Builder::new_current_thread().enable_all().build()?),
112      io_uring_enable_configured: None,
113    })
114  }
115
116  /// Return the OS error if `io_uring` couldn't be configured
117  pub fn return_io_uring_error(&self) -> Option<std::io::Error> {
118    self.io_uring_enable_configured.map(std::io::Error::from_raw_os_error)
119  }
120
121  /// Run a future on the runtime
122  pub fn run(&mut self, fut: impl Future + 'static) {
123    match self.inner {
124      #[cfg(all(feature = "runtime-monoio", target_os = "linux"))]
125      RuntimeInner::MonoioIouring(ref mut rt) => rt.block_on(fut),
126      #[cfg(feature = "runtime-monoio")]
127      RuntimeInner::MonoioLegacy(ref mut rt) => rt.block_on(fut),
128      #[cfg(feature = "runtime-tokio")]
129      RuntimeInner::Tokio(ref mut rt) => rt.block_on(async move {
130        let local_set = tokio::task::LocalSet::new();
131        local_set.run_until(fut).await;
132      }),
133      #[cfg(feature = "runtime-vibeio")]
134      RuntimeInner::Custom(ref mut rt) => rt.block_on(fut),
135      RuntimeInner::TokioOnly(ref mut rt) => rt.block_on(fut),
136    };
137  }
138}
139
140pub use ferron_common::runtime::*;
141
142#[cfg(feature = "runtime-vibeio")]
143use vibeio::blocking::DefaultBlockingThreadPool;
144
145/// A blocking thread pool for Monoio, implemented using `blocking` crate
146#[cfg(feature = "runtime-monoio")]
147struct BlockingThreadPool;
148
149#[cfg(feature = "runtime-monoio")]
150impl monoio::blocking::ThreadPool for BlockingThreadPool {
151  #[inline]
152  fn schedule_task(&self, task: monoio::blocking::BlockingTask) {
153    blocking::unblock(move || task.run()).detach();
154  }
155}
156
157#[cfg(feature = "runtime-vibeio")]
158static GLOBAL_BLOCKING_POOL: LazyLock<DefaultBlockingThreadPool> =
159  LazyLock::new(|| DefaultBlockingThreadPool::with_max_threads(1536));
160
161/// A global blocking thread pool for `vibeio`
162#[cfg(feature = "runtime-vibeio")]
163struct BlockingThreadPool;
164
165#[cfg(feature = "runtime-vibeio")]
166impl vibeio::blocking::BlockingThreadPool for BlockingThreadPool {
167  #[inline]
168  fn spawn(&self, task: Box<dyn FnOnce() + Send + 'static>) {
169    GLOBAL_BLOCKING_POOL.spawn(task);
170  }
171}