monoio/io/util/
prefixed_io.rs

1use std::future::Future;
2
3use super::{split::Split, CancelHandle};
4use crate::{
5    buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut},
6    io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent},
7    BufResult,
8};
9
/// An IO wrapper that replays a prefix before reading from the wrapped IO.
///
/// `PrefixedReadIo` puts a prefix in front of an IO stream, which makes it
/// possible to "rewind" or "peek": bytes that were already consumed from the
/// stream can be handed back as the prefix, and later reads still see the
/// original stream contents in order.
///
/// Reads are served from `prefix` until it reports end-of-file, and from the
/// wrapped IO afterwards. Writes are forwarded to the wrapped IO untouched.
///
/// # Examples
///
/// ```
/// # use monoio::io::PrefixedReadIo;
/// # use monoio::io::{AsyncReadRent, AsyncWriteRent, AsyncReadRentExt};
///
/// async fn demo<T>(mut stream: T)
/// where
///     T: AsyncReadRent + AsyncWriteRent,
/// {
///     // let stream = b"hello world";
///     let buf = vec![0 as u8; 6];
///     let (_, buf) = stream.read_exact(buf).await;
///     assert_eq!(buf, b"hello ");
///
///     let prefix_buf = std::io::Cursor::new(buf);
///     let mut pio = PrefixedReadIo::new(stream, prefix_buf);
///
///     let buf = vec![0 as u8; 11];
///     let (_, buf) = pio.read_exact(buf).await;
///     assert_eq!(buf, b"hello world");
/// }
/// ```
pub struct PrefixedReadIo<I, P> {
    /// The wrapped IO object; read once the prefix is drained, and the
    /// target of every write.
    io: I,
    /// Source of the bytes that are replayed before `io` is read.
    prefix: P,

    /// Becomes `true` once `prefix` has reported end-of-file (`Ok(0)`);
    /// from then on reads go straight to `io`.
    prefix_finished: bool,
}
40
41impl<I, P> PrefixedReadIo<I, P> {
42    /// Create a PrefixedIo with given io and read prefix.
43    pub const fn new(io: I, prefix: P) -> Self {
44        Self {
45            io,
46            prefix,
47            prefix_finished: false,
48        }
49    }
50
51    /// If the prefix has read to eof
52    pub const fn prefix_finished(&self) -> bool {
53        self.prefix_finished
54    }
55
56    /// Into inner
57    #[inline]
58    pub fn into_inner(self) -> I {
59        self.io
60    }
61}
62
63impl<I: AsyncReadRent, P: std::io::Read> AsyncReadRent for PrefixedReadIo<I, P> {
64    async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
65        if buf.bytes_total() == 0 {
66            return (Ok(0), buf);
67        }
68        if !self.prefix_finished {
69            let slice = unsafe {
70                &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total())
71            };
72            match self.prefix.read(slice) {
73                Ok(0) => {
74                    // prefix finished
75                    self.prefix_finished = true;
76                }
77                Ok(n) => {
78                    unsafe { buf.set_init(n) };
79                    return (Ok(n), buf);
80                }
81                Err(e) => {
82                    return (Err(e), buf);
83                }
84            }
85        }
86        // prefix eof now, read io directly
87        self.io.read(buf).await
88    }
89
90    async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
91        let slice = match IoVecWrapperMut::new(buf) {
92            Ok(slice) => slice,
93            Err(buf) => return (Ok(0), buf),
94        };
95
96        let (result, slice) = self.read(slice).await;
97        buf = slice.into_inner();
98        if let Ok(n) = result {
99            unsafe { buf.set_init(n) };
100        }
101        (result, buf)
102    }
103}
104
105impl<I: CancelableAsyncReadRent, P: std::io::Read> CancelableAsyncReadRent
106    for PrefixedReadIo<I, P>
107{
108    async fn cancelable_read<T: IoBufMut>(
109        &mut self,
110        mut buf: T,
111        c: CancelHandle,
112    ) -> crate::BufResult<usize, T> {
113        if buf.bytes_total() == 0 {
114            return (Ok(0), buf);
115        }
116        if !self.prefix_finished {
117            let slice = unsafe {
118                &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total())
119            };
120            match self.prefix.read(slice) {
121                Ok(0) => {
122                    // prefix finished
123                    self.prefix_finished = true;
124                }
125                Ok(n) => {
126                    unsafe { buf.set_init(n) };
127                    return (Ok(n), buf);
128                }
129                Err(e) => {
130                    return (Err(e), buf);
131                }
132            }
133        }
134        // prefix eof now, read io directly
135        self.io.cancelable_read(buf, c).await
136    }
137
138    async fn cancelable_readv<T: IoVecBufMut>(
139        &mut self,
140        mut buf: T,
141        c: CancelHandle,
142    ) -> crate::BufResult<usize, T> {
143        let slice = match IoVecWrapperMut::new(buf) {
144            Ok(slice) => slice,
145            Err(buf) => return (Ok(0), buf),
146        };
147
148        let (result, slice) = self.cancelable_read(slice, c).await;
149        buf = slice.into_inner();
150        if let Ok(n) = result {
151            unsafe { buf.set_init(n) };
152        }
153        (result, buf)
154    }
155}
156
157impl<I: AsyncWriteRent, P> AsyncWriteRent for PrefixedReadIo<I, P> {
158    #[inline]
159    fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
160        self.io.write(buf)
161    }
162
163    #[inline]
164    fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
165        self.io.writev(buf_vec)
166    }
167
168    #[inline]
169    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
170        self.io.flush()
171    }
172
173    #[inline]
174    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
175        self.io.shutdown()
176    }
177}
178
179impl<I: CancelableAsyncWriteRent, P> CancelableAsyncWriteRent for PrefixedReadIo<I, P> {
180    #[inline]
181    fn cancelable_write<T: IoBuf>(
182        &mut self,
183        buf: T,
184        c: CancelHandle,
185    ) -> impl Future<Output = BufResult<usize, T>> {
186        self.io.cancelable_write(buf, c)
187    }
188
189    #[inline]
190    fn cancelable_writev<T: IoVecBuf>(
191        &mut self,
192        buf_vec: T,
193        c: CancelHandle,
194    ) -> impl Future<Output = BufResult<usize, T>> {
195        self.io.cancelable_writev(buf_vec, c)
196    }
197
198    #[inline]
199    fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future<Output = std::io::Result<()>> {
200        self.io.cancelable_flush(c)
201    }
202
203    #[inline]
204    fn cancelable_shutdown(
205        &mut self,
206        c: CancelHandle,
207    ) -> impl Future<Output = std::io::Result<()>> {
208        self.io.cancelable_shutdown(c)
209    }
210}
211
212/// implement unsafe Split for PrefixedReadIo, it's `safe`
213/// because read/write are independent, we can safely split them into two I/O parts.
214unsafe impl<I, P> Split for PrefixedReadIo<I, P> where I: Split {}