1use std::future::Future;
2
3use super::{CancelHandle, CancelableAsyncReadRent, CancelableAsyncWriteRent};
4use crate::{
5 buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, Slice, SliceMut},
6 BufResult,
7};
8
/// Declares one cancelable numeric-read method inside a trait body.
///
/// Expands to a required method `$method(&mut self, c: CancelHandle)` whose
/// returned future resolves to `std::io::Result<$num>`. The leading identifier
/// is accepted so call sites keep their three-argument shape, but it does not
/// appear in the expansion.
macro_rules! reader_trait {
    ($_future_name:ident, $num:ty, $method:ident) => {
        fn $method(
            &mut self,
            c: CancelHandle,
        ) -> impl Future<Output = std::io::Result<$num>>;
    };
}
15
/// Generates a cancelable big-endian numeric reader inside an `impl` block.
///
/// The expanded method asks `self.cancelable_read_exact` to fill a boxed,
/// zero-initialised array of `size_of::<$num>()` bytes and decodes it with
/// `<$num>::from_be_bytes`. An error reported by the read is returned
/// unchanged. The leading identifier is accepted but unused.
macro_rules! reader_be_impl {
    ($_future_name:ident, $num:ty, $method:ident) => {
        async fn $method(&mut self, c: CancelHandle) -> std::io::Result<$num> {
            use crate::utils::box_into_inner::IntoInner;

            // Scratch buffer exactly as wide as the encoded value.
            let scratch = std::boxed::Box::new([0u8; std::mem::size_of::<$num>()]);
            let (outcome, filled) = self.cancelable_read_exact(scratch, c).await;
            match outcome {
                // The byte count is implied by the array width, so it is ignored.
                Ok(_) => Ok(<$num>::from_be_bytes(Box::consume(filled))),
                Err(err) => Err(err),
            }
        }
    };
}
28
/// Generates a cancelable little-endian numeric reader inside an `impl` block.
///
/// The expanded method asks `self.cancelable_read_exact` to fill a boxed,
/// zero-initialised array of `size_of::<$num>()` bytes and decodes it with
/// `<$num>::from_le_bytes`. An error reported by the read is returned
/// unchanged. The leading identifier is accepted but unused.
macro_rules! reader_le_impl {
    ($_future_name:ident, $num:ty, $method:ident) => {
        async fn $method(&mut self, c: CancelHandle) -> std::io::Result<$num> {
            use crate::utils::box_into_inner::IntoInner;

            // Scratch buffer exactly as wide as the encoded value.
            let scratch = std::boxed::Box::new([0u8; std::mem::size_of::<$num>()]);
            let (outcome, filled) = self.cancelable_read_exact(scratch, c).await;
            match outcome {
                // The byte count is implied by the array width, so it is ignored.
                Ok(_) => Ok(<$num>::from_le_bytes(Box::consume(filled))),
                Err(err) => Err(err),
            }
        }
    };
}
41
42pub trait CancelableAsyncReadRentExt {
44 fn cancelable_read_exact<T: IoBufMut + 'static>(
46 &mut self,
47 buf: T,
48 c: CancelHandle,
49 ) -> impl Future<Output = BufResult<usize, T>>;
50
51 fn cancelable_read_vectored_exact<T: IoVecBufMut + 'static>(
53 &mut self,
54 buf: T,
55 c: CancelHandle,
56 ) -> impl Future<Output = BufResult<usize, T>>;
57
58 reader_trait!(ReadU8Future, u8, cancelable_read_u8);
59 reader_trait!(ReadU16Future, u16, cancelable_read_u16);
60 reader_trait!(ReadU32Future, u32, cancelable_read_u32);
61 reader_trait!(ReadU64Future, u64, cancelable_read_u64);
62 reader_trait!(ReadU128Future, u128, cancelable_read_u128);
63 reader_trait!(ReadI8Future, i8, cancelable_read_i8);
64 reader_trait!(ReadI16Future, i16, cancelable_read_i16);
65 reader_trait!(ReadI32Future, i32, cancelable_read_i32);
66 reader_trait!(ReadI64Future, i64, cancelable_read_i64);
67 reader_trait!(ReadI128Future, i128, cancelable_read_i128);
68 reader_trait!(ReadF32Future, f32, cancelable_read_f32);
69 reader_trait!(ReadF64Future, f64, cancelable_read_f64);
70
71 reader_trait!(ReadU8LEFuture, u8, cancelable_read_u8_le);
72 reader_trait!(ReadU16LEFuture, u16, cancelable_read_u16_le);
73 reader_trait!(ReadU32LEFuture, u32, cancelable_read_u32_le);
74 reader_trait!(ReadU64LEFuture, u64, cancelable_read_u64_le);
75 reader_trait!(ReadU128LEFuture, u128, cancelable_read_u128_le);
76 reader_trait!(ReadI8LEFuture, i8, cancelable_read_i8_le);
77 reader_trait!(ReadI16LEFuture, i16, cancelable_read_i16_le);
78 reader_trait!(ReadI32LEFuture, i32, cancelable_read_i32_le);
79 reader_trait!(ReadI64LEFuture, i64, cancelable_read_i64_le);
80 reader_trait!(ReadI128LEFuture, i128, cancelable_read_i128_le);
81 reader_trait!(ReadF32LEFuture, f32, cancelable_read_f32_le);
82 reader_trait!(ReadF64LEFuture, f64, cancelable_read_f64_le);
83}
84
85impl<A> CancelableAsyncReadRentExt for A
86where
87 A: CancelableAsyncReadRent + ?Sized,
88{
89 async fn cancelable_read_exact<T: IoBufMut + 'static>(
90 &mut self,
91 mut buf: T,
92 c: CancelHandle,
93 ) -> BufResult<usize, T> {
94 let len = buf.bytes_total();
95 let mut read = 0;
96 while read < len {
97 let buf_slice = unsafe { SliceMut::new_unchecked(buf, read, len) };
98 let (result, buf_slice) = self.cancelable_read(buf_slice, c.clone()).await;
99 buf = buf_slice.into_inner();
100 match result {
101 Ok(0) => {
102 return (
103 Err(std::io::Error::new(
104 std::io::ErrorKind::UnexpectedEof,
105 "failed to fill whole buffer",
106 )),
107 buf,
108 )
109 }
110 Ok(n) => {
111 read += n;
112 unsafe { buf.set_init(read) };
113 }
114 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
115 Err(e) => return (Err(e), buf),
116 }
117 }
118 (Ok(read), buf)
119 }
120
121 async fn cancelable_read_vectored_exact<T: IoVecBufMut + 'static>(
122 &mut self,
123 mut buf: T,
124 c: CancelHandle,
125 ) -> BufResult<usize, T> {
126 let mut meta = crate::buf::write_vec_meta(&mut buf);
127 let len = meta.len();
128 let mut read = 0;
129
130 while read < len {
131 let (res, meta_) = self.cancelable_readv(meta, c.clone()).await;
132 meta = meta_;
133 match res {
134 Ok(0) => {
135 return (
136 Err(std::io::Error::new(
137 std::io::ErrorKind::UnexpectedEof,
138 "failed to fill whole buffer",
139 )),
140 buf,
141 )
142 }
143 Ok(n) => read += n,
144 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
145 Err(e) => return (Err(e), buf),
146 }
147 }
148 (Ok(read), buf)
149 }
150
151 reader_be_impl!(ReadU8Future, u8, cancelable_read_u8);
152 reader_be_impl!(ReadU16Future, u16, cancelable_read_u16);
153 reader_be_impl!(ReadU32Future, u32, cancelable_read_u32);
154 reader_be_impl!(ReadU64Future, u64, cancelable_read_u64);
155 reader_be_impl!(ReadU128Future, u128, cancelable_read_u128);
156 reader_be_impl!(ReadI8Future, i8, cancelable_read_i8);
157 reader_be_impl!(ReadI16Future, i16, cancelable_read_i16);
158 reader_be_impl!(ReadI32Future, i32, cancelable_read_i32);
159 reader_be_impl!(ReadI64Future, i64, cancelable_read_i64);
160 reader_be_impl!(ReadI128Future, i128, cancelable_read_i128);
161 reader_be_impl!(ReadF32Future, f32, cancelable_read_f32);
162 reader_be_impl!(ReadF64Future, f64, cancelable_read_f64);
163
164 reader_le_impl!(ReadU8LEFuture, u8, cancelable_read_u8_le);
165 reader_le_impl!(ReadU16LEFuture, u16, cancelable_read_u16_le);
166 reader_le_impl!(ReadU32LEFuture, u32, cancelable_read_u32_le);
167 reader_le_impl!(ReadU64LEFuture, u64, cancelable_read_u64_le);
168 reader_le_impl!(ReadU128LEFuture, u128, cancelable_read_u128_le);
169 reader_le_impl!(ReadI8LEFuture, i8, cancelable_read_i8_le);
170 reader_le_impl!(ReadI16LEFuture, i16, cancelable_read_i16_le);
171 reader_le_impl!(ReadI32LEFuture, i32, cancelable_read_i32_le);
172 reader_le_impl!(ReadI64LEFuture, i64, cancelable_read_i64_le);
173 reader_le_impl!(ReadI128LEFuture, i128, cancelable_read_i128_le);
174 reader_be_impl!(ReadF32LEFuture, f32, cancelable_read_f32_le);
175 reader_be_impl!(ReadF64LEFuture, f64, cancelable_read_f64_le);
176}
177
178pub trait CancelableAsyncWriteRentExt {
180 fn write_all<T: IoBuf + 'static>(
182 &mut self,
183 buf: T,
184 c: CancelHandle,
185 ) -> impl Future<Output = BufResult<usize, T>>;
186
187 fn write_vectored_all<T: IoVecBuf + 'static>(
189 &mut self,
190 buf: T,
191 c: CancelHandle,
192 ) -> impl Future<Output = BufResult<usize, T>>;
193}
194
195impl<A> CancelableAsyncWriteRentExt for A
196where
197 A: CancelableAsyncWriteRent + ?Sized,
198{
199 async fn write_all<T: IoBuf + 'static>(
200 &mut self,
201 mut buf: T,
202 c: CancelHandle,
203 ) -> BufResult<usize, T> {
204 let len = buf.bytes_init();
205 let mut written = 0;
206 while written < len {
207 let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) };
208 let (result, buf_slice) = self.cancelable_write(buf_slice, c.clone()).await;
209 buf = buf_slice.into_inner();
210 match result {
211 Ok(0) => {
212 return (
213 Err(std::io::Error::new(
214 std::io::ErrorKind::WriteZero,
215 "failed to write whole buffer",
216 )),
217 buf,
218 )
219 }
220 Ok(n) => written += n,
221 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
222 Err(e) => return (Err(e), buf),
223 }
224 }
225 (Ok(written), buf)
226 }
227
228 async fn write_vectored_all<T: IoVecBuf + 'static>(
229 &mut self,
230 buf: T,
231 c: CancelHandle,
232 ) -> BufResult<usize, T> {
233 let mut meta = crate::buf::read_vec_meta(&buf);
234 let len = meta.len();
235 let mut written = 0;
236
237 while written < len {
238 let (res, meta_) = self.cancelable_writev(meta, c.clone()).await;
239 meta = meta_;
240 match res {
241 Ok(0) => {
242 return (
243 Err(std::io::Error::new(
244 std::io::ErrorKind::WriteZero,
245 "failed to write whole buffer",
246 )),
247 buf,
248 )
249 }
250 Ok(n) => {
251 written += n;
252 meta.consume(n);
253 }
254 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
255 Err(e) => return (Err(e), buf),
256 }
257 }
258 (Ok(written), buf)
259 }
260}