monoio/io/
async_read_rent.rs

1use std::{future::Future, io::Cursor};
2
3use crate::{
4    buf::{IoBufMut, IoVecBufMut},
5    BufResult,
6};
7
8/// The `AsyncReadRent` trait defines asynchronous reading operations for objects that
9/// implement it.
10///
11/// It provides a way to read bytes from a source into a buffer asynchronously,
12/// which could be a file, socket, or any other byte-oriented stream.
13///
14/// Types that implement this trait are expected to manage asynchronous read operations,
15/// allowing them to interact with other asynchronous tasks without blocking the executor.
16pub trait AsyncReadRent {
17    /// Reads bytes from this source into the provided buffer, returning the number of bytes read.
18    ///
19    /// # Return
20    ///
21    /// When this method returns `(Ok(n), buf)`, it guarantees that `0 <= n <= buf.len()`. A
22    /// non-zero `n` means the buffer `buf` has been filled with `n` bytes of data from this source.
23    /// If `n` is `0`, it can indicate one of two possibilities:
24    ///
25    /// 1. The reader has likely reached the end of the file and may not produce more bytes, though
26    ///    it is not certain that no more bytes will ever be produced.
27    /// 2. The provided buffer was 0 bytes in length.
28    ///
29    /// # Errors
30    ///
31    /// If an I/O or other error occurs, an error variant will be returned, ensuring that no bytes
32    /// were read.
33    fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
34    /// Similar to `read`, but reads data into a slice of buffers.
35    ///
36    /// Data is copied sequentially into each buffer, with the last buffer potentially being only
37    /// partially filled. This method should behave equivalently to a single call to `read` with the
38    /// buffers concatenated.
39    fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
40}
41
42/// AsyncReadRentAt: async read with a ownership of a buffer and a position
43pub trait AsyncReadRentAt {
44    /// Same as pread(2)
45    fn read_at<T: IoBufMut>(
46        &mut self,
47        buf: T,
48        pos: usize,
49    ) -> impl Future<Output = BufResult<usize, T>>;
50}
51
52impl<A: ?Sized + AsyncReadRentAt> AsyncReadRentAt for &mut A {
53    #[inline]
54    fn read_at<T: IoBufMut>(
55        &mut self,
56        buf: T,
57        pos: usize,
58    ) -> impl Future<Output = BufResult<usize, T>> {
59        (**self).read_at(buf, pos)
60    }
61}
62
63impl<A: ?Sized + AsyncReadRent> AsyncReadRent for &mut A {
64    #[inline]
65    fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
66        (**self).read(buf)
67    }
68
69    #[inline]
70    fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
71        (**self).readv(buf)
72    }
73}
74
75impl AsyncReadRent for &[u8] {
76    fn read<T: IoBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
77        let buf_capacity = buf.bytes_total();
78        let available = self.len();
79        let to_read = std::cmp::min(available, buf_capacity);
80        let (prefix, remainder) = self.split_at(to_read);
81        unsafe {
82            let dst = buf.write_ptr();
83            dst.copy_from_nonoverlapping(prefix.as_ptr(), to_read);
84            buf.set_init(to_read);
85        }
86        *self = remainder;
87        std::future::ready((Ok(to_read), buf))
88    }
89
90    fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
91        let mut sum = 0;
92        {
93            #[cfg(windows)]
94            let buf_slice = unsafe {
95                std::slice::from_raw_parts_mut(buf.write_wsabuf_ptr(), buf.write_wsabuf_len())
96            };
97            #[cfg(unix)]
98            let buf_slice = unsafe {
99                std::slice::from_raw_parts_mut(buf.write_iovec_ptr(), buf.write_iovec_len())
100            };
101            for buf in buf_slice {
102                #[cfg(windows)]
103                let amt = std::cmp::min(self.len(), buf.len as usize);
104                #[cfg(unix)]
105                let amt = std::cmp::min(self.len(), buf.iov_len);
106
107                let (prefix, remainder) = self.split_at(amt);
108                // # Safety
109                // The pointer is valid.
110                unsafe {
111                    #[cfg(windows)]
112                    buf.buf
113                        .cast::<u8>()
114                        .copy_from_nonoverlapping(prefix.as_ptr(), amt);
115                    #[cfg(unix)]
116                    buf.iov_base
117                        .cast::<u8>()
118                        .copy_from_nonoverlapping(prefix.as_ptr(), amt);
119                }
120                *self = remainder;
121                sum += amt;
122
123                if self.is_empty() {
124                    break;
125                }
126            }
127        }
128
129        unsafe { buf.set_init(sum) };
130        std::future::ready((Ok(sum), buf))
131    }
132}
133
134impl AsyncReadRent for &mut [u8] {
135    fn read<T: IoBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
136        // Determine how many bytes to read
137        let buf_capacity = buf.bytes_total();
138        let available = self.len();
139        let to_read = std::cmp::min(available, buf_capacity);
140        // Pointers to the source and remaining data
141        let src_ptr = self.as_mut_ptr();
142        let next_ptr = unsafe { src_ptr.add(to_read) };
143        let remaining_len = available - to_read;
144        unsafe {
145            let dst = buf.write_ptr();
146            dst.copy_from_nonoverlapping(src_ptr, to_read);
147            buf.set_init(to_read);
148            // Update self to the remaining slice
149            *self = std::slice::from_raw_parts_mut(next_ptr, remaining_len);
150        }
151        std::future::ready((Ok(to_read), buf))
152    }
153
154    fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> impl Future<Output = BufResult<usize, T>> {
155        let mut sum = 0;
156        {
157            #[cfg(windows)]
158            let buf_slice = unsafe {
159                std::slice::from_raw_parts_mut(buf.write_wsabuf_ptr(), buf.write_wsabuf_len())
160            };
161            #[cfg(unix)]
162            let buf_slice = unsafe {
163                std::slice::from_raw_parts_mut(buf.write_iovec_ptr(), buf.write_iovec_len())
164            };
165            for buf in buf_slice {
166                #[cfg(windows)]
167                let amt = std::cmp::min(self.len(), buf.len as usize);
168                #[cfg(unix)]
169                let amt = std::cmp::min(self.len(), buf.iov_len);
170
171                // Compute source and remaining pointers for this chunk
172                let src_ptr = self.as_mut_ptr();
173                let next_ptr = unsafe { src_ptr.add(amt) };
174                let remaining_len = self.len() - amt;
175                unsafe {
176                    #[cfg(windows)]
177                    buf.buf.cast::<u8>().copy_from_nonoverlapping(src_ptr, amt);
178                    #[cfg(unix)]
179                    buf.iov_base
180                        .cast::<u8>()
181                        .copy_from_nonoverlapping(src_ptr, amt);
182                    // Update self to the remaining slice
183                    *self = std::slice::from_raw_parts_mut(next_ptr, remaining_len);
184                }
185                sum += amt;
186
187                if self.is_empty() {
188                    break;
189                }
190            }
191        }
192
193        unsafe { buf.set_init(sum) };
194        std::future::ready((Ok(sum), buf))
195    }
196}
197
198impl<T: AsRef<[u8]>> AsyncReadRent for Cursor<T> {
199    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
200        let pos = self.position();
201        let slice: &[u8] = (*self).get_ref().as_ref();
202
203        if pos > slice.len() as u64 {
204            return (Ok(0), buf);
205        }
206
207        (&slice[pos as usize..]).read(buf).await
208    }
209
210    async fn readv<B: IoVecBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
211        let pos = self.position();
212        let slice: &[u8] = (*self).get_ref().as_ref();
213
214        if pos > slice.len() as u64 {
215            return (Ok(0), buf);
216        }
217
218        (&slice[pos as usize..]).readv(buf).await
219    }
220}
221
222impl<T: ?Sized + AsyncReadRent> AsyncReadRent for Box<T> {
223    #[inline]
224    fn read<B: IoBufMut>(&mut self, buf: B) -> impl Future<Output = BufResult<usize, B>> {
225        (**self).read(buf)
226    }
227
228    #[inline]
229    fn readv<B: IoVecBufMut>(&mut self, buf: B) -> impl Future<Output = BufResult<usize, B>> {
230        (**self).readv(buf)
231    }
232}