//! Submission Queue
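//!
//! A minimal end-to-end sketch of how submission entries are typically used
//! (a no-op request; error handling and kernel-version checks elided):
//!
//! ```no_run
//! use io_uring::{opcode, IoUring};
//!
//! let mut ring = IoUring::new(8).unwrap();
//! let sqe = opcode::Nop::new().build().user_data(0x42);
//! unsafe { ring.submission().push(&sqe).unwrap() };
//! ring.submit_and_wait(1).unwrap();
//! let cqe = ring.completion().next().expect("completion queue entry");
//! assert_eq!(cqe.user_data(), 0x42);
//! ```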

use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;
use std::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

/// An io_uring instance's submission queue. This is used to send I/O requests to the kernel.
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE), representing a request for an I/O operation.
///
/// This is implemented for [`Entry`] and [`Entry128`].
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    const BUILD_FLAGS: u32;
}

/// A 64-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

/// A 128-byte submission queue entry (SQE), representing a request for an I/O operation.
///
/// These can be created via opcodes in [`opcode`](crate::opcode).
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 64);
    assert_eq!(mem::size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags
    pub struct Flags: u8 {
        /// When this flag is specified,
        /// `fd` is an index into the files array registered with the io_uring instance.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// When this flag is specified,
        /// the SQE will not be started before previously submitted SQEs have completed,
        /// and new SQEs will not be started before this one completes.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// When this flag is specified,
        /// it forms a link with the next SQE in the submission ring.
        /// That next SQE will not be started before this one completes.
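        ///
        /// A hedged sketch of chaining two requests (`first_sqe` and `second_sqe`
        /// are hypothetical entries built via [`opcode`](crate::opcode)):
        ///
        /// ```ignore
        /// let first = first_sqe.flags(Flags::IO_LINK); // must complete first
        /// let second = second_sqe;                     // started only afterwards
        /// unsafe { sq.push_multiple(&[first, second])? };
        /// ```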
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like [`IO_LINK`](Self::IO_LINK), but the link will not be severed regardless of the
        /// completion result.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Normal operation for io_uring is to try to issue an SQE as non-blocking first,
        /// and if that fails, execute it in an async manner.
        ///
        /// To support more efficient overlapped operation of requests
        /// that the application knows/assumes will always (or most of the time) block,
        /// the application can ask for an SQE to be issued async from the start.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Conceptually the kernel holds a set of buffers organized into groups. When you issue a
        /// request with this flag and set `buf_group` to a valid buffer group ID (e.g.
        /// [`buf_group` on `Read`](crate::opcode::Read::buf_group)) then once the file descriptor
        /// becomes ready the kernel will try to take a buffer from the group.
        ///
        /// If there are no buffers in the group, your request will fail with `-ENOBUFS`. Otherwise,
        /// the corresponding [`cqueue::Entry::flags`](crate::cqueue::Entry::flags) will contain the
        /// chosen buffer ID, encoded with:
        ///
        /// ```text
        /// (buffer_id << IORING_CQE_BUFFER_SHIFT) | IORING_CQE_F_BUFFER
        /// ```
        ///
        /// You can use [`buffer_select`](crate::cqueue::buffer_select) to take the buffer ID.
        ///
        /// The buffer will then be removed from the group and won't be usable by other requests
        /// anymore.
        ///
        /// You can provide new buffers in a group with
        /// [`ProvideBuffers`](crate::opcode::ProvideBuffers).
        ///
        /// See also [the LWN thread on automatic buffer
        /// selection](https://lwn.net/Articles/815491/).
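        ///
        /// A minimal sketch of recovering the chosen buffer ID on the completion side
        /// (`cqe` here is a hypothetical popped [`cqueue::Entry`](crate::cqueue::Entry)):
        ///
        /// ```ignore
        /// if let Some(buf_id) = cqueue::buffer_select(cqe.flags()) {
        ///     // `buf_id` names the kernel-chosen buffer from the group; it can be
        ///     // handed back later via `ProvideBuffers`.
        /// }
        /// ```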
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post CQE if request succeeded.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes         = sqe_mmap.as_mut_ptr() as *mut E;

        // To keep it simple, fill the kernel's indirection array with the identity
        // mapping (`array[i] == i`), so ring slot `i` always refers to `sqes[i]`.
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this type with the real submission queue.
    ///
    /// This will flush any entries added by [`push`](Self::push) or
    /// [`push_multiple`](Self::push_multiple) and will update the queue's length if the kernel has
    /// consumed some entries in the meantime.
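    ///
    /// A hedged sketch of the usual flow (`ring` is a hypothetical
    /// [`IoUring`](crate::IoUring) instance and `sqe` an already-built entry):
    ///
    /// ```ignore
    /// let mut sq = ring.submission();
    /// unsafe { sq.push(&sqe)? };
    /// sq.sync(); // make the pushed entry visible before submitting
    /// ```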
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

    /// When [`is_setup_sqpoll`](crate::Parameters::is_setup_sqpoll) is set, whether the kernel
    /// thread has gone to sleep and requires a system call to wake it up.
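    ///
    /// A minimal sketch (`ring` is a hypothetical [`IoUring`](crate::IoUring)
    /// created with SQPOLL enabled):
    ///
    /// ```ignore
    /// if ring.submission().need_wakeup() {
    ///     ring.submit()?; // the enter syscall wakes the sleeping kernel thread
    /// }
    /// ```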
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of invalid submission queue entries that have been encountered in the ring
    /// buffer.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Returns `true` if the completion queue ring has overflowed.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Returns `true` if completions are pending that should be processed. Only relevant when used
    /// in conjunction with the `setup_taskrun_flag` function. Available since 5.19.
    pub fn taskrun(&self) -> bool {
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
    }

    /// Get the total number of entries in the submission queue ring buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// Get the number of submission queue events in the ring buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Returns `true` if the submission queue ring buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the submission queue ring buffer has reached capacity, and no more events
    /// can be added before the kernel consumes some.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Attempts to push an entry into the queue.
    /// If the queue is full, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of the entry (such as buffers) are valid and will
    /// be valid for the entire duration of the operation, otherwise the kernel may read from or
    /// write to invalid memory.
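    ///
    /// A minimal sketch (a `Nop` entry carries no pointers, so it imposes no extra
    /// lifetime requirements; `sq` is a hypothetical borrowed [`SubmissionQueue`]):
    ///
    /// ```ignore
    /// let sqe = opcode::Nop::new().build().user_data(0x42);
    /// unsafe { sq.push(&sqe)? };
    /// ```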
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

    /// Attempts to push several entries into the queue.
    /// If the queue does not have space for all of the entries, an error is returned.
    ///
    /// # Safety
    ///
    /// Developers must ensure that parameters of all the entries (such as buffers) are valid and
    /// will be valid for the entire duration of the operation, otherwise the kernel may read from
    /// or write to invalid memory.
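    ///
    /// A minimal sketch pushing two independent no-op entries at once:
    ///
    /// ```ignore
    /// let entries = [
    ///     opcode::Nop::new().build().user_data(1),
    ///     opcode::Nop::new().build().user_data(2),
    /// ];
    /// unsafe { sq.push_multiple(&entries)? };
    /// ```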
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= flags.bits();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry {
        self.0.user_data = user_data;
        self
    }

    /// Get the previously application-supplied user data.
    #[inline]
    pub fn get_user_data(&self) -> u64 {
        self.0.user_data
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: u32 = 0;
}

impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // io_uring_sqe doesn't implement Clone due to the 'cmd' incomplete array field.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission event's [flags](Flags).
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= flags.bits();
        self
    }

    /// Set the user data. This is an application-supplied value that will be passed straight
    /// through into the [completion queue entry](crate::cqueue::Entry::user_data).
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry128 {
        self.0 .0.user_data = user_data;
        self
    }

    /// Set the personality of this event. You can obtain a personality using
    /// [`Submitter::register_personality`](crate::Submitter::register_personality).
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue due to it being full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}