monoio/io/util/
prefixed_io.rs1use std::future::Future;
2
3use super::{split::Split, CancelHandle};
4use crate::{
5 buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut},
6 io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent},
7 BufResult,
8};
9
10pub struct PrefixedReadIo<I, P> {
35 io: I,
36 prefix: P,
37
38 prefix_finished: bool,
39}
40
41impl<I, P> PrefixedReadIo<I, P> {
42 pub const fn new(io: I, prefix: P) -> Self {
44 Self {
45 io,
46 prefix,
47 prefix_finished: false,
48 }
49 }
50
51 pub const fn prefix_finished(&self) -> bool {
53 self.prefix_finished
54 }
55
56 #[inline]
58 pub fn into_inner(self) -> I {
59 self.io
60 }
61}
62
63impl<I: AsyncReadRent, P: std::io::Read> AsyncReadRent for PrefixedReadIo<I, P> {
64 async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
65 if buf.bytes_total() == 0 {
66 return (Ok(0), buf);
67 }
68 if !self.prefix_finished {
69 let slice = unsafe {
70 &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total())
71 };
72 match self.prefix.read(slice) {
73 Ok(0) => {
74 self.prefix_finished = true;
76 }
77 Ok(n) => {
78 unsafe { buf.set_init(n) };
79 return (Ok(n), buf);
80 }
81 Err(e) => {
82 return (Err(e), buf);
83 }
84 }
85 }
86 self.io.read(buf).await
88 }
89
90 async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
91 let slice = match IoVecWrapperMut::new(buf) {
92 Ok(slice) => slice,
93 Err(buf) => return (Ok(0), buf),
94 };
95
96 let (result, slice) = self.read(slice).await;
97 buf = slice.into_inner();
98 if let Ok(n) = result {
99 unsafe { buf.set_init(n) };
100 }
101 (result, buf)
102 }
103}
104
105impl<I: CancelableAsyncReadRent, P: std::io::Read> CancelableAsyncReadRent
106 for PrefixedReadIo<I, P>
107{
108 async fn cancelable_read<T: IoBufMut>(
109 &mut self,
110 mut buf: T,
111 c: CancelHandle,
112 ) -> crate::BufResult<usize, T> {
113 if buf.bytes_total() == 0 {
114 return (Ok(0), buf);
115 }
116 if !self.prefix_finished {
117 let slice = unsafe {
118 &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total())
119 };
120 match self.prefix.read(slice) {
121 Ok(0) => {
122 self.prefix_finished = true;
124 }
125 Ok(n) => {
126 unsafe { buf.set_init(n) };
127 return (Ok(n), buf);
128 }
129 Err(e) => {
130 return (Err(e), buf);
131 }
132 }
133 }
134 self.io.cancelable_read(buf, c).await
136 }
137
138 async fn cancelable_readv<T: IoVecBufMut>(
139 &mut self,
140 mut buf: T,
141 c: CancelHandle,
142 ) -> crate::BufResult<usize, T> {
143 let slice = match IoVecWrapperMut::new(buf) {
144 Ok(slice) => slice,
145 Err(buf) => return (Ok(0), buf),
146 };
147
148 let (result, slice) = self.cancelable_read(slice, c).await;
149 buf = slice.into_inner();
150 if let Ok(n) = result {
151 unsafe { buf.set_init(n) };
152 }
153 (result, buf)
154 }
155}
156
157impl<I: AsyncWriteRent, P> AsyncWriteRent for PrefixedReadIo<I, P> {
158 #[inline]
159 fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
160 self.io.write(buf)
161 }
162
163 #[inline]
164 fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
165 self.io.writev(buf_vec)
166 }
167
168 #[inline]
169 fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
170 self.io.flush()
171 }
172
173 #[inline]
174 fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
175 self.io.shutdown()
176 }
177}
178
179impl<I: CancelableAsyncWriteRent, P> CancelableAsyncWriteRent for PrefixedReadIo<I, P> {
180 #[inline]
181 fn cancelable_write<T: IoBuf>(
182 &mut self,
183 buf: T,
184 c: CancelHandle,
185 ) -> impl Future<Output = BufResult<usize, T>> {
186 self.io.cancelable_write(buf, c)
187 }
188
189 #[inline]
190 fn cancelable_writev<T: IoVecBuf>(
191 &mut self,
192 buf_vec: T,
193 c: CancelHandle,
194 ) -> impl Future<Output = BufResult<usize, T>> {
195 self.io.cancelable_writev(buf_vec, c)
196 }
197
198 #[inline]
199 fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future<Output = std::io::Result<()>> {
200 self.io.cancelable_flush(c)
201 }
202
203 #[inline]
204 fn cancelable_shutdown(
205 &mut self,
206 c: CancelHandle,
207 ) -> impl Future<Output = std::io::Result<()>> {
208 self.io.cancelable_shutdown(c)
209 }
210}
211
212unsafe impl<I, P> Split for PrefixedReadIo<I, P> where I: Split {}