io_uring/submit.rs
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic;
use std::{io, mem, ptr};

use crate::register::{execute, Probe};
use crate::sys;
use crate::types::{CancelBuilder, Timespec};
use crate::util::{cast_ptr, OwnedFd};
use crate::Parameters;

use crate::register::Restriction;

use crate::types;

/// Interface for submitting submission queue events in an io_uring instance to the kernel for
/// executing and registering files or buffers with the instance.
///
/// io_uring supports both directly performing I/O on buffers and file descriptors and registering
/// them beforehand. Registering is slow, but it makes performing the actual I/O much faster.
pub struct Submitter<'a> {
    fd: &'a OwnedFd,
    params: &'a Parameters,

    sq_head: *const atomic::AtomicU32,
    sq_tail: *const atomic::AtomicU32,
    sq_flags: *const atomic::AtomicU32,
}

impl<'a> Submitter<'a> {
    #[inline]
    pub(crate) const fn new(
        fd: &'a OwnedFd,
        params: &'a Parameters,
        sq_head: *const atomic::AtomicU32,
        sq_tail: *const atomic::AtomicU32,
        sq_flags: *const atomic::AtomicU32,
    ) -> Submitter<'a> {
        Submitter {
            fd,
            params,
            sq_head,
            sq_tail,
            sq_flags,
        }
    }

    #[inline]
    fn sq_len(&self) -> usize {
        unsafe {
            let head = (*self.sq_head).load(atomic::Ordering::Acquire);
            let tail = (*self.sq_tail).load(atomic::Ordering::Acquire);

            tail.wrapping_sub(head) as usize
        }
    }

    /// Whether the kernel thread has gone to sleep because it waited for too long without
    /// submission queue entries.
    #[inline]
    fn sq_need_wakeup(&self) -> bool {
        unsafe {
            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// Whether the completion queue ring has overflowed.
    fn sq_cq_overflow(&self) -> bool {
        unsafe {
            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Initiate and/or complete asynchronous I/O. This is a low-level wrapper around
    /// `io_uring_enter` - see `man io_uring_enter` (or [its online
    /// version](https://manpages.debian.org/unstable/liburing-dev/io_uring_enter.2.en.html)) for
    /// more details.
    ///
    /// You will probably want to use a more high-level API such as
    /// [`submit`](Self::submit) or [`submit_and_wait`](Self::submit_and_wait).
    ///
    /// # Safety
    ///
    /// This provides a raw interface, so developers must ensure that the parameters are correct.
    pub unsafe fn enter<T: Sized>(
        &self,
        to_submit: u32,
        min_complete: u32,
        flag: u32,
        arg: Option<&T>,
    ) -> io::Result<usize> {
        let arg = arg
            .map(|arg| cast_ptr(arg).cast())
            .unwrap_or_else(ptr::null);
        let size = mem::size_of::<T>();
        sys::io_uring_enter(
            self.fd.as_raw_fd(),
            to_submit,
            min_complete,
            flag,
            arg,
            size,
        )
        .map(|res| res as _)
    }

    /// Submit all queued submission queue events to the kernel.
    #[inline]
    pub fn submit(&self) -> io::Result<usize> {
        self.submit_and_wait(0)
    }

    /// Submit all queued submission queue events to the kernel and wait for at least `want`
    /// completion events to complete.
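    ///
    /// # Examples
    ///
    /// A minimal sketch of the usual submit-and-wait flow; the entry pushed here (a no-op tagged
    /// with an arbitrary user_data of 0x42) is only illustrative.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut ring = io_uring::IoUring::new(8)?;
    /// let nop = io_uring::opcode::Nop::new().build().user_data(0x42);
    /// // Safety: the no-op entry does not reference any external buffers.
    /// unsafe { ring.submission().push(&nop)? };
    ///
    /// // Submit the queued entry and block until at least one completion arrives.
    /// let submitted = ring.submitter().submit_and_wait(1)?;
    /// assert_eq!(submitted, 1);
    /// # Ok(())
    /// # }
    /// ```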
    pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
        let len = self.sq_len();
        let mut flags = 0;

        // This logic suffers from the fact that sq_cq_overflow and sq_need_wakeup
        // each cause an atomic load of the same variable, self.sq_flags.
        // In the hottest paths, when a server is running with sqpoll,
        // this is going to be hit twice, when once would be sufficient.

        if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
            flags |= sys::IORING_ENTER_GETEVENTS;
        }

        if self.params.is_setup_sqpoll() {
            if self.sq_need_wakeup() {
                flags |= sys::IORING_ENTER_SQ_WAKEUP;
            } else if want == 0 {
                // The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
                // it to process events or wake it up.
                return Ok(len);
            }
        }

        unsafe { self.enter::<libc::sigset_t>(len as _, want as _, flags, None) }
    }

    /// Submit all queued submission queue events to the kernel and wait for at least `want`
    /// completion events to complete, passing extended arguments (see
    /// [`SubmitArgs`](types::SubmitArgs)), such as a wait timeout.
    pub fn submit_with_args(
        &self,
        want: usize,
        args: &types::SubmitArgs<'_, '_>,
    ) -> io::Result<usize> {
        let len = self.sq_len();
        let mut flags = sys::IORING_ENTER_EXT_ARG;

        if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
            flags |= sys::IORING_ENTER_GETEVENTS;
        }

        if self.params.is_setup_sqpoll() {
            if self.sq_need_wakeup() {
                flags |= sys::IORING_ENTER_SQ_WAKEUP;
            } else if want == 0 {
                // The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
                // it to process events or wake it up.
                return Ok(len);
            }
        }

        unsafe { self.enter(len as _, want as _, flags, Some(&args.args)) }
    }

    /// Wait for the submission queue to have free entries.
    pub fn squeue_wait(&self) -> io::Result<usize> {
        unsafe { self.enter::<libc::sigset_t>(0, 0, sys::IORING_ENTER_SQ_WAIT, None) }
    }

    /// Register in-memory fixed buffers for I/O with the kernel. You can use these buffers with the
    /// [`ReadFixed`](crate::opcode::ReadFixed) and [`WriteFixed`](crate::opcode::WriteFixed)
    /// operations.
    ///
    /// # Safety
    ///
    /// Developers must ensure that the `iov_base` and `iov_len` values are valid and will
    /// be valid until buffers are unregistered or the ring destroyed, otherwise undefined
    /// behaviour may occur.
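    ///
    /// # Examples
    ///
    /// A minimal sketch that registers one fixed buffer; the buffer size and the fact that it is
    /// leaked (so it stays valid for the lifetime of the ring) are illustrative choices.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    ///
    /// // Leak the allocation so the registered memory remains valid until the ring is dropped.
    /// let buf: &'static mut [u8] = Box::leak(vec![0u8; 4096].into_boxed_slice());
    /// let iov = libc::iovec {
    ///     iov_base: buf.as_mut_ptr().cast(),
    ///     iov_len: buf.len(),
    /// };
    ///
    /// // Safety: the leaked buffer outlives the ring.
    /// unsafe { ring.submitter().register_buffers(&[iov])? };
    /// # Ok(())
    /// # }
    /// ```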
    pub unsafe fn register_buffers(&self, bufs: &[libc::iovec]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_BUFFERS,
            bufs.as_ptr().cast(),
            bufs.len() as _,
        )
        .map(drop)
    }

    /// Registers an empty file table of `nr` file descriptors. The sparse variant is
    /// available in kernels 5.19 and later.
    ///
    /// Registering a file table is a prerequisite for using any request that
    /// uses direct descriptors.
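    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a kernel of 5.19 or newer; the table size of 4 is arbitrary.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    ///
    /// // Reserve four direct-descriptor slots; they can be filled in later, for example with
    /// // `register_files_update`.
    /// ring.submitter().register_files_sparse(4)?;
    /// # Ok(())
    /// # }
    /// ```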
    pub fn register_files_sparse(&self, nr: u32) -> io::Result<()> {
        let rr = sys::io_uring_rsrc_register {
            nr,
            flags: sys::IORING_RSRC_REGISTER_SPARSE,
            resv2: 0,
            data: 0,
            tags: 0,
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES2,
            cast_ptr::<sys::io_uring_rsrc_register>(&rr).cast(),
            mem::size_of::<sys::io_uring_rsrc_register>() as _,
        )
        .map(drop)
    }

    /// Register files for I/O. You can use the registered files with
    /// [`Fixed`](crate::types::Fixed).
    ///
    /// Each fd may be -1, in which case it is considered "sparse", and can be filled in later with
    /// [`register_files_update`](Self::register_files_update).
    ///
    /// Note that this will wait for the ring to idle; it will only return once all active requests
    /// are complete. Use [`register_files_update`](Self::register_files_update) to avoid this.
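    ///
    /// # Examples
    ///
    /// A minimal sketch that registers a single file; the path is only illustrative.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::os::unix::io::AsRawFd;
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let file = std::fs::File::open("some-file")?;
    ///
    /// // Slot 0 of the registered file table now refers to `file`; it can be addressed in
    /// // submission queue entries with `types::Fixed(0)`.
    /// ring.submitter().register_files(&[file.as_raw_fd()])?;
    /// # Ok(())
    /// # }
    /// ```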
    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES,
            fds.as_ptr().cast(),
            fds.len() as _,
        )
        .map(drop)
    }

    /// This operation replaces existing files in the registered file set with new ones:
    /// either turning a sparse entry (one where the fd is equal to -1) into a real one,
    /// removing an existing entry (the new fd is set to -1), or replacing an existing entry
    /// with a new one. The `offset` parameter specifies the offset into the list of registered
    /// files at which to start updating files.
    ///
    /// You can also perform this asynchronously with the
    /// [`FilesUpdate`](crate::opcode::FilesUpdate) opcode.
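    ///
    /// # Examples
    ///
    /// A minimal sketch that fills slot 0 of a previously registered (sparse) table and then
    /// clears it again; the path and slot index are illustrative.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::os::unix::io::AsRawFd;
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// ring.submitter().register_files_sparse(2)?;
    ///
    /// // Install a real descriptor into slot 0 ...
    /// let file = std::fs::File::open("some-file")?;
    /// ring.submitter().register_files_update(0, &[file.as_raw_fd()])?;
    ///
    /// // ... and later remove it again by writing -1 into the same slot.
    /// ring.submitter().register_files_update(0, &[-1])?;
    /// # Ok(())
    /// # }
    /// ```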
    pub fn register_files_update(&self, offset: u32, fds: &[RawFd]) -> io::Result<usize> {
        let fu = sys::io_uring_files_update {
            offset,
            resv: 0,
            fds: fds.as_ptr() as _,
        };
        let ret = execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES_UPDATE,
            cast_ptr::<sys::io_uring_files_update>(&fu).cast(),
            fds.len() as _,
        )?;
        Ok(ret as _)
    }

    /// Register an eventfd created by [`eventfd`](libc::eventfd) with the io_uring instance.
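    ///
    /// # Examples
    ///
    /// A minimal sketch that creates an eventfd with `libc::eventfd` and registers it for
    /// completion notifications.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    ///
    /// let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };
    /// assert!(efd >= 0, "eventfd failed");
    ///
    /// // The eventfd is now signalled whenever a completion queue event is posted.
    /// ring.submitter().register_eventfd(efd)?;
    /// # Ok(())
    /// # }
    /// ```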
    pub fn register_eventfd(&self, eventfd: RawFd) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_EVENTFD,
            cast_ptr::<RawFd>(&eventfd).cast(),
            1,
        )
        .map(drop)
    }

    /// This works just like [`register_eventfd`](Self::register_eventfd), except notifications are
    /// only posted for events that complete in an async manner, so requests that complete
    /// immediately will not cause a notification.
    pub fn register_eventfd_async(&self, eventfd: RawFd) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_EVENTFD_ASYNC,
            cast_ptr::<RawFd>(&eventfd).cast(),
            1,
        )
        .map(drop)
    }

    /// Fill in the given [`Probe`] with information about the opcodes supported by io_uring on the
    /// running kernel.
    ///
    /// # Examples
    ///
    // This is marked no_run as it is only available from Linux 5.6+, however the latest Ubuntu (on
    // which CI runs) only has Linux 5.4.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let io_uring = io_uring::IoUring::new(1)?;
    /// let mut probe = io_uring::Probe::new();
    /// io_uring.submitter().register_probe(&mut probe)?;
    ///
    /// if probe.is_supported(io_uring::opcode::Read::CODE) {
    ///     println!("Reading is supported!");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn register_probe(&self, probe: &mut Probe) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PROBE,
            probe.as_mut_ptr() as *const _,
            Probe::COUNT as _,
        )
        .map(drop)
    }

    /// Register credentials of the running application with io_uring, and get an ID associated with
    /// these credentials. This ID can then be [passed](crate::squeue::Entry::personality) into
    /// submission queue entries to issue the request with this process' credentials.
    ///
    /// By default, if [`Parameters::is_feature_cur_personality`] is set then requests will use the
    /// credentials of the task that called [`Submitter::enter`], otherwise they will use the
    /// credentials of the task that originally registered the io_uring.
    ///
    /// [`Parameters::is_feature_cur_personality`]: crate::Parameters::is_feature_cur_personality
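    ///
    /// # Examples
    ///
    /// A minimal sketch of registering and later unregistering a personality; attaching the
    /// returned ID to an entry via `Entry::personality` is left out.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    ///
    /// let personality = ring.submitter().register_personality()?;
    /// // ... issue requests carrying `personality` ...
    /// ring.submitter().unregister_personality(personality)?;
    /// # Ok(())
    /// # }
    /// ```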
    pub fn register_personality(&self) -> io::Result<u16> {
        let id = execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PERSONALITY,
            ptr::null(),
            0,
        )?;
        Ok(id as u16)
    }

    /// Unregister all previously registered buffers.
    ///
    /// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
    /// it will be cleaned up by the kernel automatically.
    pub fn unregister_buffers(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_BUFFERS,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister all previously registered files.
    ///
    /// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
    /// it will be cleaned up by the kernel automatically.
    pub fn unregister_files(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_FILES,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister an eventfd file descriptor to stop notifications.
    pub fn unregister_eventfd(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_EVENTFD,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister a previously registered personality.
    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_PERSONALITY,
            ptr::null(),
            personality as _,
        )
        .map(drop)
    }

    /// Permanently install a feature allowlist. Once this has been called, attempting to perform
    /// an operation not on the allowlist will fail with `-EACCES`.
    ///
    /// This can only be called once, to prevent untrusted code from removing restrictions.
    pub fn register_restrictions(&self, res: &mut [Restriction]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_RESTRICTIONS,
            res.as_mut_ptr().cast(),
            res.len() as _,
        )
        .map(drop)
    }

    /// Enable the rings of the io_uring instance if they have been disabled with
    /// [`setup_r_disabled`](crate::Builder::setup_r_disabled).
    pub fn register_enable_rings(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_ENABLE_RINGS,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Tell io_uring on what CPUs the async workers can run. By default, async workers
    /// created by io_uring will inherit the CPU mask of their parent. This is usually
    /// all the CPUs in the system, unless the parent is being run with a limited set.
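    ///
    /// # Examples
    ///
    /// A minimal sketch that pins the async workers to CPU 0; the choice of CPU is arbitrary.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    ///
    /// // Build a cpu_set_t containing only CPU 0.
    /// let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
    /// unsafe { libc::CPU_SET(0, &mut cpu_set) };
    ///
    /// ring.submitter().register_iowq_aff(&cpu_set)?;
    /// # Ok(())
    /// # }
    /// ```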
    pub fn register_iowq_aff(&self, cpu_set: &libc::cpu_set_t) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_IOWQ_AFF,
            cpu_set as *const _ as *const libc::c_void,
            mem::size_of::<libc::cpu_set_t>() as u32,
        )
        .map(drop)
    }

    /// Undoes a CPU mask previously set with
    /// [`register_iowq_aff`](Self::register_iowq_aff).
    pub fn unregister_iowq_aff(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_IOWQ_AFF,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Get and/or set the limit for the number of io_uring worker threads per NUMA
    /// node. `max[0]` holds the limit for bounded workers, which process I/O
    /// operations expected to be bound in time, that is I/O on regular files or
    /// block devices, while `max[1]` holds the limit for unbounded workers,
    /// which carry out I/O operations that may never complete, for instance I/O
    /// on sockets. Passing `0` does not change the current limit. Returns the
    /// previous limits on success.
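    ///
    /// # Examples
    ///
    /// A minimal sketch that caps the bounded workers at 4 while leaving the unbounded limit
    /// unchanged; the previous limits are returned in the same array.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    ///
    /// // max[0]: bounded workers, max[1]: unbounded workers; 0 leaves a limit unchanged.
    /// let mut max = [4u32, 0u32];
    /// ring.submitter().register_iowq_max_workers(&mut max)?;
    /// println!("previous limits: bounded={}, unbounded={}", max[0], max[1]);
    /// # Ok(())
    /// # }
    /// ```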
    pub fn register_iowq_max_workers(&self, max: &mut [u32; 2]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_IOWQ_MAX_WORKERS,
            max.as_mut_ptr().cast(),
            max.len() as _,
        )
        .map(drop)
    }

    /// Register a buffer ring for provided buffers.
    ///
    /// Details can be found in the io_uring_register_buf_ring.3 man page.
    ///
    /// If the register command is not supported, or the `ring_entries` value exceeds
    /// 32768, an `InvalidInput` error is returned.
    ///
    /// Available since 5.19.
    ///
    /// # Safety
    ///
    /// Developers must ensure that the `ring_addr` and its length represented by `ring_entries`
    /// are valid and will be valid until the bgid is unregistered or the ring destroyed,
    /// otherwise undefined behaviour may occur.
    pub unsafe fn register_buf_ring(
        &self,
        ring_addr: u64,
        ring_entries: u16,
        bgid: u16,
    ) -> io::Result<()> {
        // The interface type for ring_entries is u32 but the same interface only allows a u16 for
        // the tail to be specified, so to try and avoid further confusion, we limit the
        // ring_entries to u16 here too. The value is actually limited to 2^15 (32768) but we can
        // let the kernel enforce that.
        let arg = sys::io_uring_buf_reg {
            ring_addr,
            ring_entries: ring_entries as _,
            bgid,
            ..Default::default()
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PBUF_RING,
            cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
            1,
        )
        .map(drop)
    }

    /// Unregister a previously registered buffer ring.
    ///
    /// Available since 5.19.
    pub fn unregister_buf_ring(&self, bgid: u16) -> io::Result<()> {
        let arg = sys::io_uring_buf_reg {
            ring_addr: 0,
            ring_entries: 0,
            bgid,
            ..Default::default()
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_PBUF_RING,
            cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
            1,
        )
        .map(drop)
    }

    /// Performs a synchronous cancellation request, similar to
    /// [AsyncCancel](crate::opcode::AsyncCancel), except that it completes synchronously.
    ///
    /// Cancellation can target a specific request, or all requests matching some criteria. The
    /// [CancelBuilder](types::CancelBuilder) builder supports describing the match criteria for
    /// cancellation.
    ///
    /// An optional `timeout` can be provided to specify how long to wait for matched requests to
    /// be canceled. If no timeout is provided, the default is to wait indefinitely.
    ///
    /// ### Errors
    ///
    /// If no requests are matched, returns:
    ///
    /// [io::ErrorKind::NotFound]: `No such file or directory (os error 2)`
    ///
    /// If a timeout is supplied, and the timeout elapses prior to all requests being canceled,
    /// returns:
    ///
    /// [io::ErrorKind::Uncategorized]: `Timer expired (os error 62)`
    ///
    /// ### Notes
    ///
    /// Only requests which have been submitted to the ring will be considered for cancellation.
    /// Requests which have been written to the SQ, but not submitted, will not be canceled.
    ///
    /// Available since 6.0.
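    ///
    /// # Examples
    ///
    /// A minimal sketch that cancels every in-flight request matching a user_data value, waiting
    /// at most one second; the user_data value of 42 is arbitrary.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use io_uring::types::{CancelBuilder, Timespec};
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// // ... submit requests tagged with user_data 42 ...
    ///
    /// let timeout = Timespec::new().sec(1);
    /// ring.submitter()
    ///     .register_sync_cancel(Some(timeout), CancelBuilder::user_data(42).all())?;
    /// # Ok(())
    /// # }
    /// ```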
    pub fn register_sync_cancel(
        &self,
        timeout: Option<Timespec>,
        builder: CancelBuilder,
    ) -> io::Result<()> {
        let timespec = timeout.map(|ts| ts.0).unwrap_or(sys::__kernel_timespec {
            tv_sec: -1,
            tv_nsec: -1,
        });
        let user_data = builder.user_data.unwrap_or(0);
        let flags = builder.flags.bits();
        let fd = builder.to_fd();

        let arg = {
            let mut arg = sys::io_uring_sync_cancel_reg::default();
            arg.addr = user_data;
            arg.fd = fd;
            arg.flags = flags;
            arg.timeout = timespec;
            arg
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_SYNC_CANCEL,
            cast_ptr::<sys::io_uring_sync_cancel_reg>(&arg).cast(),
            1,
        )
        .map(drop)
    }
}