monoio/io/
async_write_rent.rs

1use std::{
2    future::Future,
3    io::{Cursor, Write},
4};
5
6use crate::{
7    buf::{IoBuf, IoVecBuf},
8    BufResult,
9};
10
11/// The `AsyncWriteRent` trait provides asynchronous writing capabilities for structs
12/// that implement it.
13///
14/// It abstracts over the concept of writing bytes asynchronously
15/// to an underlying I/O object, which could be a file, socket, or any other
16/// byte-oriented stream. The trait also encompasses the ability to flush buffered
17/// data and to shut down the output stream cleanly.
18///
19/// Types implementing this trait are required to manage asynchronous I/O operations,
20/// allowing for non-blocking writes. This is particularly useful in scenarios where
21/// the object might need to interact with other asynchronous tasks without blocking
22/// the executor.
23pub trait AsyncWriteRent {
24    /// Writes the contents of a buffer into this writer, returning the number of bytes written.
25    ///
26    /// This function attempts to write the entire buffer `buf`, but the write may not fully
27    /// succeed, and it might also result in an error. A call to `write` represents *at most one*
28    /// attempt to write to the underlying object.
29    ///
30    /// # Return
31    ///
32    /// When this method returns `(Ok(n), buf)`, it guarantees that `n <= buf.len()`. A return value
33    /// of `0` typically indicates that the underlying object can no longer accept bytes and likely
34    /// won't be able to in the future, or that the provided buffer is empty.
35    ///
36    /// # Errors
37    ///
38    /// Each `write` call may result in an I/O error, indicating the operation couldn't be
39    /// completed. If an error occurs, no bytes from the buffer were written to the writer.
40    ///
41    /// It is **not** an error if the entire buffer could not be written to this writer.
42    fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>>;
43
44    /// This function attempts to write the entire contents of `buf_vec`, but the write may not
45    /// fully succeed, and it might also result in an error. The bytes will be written starting at
46    /// the specified offset.
47    ///
48    /// # Return
49    ///
50    /// The method returns the result of the operation along with the same array of buffers passed
51    /// as an argument. A return value of `0` typically indicates that the underlying file can no
52    /// longer accept bytes and likely won't be able to in the future, or that the provided buffer
53    /// is empty.
54    ///
55    /// # Errors
56    ///
57    /// Each `write` call may result in an I/O error, indicating the operation couldn't be
58    /// completed. If an error occurs, no bytes from the buffer were written to the writer.
59    ///
60    /// It is **not** considered an error if the entire buffer could not be written to this writer.
61    fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>>;
62
63    /// Flushes this output stream, ensuring that all buffered content is successfully written to
64    /// its destination.
65    ///
66    /// # Errors
67    ///
68    /// An error occurs if not all bytes can be written due to I/O issues or if the end of the file
69    /// (EOF) is reached.
70    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>>;
71
72    /// Shuts down the output stream, ensuring that the value can be cleanly dropped.
73    ///
74    /// Similar to [`flush`], all buffered data is written to the underlying stream. After this
75    /// operation completes, the caller should no longer attempt to write to the stream.
76    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>>;
77}
78
79/// AsyncWriteRentAt: async write with a ownership of a buffer and a position
80pub trait AsyncWriteRentAt {
81    /// Write buf at given offset
82    fn write_at<T: IoBuf>(
83        &mut self,
84        buf: T,
85        pos: usize,
86    ) -> impl Future<Output = BufResult<usize, T>>;
87}
88
89impl<A: ?Sized + AsyncWriteRentAt> AsyncWriteRentAt for &mut A {
90    #[inline]
91    fn write_at<T: IoBuf>(
92        &mut self,
93        buf: T,
94        pos: usize,
95    ) -> impl Future<Output = BufResult<usize, T>> {
96        (**self).write_at(buf, pos)
97    }
98}
99
100impl<A: ?Sized + AsyncWriteRent> AsyncWriteRent for &mut A {
101    #[inline]
102    fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
103        (**self).write(buf)
104    }
105
106    #[inline]
107    fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
108        (**self).writev(buf_vec)
109    }
110
111    #[inline]
112    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
113        (**self).flush()
114    }
115
116    #[inline]
117    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
118        (**self).shutdown()
119    }
120}
121
122// Helper function for cursor writev logic
123#[inline]
124fn write_vectored_logic<C: std::io::Write + ?Sized, T: IoVecBuf>(
125    writer: &mut C,
126    buf_vec: T,
127) -> BufResult<usize, T> {
128    let bufs: &[std::io::IoSlice<'_>];
129    #[cfg(unix)]
130    {
131        // SAFETY: IoSlice<'_> is repr(transparent) over libc::iovec
132        bufs = unsafe {
133            std::slice::from_raw_parts(
134                buf_vec.read_iovec_ptr() as *const std::io::IoSlice<'_>,
135                buf_vec.read_iovec_len(),
136            )
137        };
138    }
139    #[cfg(windows)]
140    {
141        // SAFETY: IoSlice<'_> is repr(transparent) over WSABUF
142        bufs = unsafe {
143            std::slice::from_raw_parts(
144                buf_vec.read_wsabuf_ptr() as *const std::io::IoSlice<'_>,
145                buf_vec.read_wsabuf_len(),
146            )
147        };
148    }
149    let res = std::io::Write::write_vectored(writer, bufs);
150    match res {
151        Ok(n) => (Ok(n), buf_vec),
152        Err(e) => (Err(e), buf_vec),
153    }
154}
155
// Appends the bytes described by a list of platform buffer descriptors to `vec`.
//
// `get_ptr` and `get_len` extract the data pointer and the byte length from one descriptor
// (for example an `iovec` or a `WSABUF`). The return value is the sum of all descriptor
// lengths, i.e. the number of bytes appended. Capacity is reserved once up front, and
// descriptors whose length is zero are never dereferenced.
//
// Callers must make sure every descriptor with a non-zero length points to that many
// readable bytes.
#[inline]
fn extend_vec_from_platform_bufs<P>(
    vec: &mut Vec<u8>,
    platform_bufs: &[P],
    get_ptr: fn(&P) -> *const u8,
    get_len: fn(&P) -> usize,
) -> usize {
    let expected_total: usize = platform_bufs.iter().map(get_len).sum();
    if expected_total == 0 {
        // Nothing to copy: no descriptors, or all of them are empty.
        return 0;
    }
    vec.reserve(expected_total);

    platform_bufs
        .iter()
        .map(|part| (get_ptr(part), get_len(part)))
        .filter(|&(_, part_len)| part_len != 0)
        .for_each(|(part_ptr, part_len)| {
            // SAFETY: the caller promises that a descriptor with a non-zero length refers to
            // `part_len` readable bytes starting at `part_ptr`; empty descriptors were
            // filtered out above and are never turned into a slice.
            let bytes = unsafe { std::slice::from_raw_parts(part_ptr, part_len) };
            vec.extend_from_slice(bytes);
        });

    expected_total
}
184
185impl AsyncWriteRent for Vec<u8> {
186    fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
187        let slice = buf.as_slice();
188        self.extend_from_slice(slice);
189        let len = slice.len();
190        std::future::ready((Ok(len), buf))
191    }
192
193    #[inline]
194    fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
195        let total_bytes_to_write: usize;
196        #[cfg(unix)]
197        {
198            // SAFETY: IoVecBuf guarantees valid iovec array
199            let iovec_array_ptr = buf_vec.read_iovec_ptr();
200            let iovec_count = buf_vec.read_iovec_len();
201            let iovec_slice = unsafe { std::slice::from_raw_parts(iovec_array_ptr, iovec_count) };
202            total_bytes_to_write = extend_vec_from_platform_bufs(
203                self,
204                iovec_slice,
205                |iovec: &libc::iovec| iovec.iov_base as *const u8,
206                |iovec: &libc::iovec| iovec.iov_len,
207            );
208        }
209        #[cfg(windows)]
210        {
211            // SAFETY: IoVecBuf guarantees valid WSABUF array
212            let wsabuf_array_ptr = buf_vec.read_wsabuf_ptr();
213            let wsabuf_count = buf_vec.read_wsabuf_len();
214            let wsabuf_slice =
215                unsafe { std::slice::from_raw_parts(wsabuf_array_ptr, wsabuf_count) };
216            total_bytes_to_write = extend_vec_from_platform_bufs(
217                self,
218                wsabuf_slice,
219                |wsabuf: &windows_sys::Win32::Networking::WinSock::WSABUF| wsabuf.buf as *const u8,
220                |wsabuf: &windows_sys::Win32::Networking::WinSock::WSABUF| wsabuf.len as usize,
221            );
222        }
223        std::future::ready((Ok(total_bytes_to_write), buf_vec))
224    }
225
226    #[inline]
227    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
228        std::future::ready(Ok(()))
229    }
230
231    #[inline]
232    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
233        std::future::ready(Ok(()))
234    }
235}
236
237impl<W> AsyncWriteRent for Cursor<W>
238where
239    Cursor<W>: Write + Unpin,
240{
241    #[inline]
242    fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
243        let slice = buf.as_slice();
244        std::future::ready((Write::write(self, slice), buf))
245    }
246
247    #[inline]
248    fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
249        std::future::ready(write_vectored_logic(self, buf_vec))
250    }
251
252    #[inline]
253    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
254        std::future::ready(Write::flush(self))
255    }
256
257    #[inline]
258    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
259        // Cursor is in-memory, flush is a no-op, so shutdown is also a no-op.
260        std::future::ready(Ok(()))
261    }
262}
263
264impl<T: ?Sized + AsyncWriteRent + Unpin> AsyncWriteRent for Box<T> {
265    #[inline]
266    fn write<B: IoBuf>(&mut self, buf: B) -> impl Future<Output = BufResult<usize, B>> {
267        (**self).write(buf)
268    }
269
270    #[inline]
271    fn writev<B: IoVecBuf>(&mut self, buf_vec: B) -> impl Future<Output = BufResult<usize, B>> {
272        (**self).writev(buf_vec)
273    }
274
275    #[inline]
276    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
277        (**self).flush()
278    }
279
280    #[inline]
281    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
282        (**self).shutdown()
283    }
284}