monoio/io/
async_rent_cancelable_ext.rs

1use std::future::Future;
2
3use super::{CancelHandle, CancelableAsyncReadRent, CancelableAsyncWriteRent};
4use crate::{
5    buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, Slice, SliceMut},
6    BufResult,
7};
8
9macro_rules! reader_trait {
10    ($future: ident, $n_ty: ty, $f: ident) => {
11        /// Read number in async way
12        fn $f(&mut self, c: CancelHandle) -> impl Future<Output = std::io::Result<$n_ty>>;
13    };
14}
15
16macro_rules! reader_be_impl {
17    ($future: ident, $n_ty: ty, $f: ident) => {
18        async fn $f(&mut self, c: CancelHandle) -> std::io::Result<$n_ty> {
19            let (res, buf) = self
20                .cancelable_read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()]), c)
21                .await;
22            res?;
23            use crate::utils::box_into_inner::IntoInner;
24            Ok(<$n_ty>::from_be_bytes(Box::consume(buf)))
25        }
26    };
27}
28
29macro_rules! reader_le_impl {
30    ($future: ident, $n_ty: ty, $f: ident) => {
31        async fn $f(&mut self, c: CancelHandle) -> std::io::Result<$n_ty> {
32            let (res, buf) = self
33                .cancelable_read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()]), c)
34                .await;
35            res?;
36            use crate::utils::box_into_inner::IntoInner;
37            Ok(<$n_ty>::from_le_bytes(Box::consume(buf)))
38        }
39    };
40}
41
42/// CancelableAsyncReadRentExt
43pub trait CancelableAsyncReadRentExt {
44    /// Read until buf capacity is fulfilled
45    fn cancelable_read_exact<T: IoBufMut + 'static>(
46        &mut self,
47        buf: T,
48        c: CancelHandle,
49    ) -> impl Future<Output = BufResult<usize, T>>;
50
51    /// Readv until buf capacity is fulfilled
52    fn cancelable_read_vectored_exact<T: IoVecBufMut + 'static>(
53        &mut self,
54        buf: T,
55        c: CancelHandle,
56    ) -> impl Future<Output = BufResult<usize, T>>;
57
58    reader_trait!(ReadU8Future, u8, cancelable_read_u8);
59    reader_trait!(ReadU16Future, u16, cancelable_read_u16);
60    reader_trait!(ReadU32Future, u32, cancelable_read_u32);
61    reader_trait!(ReadU64Future, u64, cancelable_read_u64);
62    reader_trait!(ReadU128Future, u128, cancelable_read_u128);
63    reader_trait!(ReadI8Future, i8, cancelable_read_i8);
64    reader_trait!(ReadI16Future, i16, cancelable_read_i16);
65    reader_trait!(ReadI32Future, i32, cancelable_read_i32);
66    reader_trait!(ReadI64Future, i64, cancelable_read_i64);
67    reader_trait!(ReadI128Future, i128, cancelable_read_i128);
68    reader_trait!(ReadF32Future, f32, cancelable_read_f32);
69    reader_trait!(ReadF64Future, f64, cancelable_read_f64);
70
71    reader_trait!(ReadU8LEFuture, u8, cancelable_read_u8_le);
72    reader_trait!(ReadU16LEFuture, u16, cancelable_read_u16_le);
73    reader_trait!(ReadU32LEFuture, u32, cancelable_read_u32_le);
74    reader_trait!(ReadU64LEFuture, u64, cancelable_read_u64_le);
75    reader_trait!(ReadU128LEFuture, u128, cancelable_read_u128_le);
76    reader_trait!(ReadI8LEFuture, i8, cancelable_read_i8_le);
77    reader_trait!(ReadI16LEFuture, i16, cancelable_read_i16_le);
78    reader_trait!(ReadI32LEFuture, i32, cancelable_read_i32_le);
79    reader_trait!(ReadI64LEFuture, i64, cancelable_read_i64_le);
80    reader_trait!(ReadI128LEFuture, i128, cancelable_read_i128_le);
81    reader_trait!(ReadF32LEFuture, f32, cancelable_read_f32_le);
82    reader_trait!(ReadF64LEFuture, f64, cancelable_read_f64_le);
83}
84
85impl<A> CancelableAsyncReadRentExt for A
86where
87    A: CancelableAsyncReadRent + ?Sized,
88{
89    async fn cancelable_read_exact<T: IoBufMut + 'static>(
90        &mut self,
91        mut buf: T,
92        c: CancelHandle,
93    ) -> BufResult<usize, T> {
94        let len = buf.bytes_total();
95        let mut read = 0;
96        while read < len {
97            let buf_slice = unsafe { SliceMut::new_unchecked(buf, read, len) };
98            let (result, buf_slice) = self.cancelable_read(buf_slice, c.clone()).await;
99            buf = buf_slice.into_inner();
100            match result {
101                Ok(0) => {
102                    return (
103                        Err(std::io::Error::new(
104                            std::io::ErrorKind::UnexpectedEof,
105                            "failed to fill whole buffer",
106                        )),
107                        buf,
108                    )
109                }
110                Ok(n) => {
111                    read += n;
112                    unsafe { buf.set_init(read) };
113                }
114                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
115                Err(e) => return (Err(e), buf),
116            }
117        }
118        (Ok(read), buf)
119    }
120
121    async fn cancelable_read_vectored_exact<T: IoVecBufMut + 'static>(
122        &mut self,
123        mut buf: T,
124        c: CancelHandle,
125    ) -> BufResult<usize, T> {
126        let mut meta = crate::buf::write_vec_meta(&mut buf);
127        let len = meta.len();
128        let mut read = 0;
129
130        while read < len {
131            let (res, meta_) = self.cancelable_readv(meta, c.clone()).await;
132            meta = meta_;
133            match res {
134                Ok(0) => {
135                    return (
136                        Err(std::io::Error::new(
137                            std::io::ErrorKind::UnexpectedEof,
138                            "failed to fill whole buffer",
139                        )),
140                        buf,
141                    )
142                }
143                Ok(n) => read += n,
144                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
145                Err(e) => return (Err(e), buf),
146            }
147        }
148        (Ok(read), buf)
149    }
150
151    reader_be_impl!(ReadU8Future, u8, cancelable_read_u8);
152    reader_be_impl!(ReadU16Future, u16, cancelable_read_u16);
153    reader_be_impl!(ReadU32Future, u32, cancelable_read_u32);
154    reader_be_impl!(ReadU64Future, u64, cancelable_read_u64);
155    reader_be_impl!(ReadU128Future, u128, cancelable_read_u128);
156    reader_be_impl!(ReadI8Future, i8, cancelable_read_i8);
157    reader_be_impl!(ReadI16Future, i16, cancelable_read_i16);
158    reader_be_impl!(ReadI32Future, i32, cancelable_read_i32);
159    reader_be_impl!(ReadI64Future, i64, cancelable_read_i64);
160    reader_be_impl!(ReadI128Future, i128, cancelable_read_i128);
161    reader_be_impl!(ReadF32Future, f32, cancelable_read_f32);
162    reader_be_impl!(ReadF64Future, f64, cancelable_read_f64);
163
164    reader_le_impl!(ReadU8LEFuture, u8, cancelable_read_u8_le);
165    reader_le_impl!(ReadU16LEFuture, u16, cancelable_read_u16_le);
166    reader_le_impl!(ReadU32LEFuture, u32, cancelable_read_u32_le);
167    reader_le_impl!(ReadU64LEFuture, u64, cancelable_read_u64_le);
168    reader_le_impl!(ReadU128LEFuture, u128, cancelable_read_u128_le);
169    reader_le_impl!(ReadI8LEFuture, i8, cancelable_read_i8_le);
170    reader_le_impl!(ReadI16LEFuture, i16, cancelable_read_i16_le);
171    reader_le_impl!(ReadI32LEFuture, i32, cancelable_read_i32_le);
172    reader_le_impl!(ReadI64LEFuture, i64, cancelable_read_i64_le);
173    reader_le_impl!(ReadI128LEFuture, i128, cancelable_read_i128_le);
174    reader_be_impl!(ReadF32LEFuture, f32, cancelable_read_f32_le);
175    reader_be_impl!(ReadF64LEFuture, f64, cancelable_read_f64_le);
176}
177
178/// CancelableAsyncWriteRentExt
179pub trait CancelableAsyncWriteRentExt {
180    /// Write all
181    fn write_all<T: IoBuf + 'static>(
182        &mut self,
183        buf: T,
184        c: CancelHandle,
185    ) -> impl Future<Output = BufResult<usize, T>>;
186
187    /// Write all
188    fn write_vectored_all<T: IoVecBuf + 'static>(
189        &mut self,
190        buf: T,
191        c: CancelHandle,
192    ) -> impl Future<Output = BufResult<usize, T>>;
193}
194
195impl<A> CancelableAsyncWriteRentExt for A
196where
197    A: CancelableAsyncWriteRent + ?Sized,
198{
199    async fn write_all<T: IoBuf + 'static>(
200        &mut self,
201        mut buf: T,
202        c: CancelHandle,
203    ) -> BufResult<usize, T> {
204        let len = buf.bytes_init();
205        let mut written = 0;
206        while written < len {
207            let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) };
208            let (result, buf_slice) = self.cancelable_write(buf_slice, c.clone()).await;
209            buf = buf_slice.into_inner();
210            match result {
211                Ok(0) => {
212                    return (
213                        Err(std::io::Error::new(
214                            std::io::ErrorKind::WriteZero,
215                            "failed to write whole buffer",
216                        )),
217                        buf,
218                    )
219                }
220                Ok(n) => written += n,
221                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
222                Err(e) => return (Err(e), buf),
223            }
224        }
225        (Ok(written), buf)
226    }
227
228    async fn write_vectored_all<T: IoVecBuf + 'static>(
229        &mut self,
230        buf: T,
231        c: CancelHandle,
232    ) -> BufResult<usize, T> {
233        let mut meta = crate::buf::read_vec_meta(&buf);
234        let len = meta.len();
235        let mut written = 0;
236
237        while written < len {
238            let (res, meta_) = self.cancelable_writev(meta, c.clone()).await;
239            meta = meta_;
240            match res {
241                Ok(0) => {
242                    return (
243                        Err(std::io::Error::new(
244                            std::io::ErrorKind::WriteZero,
245                            "failed to write whole buffer",
246                        )),
247                        buf,
248                    )
249                }
250                Ok(n) => {
251                    written += n;
252                    meta.consume(n);
253                }
254                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
255                Err(e) => return (Err(e), buf),
256            }
257        }
258        (Ok(written), buf)
259    }
260}