io_uring/submit.rs

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic;
use std::{io, mem, ptr};

use crate::register::{execute, Probe};
use crate::sys;
use crate::types::{CancelBuilder, Timespec};
use crate::util::{cast_ptr, OwnedFd};
use crate::Parameters;

use crate::register::Restriction;

use crate::types;

/// Interface for submitting submission queue events in an io_uring instance to the kernel for
/// executing and registering files or buffers with the instance.
///
/// io_uring supports both directly performing I/O on buffers and file descriptors and registering
/// them beforehand. Registering is slow, but it makes performing the actual I/O much faster.
pub struct Submitter<'a> {
    fd: &'a OwnedFd,
    params: &'a Parameters,

    sq_head: *const atomic::AtomicU32,
    sq_tail: *const atomic::AtomicU32,
    sq_flags: *const atomic::AtomicU32,
}

impl<'a> Submitter<'a> {
    #[inline]
    pub(crate) const fn new(
        fd: &'a OwnedFd,
        params: &'a Parameters,
        sq_head: *const atomic::AtomicU32,
        sq_tail: *const atomic::AtomicU32,
        sq_flags: *const atomic::AtomicU32,
    ) -> Submitter<'a> {
        Submitter {
            fd,
            params,
            sq_head,
            sq_tail,
            sq_flags,
        }
    }

    #[inline]
    fn sq_len(&self) -> usize {
        unsafe {
            let head = (*self.sq_head).load(atomic::Ordering::Acquire);
            let tail = (*self.sq_tail).load(atomic::Ordering::Acquire);

            tail.wrapping_sub(head) as usize
        }
    }

    /// Whether the kernel thread has gone to sleep because it waited for too long without
    /// submission queue entries.
    #[inline]
    fn sq_need_wakeup(&self) -> bool {
        unsafe {
            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// Whether the completion queue ring has overflowed.
    fn sq_cq_overflow(&self) -> bool {
        unsafe {
            (*self.sq_flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Initiate and/or complete asynchronous I/O. This is a low-level wrapper around
    /// `io_uring_enter` - see `man io_uring_enter` (or [its online
    /// version](https://manpages.debian.org/unstable/liburing-dev/io_uring_enter.2.en.html)) for
    /// more details.
    ///
    /// You will probably want to use a higher-level API such as
    /// [`submit`](Self::submit) or [`submit_and_wait`](Self::submit_and_wait).
    ///
    /// # Safety
    ///
    /// This provides a raw interface, so developers must ensure that the parameters are correct.
    pub unsafe fn enter<T: Sized>(
        &self,
        to_submit: u32,
        min_complete: u32,
        flag: u32,
        arg: Option<&T>,
    ) -> io::Result<usize> {
        let arg = arg
            .map(|arg| cast_ptr(arg).cast())
            .unwrap_or_else(ptr::null);
        let size = mem::size_of::<T>();
        sys::io_uring_enter(
            self.fd.as_raw_fd(),
            to_submit,
            min_complete,
            flag,
            arg,
            size,
        )
        .map(|res| res as _)
    }

    /// Submit all queued submission queue events to the kernel.
    #[inline]
    pub fn submit(&self) -> io::Result<usize> {
        self.submit_and_wait(0)
    }

    /// Submit all queued submission queue events to the kernel and wait for at least `want`
    /// completion events to complete.
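    ///
    /// # Examples
    ///
    /// A minimal sketch of a typical call sequence (illustrative only): queue a `Nop` entry, then
    /// submit it and block until its completion arrives.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut ring = io_uring::IoUring::new(8)?;
    /// let nop = io_uring::opcode::Nop::new().build().user_data(0x42);
    /// // Safety: the Nop entry does not reference any external buffers.
    /// unsafe { ring.submission().push(&nop)?; }
    /// // Submit the queued entry and wait for at least one completion.
    /// ring.submitter().submit_and_wait(1)?;
    /// # Ok(())
    /// # }
    /// ```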
    pub fn submit_and_wait(&self, want: usize) -> io::Result<usize> {
        let len = self.sq_len();
        let mut flags = 0;

        // This logic suffers from the fact that `sq_cq_overflow` and `sq_need_wakeup`
        // each cause an atomic load of the same variable, `self.sq_flags`.
        // In the hottest paths, when a server is running with sqpoll,
        // this is going to be hit twice, when once would be sufficient.

        if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
            flags |= sys::IORING_ENTER_GETEVENTS;
        }

        if self.params.is_setup_sqpoll() {
            if self.sq_need_wakeup() {
                flags |= sys::IORING_ENTER_SQ_WAKEUP;
            } else if want == 0 {
                // The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
                // it to process events or wake it up
                return Ok(len);
            }
        }

        unsafe { self.enter::<libc::sigset_t>(len as _, want as _, flags, None) }
    }

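    /// Submit all queued submission queue events to the kernel and wait for at least `want`
    /// completion events to complete, passing extended arguments (such as a timeout or signal
    /// mask) to `io_uring_enter` via `IORING_ENTER_EXT_ARG`.
    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only): wait for a completion, but give up after roughly
    /// 10 milliseconds.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use io_uring::types::{SubmitArgs, Timespec};
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let ts = Timespec::new().nsec(10_000_000);
    /// let args = SubmitArgs::new().timespec(&ts);
    /// match ring.submitter().submit_with_args(1, &args) {
    ///     Ok(n) => println!("submitted {} entries", n),
    ///     // ETIME indicates the timeout expired before a completion arrived.
    ///     Err(e) if e.raw_os_error() == Some(libc::ETIME) => println!("timed out"),
    ///     Err(e) => return Err(e.into()),
    /// }
    /// # Ok(())
    /// # }
    /// ```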
    pub fn submit_with_args(
        &self,
        want: usize,
        args: &types::SubmitArgs<'_, '_>,
    ) -> io::Result<usize> {
        let len = self.sq_len();
        let mut flags = sys::IORING_ENTER_EXT_ARG;

        if want > 0 || self.params.is_setup_iopoll() || self.sq_cq_overflow() {
            flags |= sys::IORING_ENTER_GETEVENTS;
        }

        if self.params.is_setup_sqpoll() {
            if self.sq_need_wakeup() {
                flags |= sys::IORING_ENTER_SQ_WAKEUP;
            } else if want == 0 {
                // The kernel thread is polling and hasn't fallen asleep, so we don't need to tell
                // it to process events or wake it up
                return Ok(len);
            }
        }

        unsafe { self.enter(len as _, want as _, flags, Some(&args.args)) }
    }

    /// Wait for the submission queue to have free entries.
    pub fn squeue_wait(&self) -> io::Result<usize> {
        unsafe { self.enter::<libc::sigset_t>(0, 0, sys::IORING_ENTER_SQ_WAIT, None) }
    }

    /// Register in-memory fixed buffers for I/O with the kernel. You can use these buffers with the
    /// [`ReadFixed`](crate::opcode::ReadFixed) and [`WriteFixed`](crate::opcode::WriteFixed)
    /// operations.
    ///
    /// # Safety
    ///
    /// Developers must ensure that the `iov_base` and `iov_len` values are valid and will
    /// be valid until buffers are unregistered or the ring destroyed, otherwise undefined
    /// behaviour may occur.
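    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only): register a single heap-allocated buffer. The buffer
    /// must remain valid until it is unregistered or the ring is dropped.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let mut buf = vec![0u8; 4096];
    /// let iov = libc::iovec {
    ///     iov_base: buf.as_mut_ptr() as *mut libc::c_void,
    ///     iov_len: buf.len(),
    /// };
    /// // Safety: `buf` lives for the rest of `main`, which outlives the registration here.
    /// unsafe { ring.submitter().register_buffers(&[iov])? };
    /// # Ok(())
    /// # }
    /// ```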
    pub unsafe fn register_buffers(&self, bufs: &[libc::iovec]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_BUFFERS,
            bufs.as_ptr().cast(),
            bufs.len() as _,
        )
        .map(drop)
    }

    /// Registers an empty (sparse) file table with room for `nr` file descriptors. The sparse
    /// variant is available in kernels 5.19 and later.
    ///
    /// Registering a file table is a prerequisite for using any request that
    /// uses direct descriptors.
    pub fn register_files_sparse(&self, nr: u32) -> io::Result<()> {
        let rr = sys::io_uring_rsrc_register {
            nr,
            flags: sys::IORING_RSRC_REGISTER_SPARSE,
            resv2: 0,
            data: 0,
            tags: 0,
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES2,
            cast_ptr::<sys::io_uring_rsrc_register>(&rr).cast(),
            mem::size_of::<sys::io_uring_rsrc_register>() as _,
        )
        .map(drop)
    }

    /// Register files for I/O. You can use the registered files with
    /// [`Fixed`](crate::types::Fixed).
    ///
    /// Each fd may be -1, in which case it is considered "sparse", and can be filled in later with
    /// [`register_files_update`](Self::register_files_update).
    ///
    /// Note that this will wait for the ring to idle; it will only return once all active requests
    /// are complete. Use [`register_files_update`](Self::register_files_update) to avoid this.
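    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only; the path is just a placeholder): register one file so
    /// it can be referred to with [`Fixed(0)`](crate::types::Fixed).
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::os::unix::io::AsRawFd;
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let file = std::fs::File::open("some-file")?;
    /// ring.submitter().register_files(&[file.as_raw_fd()])?;
    /// # Ok(())
    /// # }
    /// ```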
    pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES,
            fds.as_ptr().cast(),
            fds.len() as _,
        )
        .map(drop)
    }

    /// Replaces existing files in the registered file set with new ones. This can turn a sparse
    /// entry (one where the fd is equal to -1) into a real one, remove an existing entry (the new
    /// one is set to -1), or replace an existing entry with a different one. The `offset` parameter
    /// specifies the offset into the list of registered files at which to start updating files.
    ///
    /// You can also perform this asynchronously with the
    /// [`FilesUpdate`](crate::opcode::FilesUpdate) opcode.
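    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only; requires a kernel with sparse registration support and
    /// uses a placeholder path): reserve a two-entry sparse table, then fill slot 0.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::os::unix::io::AsRawFd;
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// ring.submitter().register_files_sparse(2)?;
    /// let file = std::fs::File::open("some-file")?;
    /// // Turn the sparse entry at offset 0 into a real one.
    /// let updated = ring.submitter().register_files_update(0, &[file.as_raw_fd()])?;
    /// assert_eq!(updated, 1);
    /// # Ok(())
    /// # }
    /// ```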
    pub fn register_files_update(&self, offset: u32, fds: &[RawFd]) -> io::Result<usize> {
        let fu = sys::io_uring_files_update {
            offset,
            resv: 0,
            fds: fds.as_ptr() as _,
        };
        let ret = execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_FILES_UPDATE,
            cast_ptr::<sys::io_uring_files_update>(&fu).cast(),
            fds.len() as _,
        )?;
        Ok(ret as _)
    }

    /// Register an eventfd created by [`eventfd`](libc::eventfd) with the io_uring instance.
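    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only): create an eventfd and register it so completions can
    /// be observed by reading from (or polling) the eventfd.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let efd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC) };
    /// assert!(efd >= 0, "eventfd failed");
    /// ring.submitter().register_eventfd(efd)?;
    /// # Ok(())
    /// # }
    /// ```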
    pub fn register_eventfd(&self, eventfd: RawFd) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_EVENTFD,
            cast_ptr::<RawFd>(&eventfd).cast(),
            1,
        )
        .map(drop)
    }

    /// This works just like [`register_eventfd`](Self::register_eventfd), except notifications are
    /// only posted for events that complete in an async manner, so requests that complete
    /// immediately will not cause a notification.
    pub fn register_eventfd_async(&self, eventfd: RawFd) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_EVENTFD_ASYNC,
            cast_ptr::<RawFd>(&eventfd).cast(),
            1,
        )
        .map(drop)
    }

    /// Fill in the given [`Probe`] with information about the opcodes supported by io_uring on the
    /// running kernel.
    ///
    /// # Examples
    ///
    // This is marked no_run as it is only available from Linux 5.6+, however the latest Ubuntu (on
    // which CI runs) only has Linux 5.4.
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let io_uring = io_uring::IoUring::new(1)?;
    /// let mut probe = io_uring::Probe::new();
    /// io_uring.submitter().register_probe(&mut probe)?;
    ///
    /// if probe.is_supported(io_uring::opcode::Read::CODE) {
    ///     println!("Reading is supported!");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn register_probe(&self, probe: &mut Probe) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PROBE,
            probe.as_mut_ptr() as *const _,
            Probe::COUNT as _,
        )
        .map(drop)
    }

    /// Register credentials of the running application with io_uring, and get an id associated with
    /// these credentials. This ID can then be [passed](crate::squeue::Entry::personality) into
    /// submission queue entries to issue the request with this process' credentials.
    ///
    /// By default, if [`Parameters::is_feature_cur_personality`] is set then requests will use the
    /// credentials of the task that called [`Submitter::enter`], otherwise they will use the
    /// credentials of the task that originally registered the io_uring.
    ///
    /// [`Parameters::is_feature_cur_personality`]: crate::Parameters::is_feature_cur_personality
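    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only): register the current credentials and attach the
    /// returned id to a submission entry.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let id = ring.submitter().register_personality()?;
    /// let entry = io_uring::opcode::Nop::new().build().personality(id);
    /// # let _ = entry;
    /// # Ok(())
    /// # }
    /// ```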
    pub fn register_personality(&self) -> io::Result<u16> {
        let id = execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PERSONALITY,
            ptr::null(),
            0,
        )?;
        Ok(id as u16)
    }

    /// Unregister all previously registered buffers.
    ///
    /// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
    /// it will be cleaned up by the kernel automatically.
    pub fn unregister_buffers(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_BUFFERS,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister all previously registered files.
    ///
    /// You do not need to explicitly call this before dropping the [`IoUring`](crate::IoUring), as
    /// it will be cleaned up by the kernel automatically.
    pub fn unregister_files(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_FILES,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister an eventfd file descriptor to stop notifications.
    pub fn unregister_eventfd(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_EVENTFD,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Unregister a previously registered personality.
    pub fn unregister_personality(&self, personality: u16) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_PERSONALITY,
            ptr::null(),
            personality as _,
        )
        .map(drop)
    }

    /// Permanently install a feature allowlist. Once this has been called, attempting to perform
    /// an operation not on the allowlist will fail with `-EACCES`.
    ///
    /// This can only be called once, to prevent untrusted code from removing restrictions.
    pub fn register_restrictions(&self, res: &mut [Restriction]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_RESTRICTIONS,
            res.as_mut_ptr().cast(),
            res.len() as _,
        )
        .map(drop)
    }

    /// Enable the rings of the io_uring instance if they have been disabled with
    /// [`setup_r_disabled`](crate::Builder::setup_r_disabled).
    pub fn register_enable_rings(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_ENABLE_RINGS,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Tell io_uring on what CPUs the async workers can run. By default, async workers
    /// created by io_uring will inherit the CPU mask of their parent. This is usually
    /// all the CPUs in the system, unless the parent is being run with a limited set.
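    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only): pin the async workers of this ring to CPU 0.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let mut cpu_set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
    /// unsafe { libc::CPU_SET(0, &mut cpu_set) };
    /// ring.submitter().register_iowq_aff(&cpu_set)?;
    /// # Ok(())
    /// # }
    /// ```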
    pub fn register_iowq_aff(&self, cpu_set: &libc::cpu_set_t) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_IOWQ_AFF,
            cpu_set as *const _ as *const libc::c_void,
            mem::size_of::<libc::cpu_set_t>() as u32,
        )
        .map(drop)
    }

    /// Undoes a CPU mask previously set with [`register_iowq_aff`](Self::register_iowq_aff).
    pub fn unregister_iowq_aff(&self) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_IOWQ_AFF,
            ptr::null(),
            0,
        )
        .map(drop)
    }

    /// Get and/or set the limit for the number of io_uring worker threads per NUMA
    /// node. `max[0]` holds the limit for bounded workers, which process I/O
    /// operations expected to be bound in time, that is, I/O on regular files or
    /// block devices, while `max[1]` holds the limit for unbounded workers, which
    /// carry out I/O operations that may never complete, for instance I/O on
    /// sockets. Passing `0` leaves the corresponding limit unchanged. The previous
    /// limits are returned in `max` on success.
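    ///
    /// # Examples
    ///
    /// A minimal sketch (illustrative only): cap bounded workers at 4, leave the unbounded limit
    /// unchanged, and read back the previous limits.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let ring = io_uring::IoUring::new(8)?;
    /// let mut max = [4u32, 0u32];
    /// ring.submitter().register_iowq_max_workers(&mut max)?;
    /// println!("previous limits: bounded = {}, unbounded = {}", max[0], max[1]);
    /// # Ok(())
    /// # }
    /// ```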
    pub fn register_iowq_max_workers(&self, max: &mut [u32; 2]) -> io::Result<()> {
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_IOWQ_MAX_WORKERS,
            max.as_mut_ptr().cast(),
            max.len() as _,
        )
        .map(drop)
    }

    /// Register a buffer ring for provided buffers.
    ///
    /// Details can be found in the io_uring_register_buf_ring.3 man page.
    ///
    /// If the register command is not supported, or the `ring_entries` value exceeds
    /// 32768, the `InvalidInput` error is returned.
    ///
    /// Available since 5.19.
    ///
    /// # Safety
    ///
    /// Developers must ensure that the `ring_addr` and its length represented by `ring_entries`
    /// are valid and will be valid until the bgid is unregistered or the ring destroyed,
    /// otherwise undefined behaviour may occur.
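    ///
    /// # Examples
    ///
    /// A rough sketch (illustrative only): allocate a page-aligned ring of 8 entries (each entry
    /// is a 16-byte `io_uring_buf`) and register it as buffer group 0. A real application would
    /// keep the allocation alive, populate the entries, and advance the ring tail.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::alloc::{alloc_zeroed, Layout};
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let entries: u16 = 8;
    /// let layout = Layout::from_size_align(entries as usize * 16, 4096)?;
    /// // Safety: the layout has a non-zero size.
    /// let ring_addr = unsafe { alloc_zeroed(layout) } as u64;
    /// assert_ne!(ring_addr, 0, "allocation failed");
    /// // Safety: the allocation stays valid (it is intentionally leaked in this sketch).
    /// unsafe { ring.submitter().register_buf_ring(ring_addr, entries, 0)? };
    /// # Ok(())
    /// # }
    /// ```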
    pub unsafe fn register_buf_ring(
        &self,
        ring_addr: u64,
        ring_entries: u16,
        bgid: u16,
    ) -> io::Result<()> {
        // The interface type for ring_entries is u32 but the same interface only allows a u16 for
        // the tail to be specified, so to try and avoid further confusion, we limit the
        // ring_entries to u16 here too. The value is actually limited to 2^15 (32768) but we can
        // let the kernel enforce that.
        let arg = sys::io_uring_buf_reg {
            ring_addr,
            ring_entries: ring_entries as _,
            bgid,
            ..Default::default()
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_PBUF_RING,
            cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
            1,
        )
        .map(drop)
    }

    /// Unregister a previously registered buffer ring.
    ///
    /// Available since 5.19.
    pub fn unregister_buf_ring(&self, bgid: u16) -> io::Result<()> {
        let arg = sys::io_uring_buf_reg {
            ring_addr: 0,
            ring_entries: 0,
            bgid,
            ..Default::default()
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_UNREGISTER_PBUF_RING,
            cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
            1,
        )
        .map(drop)
    }

    /// Performs a synchronous cancellation request, similar to [AsyncCancel](crate::opcode::AsyncCancel),
    /// except that it completes synchronously.
    ///
    /// Cancellation can target a specific request, or all requests matching some criteria. The
    /// [CancelBuilder](types::CancelBuilder) builder supports describing the match criteria for cancellation.
    ///
    /// An optional `timeout` can be provided to specify how long to wait for matched requests to be
    /// canceled. If no timeout is provided, the default is to wait indefinitely.
    ///
    /// ### Errors
    ///
    /// If no requests are matched, returns:
    ///
    /// [io::ErrorKind::NotFound]: `No such file or directory (os error 2)`
    ///
    /// If a timeout is supplied, and the timeout elapses prior to all requests being canceled, returns:
    ///
    /// [io::ErrorKind::Uncategorized]: `Timer expired (os error 62)`
    ///
    /// ### Notes
    ///
    /// Only requests which have been submitted to the ring will be considered for cancellation. Requests
    /// which have been written to the SQ, but not submitted, will not be canceled.
    ///
    /// Available since 6.0.
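    ///
    /// ### Examples
    ///
    /// A minimal sketch (illustrative only): ask the kernel to cancel any in-flight request with
    /// user data `0x42`, waiting at most one second for the cancellation to complete.
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use io_uring::types::{CancelBuilder, Timespec};
    ///
    /// let ring = io_uring::IoUring::new(8)?;
    /// let result = ring.submitter().register_sync_cancel(
    ///     Some(Timespec::new().sec(1)),
    ///     CancelBuilder::user_data(0x42),
    /// );
    /// // A NotFound error simply means no matching request was in flight.
    /// if let Err(e) = result {
    ///     println!("cancel failed: {}", e);
    /// }
    /// # Ok(())
    /// # }
    /// ```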
    pub fn register_sync_cancel(
        &self,
        timeout: Option<Timespec>,
        builder: CancelBuilder,
    ) -> io::Result<()> {
        let timespec = timeout.map(|ts| ts.0).unwrap_or(sys::__kernel_timespec {
            tv_sec: -1,
            tv_nsec: -1,
        });
        let user_data = builder.user_data.unwrap_or(0);
        let flags = builder.flags.bits();
        let fd = builder.to_fd();

        let arg = {
            let mut arg = sys::io_uring_sync_cancel_reg::default();
            arg.addr = user_data;
            arg.fd = fd;
            arg.flags = flags;
            arg.timeout = timespec;
            arg
        };
        execute(
            self.fd.as_raw_fd(),
            sys::IORING_REGISTER_SYNC_CANCEL,
            cast_ptr::<sys::io_uring_sync_cancel_reg>(&arg).cast(),
            1,
        )
        .map(drop)
    }
}