use std::error::Error;
use std::fmt::{self, Debug, Display, Formatter};
use std::mem;
use std::sync::atomic;

use crate::sys;
use crate::util::{private, unsync_load, Mmap};

use bitflags::bitflags;

// Raw submission queue state: pointers into the kernel-shared SQ ring plus the
// mapped SQE array.
pub(crate) struct Inner<E: EntryMarker> {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: u32,
    pub(crate) ring_entries: u32,
    pub(crate) flags: *const atomic::AtomicU32,
    dropped: *const atomic::AtomicU32,

    pub(crate) sqes: *mut E,
}

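/// An io_uring instance's submission queue. This is used to send I/O requests
/// to the kernel.
///
/// # Examples
///
/// A minimal usage sketch, assuming the crate's top-level `IoUring` and
/// `opcode::Nop` APIs:
///
/// ```no_run
/// use io_uring::{opcode, IoUring};
///
/// # fn main() -> std::io::Result<()> {
/// let mut ring = IoUring::new(8)?;
///
/// // `user_data` is echoed back verbatim in the matching completion entry.
/// let nop = opcode::Nop::new().build().user_data(0x42);
/// unsafe {
///     ring.submission()
///         .push(&nop)
///         .expect("submission queue is full");
/// }
///
/// ring.submit_and_wait(1)?;
/// # Ok(())
/// # }
/// ```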
pub struct SubmissionQueue<'a, E: EntryMarker = Entry> {
    head: u32,
    tail: u32,
    queue: &'a Inner<E>,
}

/// A submission queue entry (SQE) type that can be used with an io_uring
/// instance.
pub trait EntryMarker: Clone + Debug + From<Entry> + private::Sealed {
    /// Setup flags that must be enabled on the ring for this entry type.
    const BUILD_FLAGS: u32;
}

/// A 64-byte submission queue entry, the default SQE layout.
#[repr(C)]
pub struct Entry(pub(crate) sys::io_uring_sqe);

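/// A 128-byte submission queue entry, used by opcodes that need the extra
/// payload space (for example command passthrough). Rings that use this entry
/// type must be created with the `IORING_SETUP_SQE128` flag (see
/// `EntryMarker::BUILD_FLAGS`).
///
/// A small sketch of widening a regular [`Entry`], with the extra 64 bytes
/// zero-filled (assuming the crate's `opcode::Nop`):
///
/// ```no_run
/// use io_uring::squeue::Entry128;
///
/// let wide: Entry128 = io_uring::opcode::Nop::new().build().into();
/// ```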
#[repr(C)]
#[derive(Clone)]
pub struct Entry128(pub(crate) Entry, pub(crate) [u8; 64]);

#[test]
fn test_entry_sizes() {
    assert_eq!(mem::size_of::<Entry>(), 64);
    assert_eq!(mem::size_of::<Entry128>(), 128);
}

bitflags! {
    /// Submission flags.
    pub struct Flags: u8 {
        /// When set, `fd` is an index into the file table registered with the
        /// ring rather than a raw file descriptor.
        #[doc(hidden)]
        const FIXED_FILE = 1 << sys::IOSQE_FIXED_FILE_BIT;

        /// Issue this request only after all previously submitted requests
        /// have completed.
        const IO_DRAIN = 1 << sys::IOSQE_IO_DRAIN_BIT;

        /// Link this request to the next SQE: the next request only starts
        /// after this one completes, and the chain is severed if a linked
        /// request fails.
        const IO_LINK = 1 << sys::IOSQE_IO_LINK_BIT;

        /// Like `IO_LINK`, but the link is not severed when a request in the
        /// chain fails.
        const IO_HARDLINK = 1 << sys::IOSQE_IO_HARDLINK_BIT;

        /// Always issue the request from an async worker thread instead of
        /// first attempting a non-blocking issue.
        const ASYNC = 1 << sys::IOSQE_ASYNC_BIT;

        /// Let the kernel pick a buffer from the buffer group given in the
        /// entry at the time the request is ready to transfer data.
        const BUFFER_SELECT = 1 << sys::IOSQE_BUFFER_SELECT_BIT;

        /// Don't post a completion queue entry if the request completes
        /// successfully.
        const SKIP_SUCCESS = 1 << sys::IOSQE_CQE_SKIP_SUCCESS_BIT;
    }
}

impl<E: EntryMarker> Inner<E> {
    #[rustfmt::skip]
    pub(crate) unsafe fn new(
        sq_mmap: &Mmap,
        sqe_mmap: &Mmap,
        p: &sys::io_uring_params,
    ) -> Self {
        let head         = sq_mmap.offset(p.sq_off.head        ) as *const atomic::AtomicU32;
        let tail         = sq_mmap.offset(p.sq_off.tail        ) as *const atomic::AtomicU32;
        let ring_mask    = sq_mmap.offset(p.sq_off.ring_mask   ).cast::<u32>().read();
        let ring_entries = sq_mmap.offset(p.sq_off.ring_entries).cast::<u32>().read();
        let flags        = sq_mmap.offset(p.sq_off.flags       ) as *const atomic::AtomicU32;
        let dropped      = sq_mmap.offset(p.sq_off.dropped     ) as *const atomic::AtomicU32;
        let array        = sq_mmap.offset(p.sq_off.array       ) as *mut u32;

        let sqes = sqe_mmap.as_mut_ptr() as *mut E;

        // Map every ring slot directly to the SQE at the same index, so that
        // slot i always submits sqes[i].
        for i in 0..ring_entries {
            array.add(i as usize).write_volatile(i);
        }

        Self {
            head,
            tail,
            ring_mask,
            ring_entries,
            flags,
            dropped,
            sqes,
        }
    }

    #[inline]
    pub(crate) unsafe fn borrow_shared(&self) -> SubmissionQueue<'_, E> {
        SubmissionQueue {
            head: (*self.head).load(atomic::Ordering::Acquire),
            tail: unsync_load(self.tail),
            queue: self,
        }
    }

    #[inline]
    pub(crate) fn borrow(&mut self) -> SubmissionQueue<'_, E> {
        unsafe { self.borrow_shared() }
    }
}

impl<E: EntryMarker> SubmissionQueue<'_, E> {
    /// Synchronize this borrow with the real submission queue: publish the
    /// locally pushed tail to the kernel and refresh the cached head.
    #[inline]
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.tail).store(self.tail, atomic::Ordering::Release);
            self.head = (*self.queue.head).load(atomic::Ordering::Acquire);
        }
    }

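    /// Whether the kernel-side submission polling thread (`IORING_SETUP_SQPOLL`)
    /// has gone idle and set `IORING_SQ_NEED_WAKEUP`, meaning a call into the
    /// kernel is needed before new entries are noticed.
    ///
    /// A hedged sketch, assuming a `ring: IoUring`; creating the ring with
    /// SQPOLL enabled is elided, and `submit` is used here to enter the kernel
    /// and wake the poller:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// # let mut ring = io_uring::IoUring::new(8)?; // SQPOLL setup elided
    /// if ring.submission().need_wakeup() {
    ///     ring.submit()?;
    /// }
    /// # Ok(())
    /// # }
    /// ```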
    #[inline]
    pub fn need_wakeup(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_NEED_WAKEUP != 0
        }
    }

    /// The number of invalid submission queue entries that the kernel has
    /// dropped.
    pub fn dropped(&self) -> u32 {
        unsafe { (*self.queue.dropped).load(atomic::Ordering::Acquire) }
    }

    /// Whether the completion queue has overflowed (`IORING_SQ_CQ_OVERFLOW`)
    /// and completions are pending to be flushed into the CQ ring.
    pub fn cq_overflow(&self) -> bool {
        unsafe {
            (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_CQ_OVERFLOW != 0
        }
    }

    /// Whether there is pending task work (`IORING_SQ_TASKRUN`) that a call
    /// into the kernel would run; requires the ring to be created with
    /// `IORING_SETUP_TASKRUN_FLAG`.
    pub fn taskrun(&self) -> bool {
        unsafe { (*self.queue.flags).load(atomic::Ordering::Acquire) & sys::IORING_SQ_TASKRUN != 0 }
    }

    /// The maximum number of entries the submission queue can hold.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.queue.ring_entries as usize
    }

    /// The number of entries in the queue that the kernel has not yet
    /// consumed, based on the locally cached head and tail.
    #[inline]
    pub fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }

    /// Whether the queue is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Whether the queue has reached capacity; further pushes will fail.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

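    /// Attempts to push an entry into the queue, returning an error if it is
    /// full.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the entry's parameters (buffers, file
    /// descriptors, and so on) stay valid for the entire duration of the
    /// operation; otherwise memory problems may occur.
    ///
    /// A minimal sketch, assuming a `ring: IoUring` and the crate's
    /// `opcode::Nop`:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// # let mut ring = io_uring::IoUring::new(8)?;
    /// let nop = io_uring::opcode::Nop::new().build();
    /// let mut sq = ring.submission();
    /// unsafe { sq.push(&nop) }.expect("submission queue is full");
    /// // Publish the new tail to the kernel (also happens when `sq` drops).
    /// sq.sync();
    /// # Ok(())
    /// # }
    /// ```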
    #[inline]
    pub unsafe fn push(&mut self, entry: &E) -> Result<(), PushError> {
        if !self.is_full() {
            self.push_unchecked(entry);
            Ok(())
        } else {
            Err(PushError)
        }
    }

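    /// Attempts to push several entries into the queue. If there is not enough
    /// room for all of them, no entry is pushed and an error is returned.
    ///
    /// # Safety
    ///
    /// Same contract as [`push`](Self::push): every entry must stay valid for
    /// the whole duration of its operation.
    ///
    /// A small sketch, assuming a `ring: IoUring` and the crate's
    /// `opcode::Nop`:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// # let mut ring = io_uring::IoUring::new(8)?;
    /// let nops = [
    ///     io_uring::opcode::Nop::new().build().user_data(1),
    ///     io_uring::opcode::Nop::new().build().user_data(2),
    /// ];
    /// unsafe { ring.submission().push_multiple(&nops) }.expect("queue is full");
    /// # Ok(())
    /// # }
    /// ```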
    #[inline]
    pub unsafe fn push_multiple(&mut self, entries: &[E]) -> Result<(), PushError> {
        if self.capacity() - self.len() < entries.len() {
            return Err(PushError);
        }

        for entry in entries {
            self.push_unchecked(entry);
        }

        Ok(())
    }

    #[inline]
    unsafe fn push_unchecked(&mut self, entry: &E) {
        // Write the entry into the slot at `tail & ring_mask` and advance the
        // local tail; `sync` (or dropping the borrow) publishes it to the kernel.
        *self
            .queue
            .sqes
            .add((self.tail & self.queue.ring_mask) as usize) = entry.clone();
        self.tail = self.tail.wrapping_add(1);
    }
}

impl<E: EntryMarker> Drop for SubmissionQueue<'_, E> {
    #[inline]
    fn drop(&mut self) {
        // Publish the locally advanced tail so pushed entries are not lost if
        // the borrow is dropped without an explicit `sync`.
        unsafe { &*self.queue.tail }.store(self.tail, atomic::Ordering::Release);
    }
}

impl Entry {
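    /// Set the submission flags for this entry, OR-ing them with any flags
    /// already set.
    ///
    /// A sketch of linking two operations so that the second only starts after
    /// the first completes (assuming two entries built from the crate's
    /// `opcode::Nop`):
    ///
    /// ```no_run
    /// use io_uring::squeue::Flags;
    ///
    /// # let first = io_uring::opcode::Nop::new().build();
    /// # let second = io_uring::opcode::Nop::new().build();
    /// // Pushing `first` and then `second` now forms a link chain.
    /// let first = first.flags(Flags::IO_LINK);
    /// # let _ = (first, second);
    /// ```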
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry {
        self.0.flags |= flags.bits();
        self
    }

    /// Set the user data. This value is passed back verbatim in the
    /// corresponding completion queue entry.
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry {
        self.0.user_data = user_data;
        self
    }

    /// Get the previously set user data.
    #[inline]
    pub fn get_user_data(&self) -> u64 {
        self.0.user_data
    }

    /// Set the personality (a credentials id previously registered with the
    /// ring) to issue this request with.
    pub fn personality(mut self, personality: u16) -> Entry {
        self.0.personality = personality;
        self
    }
}

impl private::Sealed for Entry {}

impl EntryMarker for Entry {
    const BUILD_FLAGS: u32 = 0;
}

impl Clone for Entry {
    #[inline(always)]
    fn clone(&self) -> Entry {
        // A bitwise copy of the raw SQE.
        Entry(unsafe { mem::transmute_copy(&self.0) })
    }
}

impl Debug for Entry {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry")
            .field("op_code", &self.0.opcode)
            .field("flags", &self.0.flags)
            .field("user_data", &self.0.user_data)
            .finish()
    }
}

impl Entry128 {
    /// Set the submission flags for this entry, OR-ing them with any flags
    /// already set.
    #[inline]
    pub fn flags(mut self, flags: Flags) -> Entry128 {
        self.0 .0.flags |= flags.bits();
        self
    }

    /// Set the user data. This value is passed back verbatim in the
    /// corresponding completion queue entry.
    #[inline]
    pub fn user_data(mut self, user_data: u64) -> Entry128 {
        self.0 .0.user_data = user_data;
        self
    }

    /// Set the personality (a credentials id previously registered with the
    /// ring) to issue this request with.
    #[inline]
    pub fn personality(mut self, personality: u16) -> Entry128 {
        self.0 .0.personality = personality;
        self
    }
}

impl private::Sealed for Entry128 {}

impl EntryMarker for Entry128 {
    const BUILD_FLAGS: u32 = sys::IORING_SETUP_SQE128;
}

impl From<Entry> for Entry128 {
    fn from(entry: Entry) -> Entry128 {
        // Widen a 64-byte entry by zero-filling the second half.
        Entry128(entry, [0u8; 64])
    }
}

impl Debug for Entry128 {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("Entry128")
            .field("op_code", &self.0 .0.opcode)
            .field("flags", &self.0 .0.flags)
            .field("user_data", &self.0 .0.user_data)
            .finish()
    }
}

/// An error pushing to the submission queue because it is full.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct PushError;

impl Display for PushError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str("submission queue is full")
    }
}

impl Error for PushError {}

impl<E: EntryMarker> Debug for SubmissionQueue<'_, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_list();
        // Walk the entries between the cached head and tail, i.e. those not
        // yet consumed by the kernel.
        let mut pos = self.head;
        while pos != self.tail {
            let entry: &E = unsafe { &*self.queue.sqes.add((pos & self.queue.ring_mask) as usize) };
            d.entry(&entry);
            pos = pos.wrapping_add(1);
        }
        d.finish()
    }
}