// monoio/io/util/buf_reader.rs

1use std::future::Future;
2
3use crate::{
4    buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut},
5    io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent},
6    BufResult,
7};
8
/// BufReader is a struct with a buffer. BufReader implements AsyncBufRead
/// and AsyncReadRent, and if the inner io implements AsyncWriteRent, it
/// will delegate the implementation.
pub struct BufReader<R> {
    /// The wrapped reader that the internal buffer is filled from.
    inner: R,
    /// Internal buffer storage. `fill_buf` takes it out (leaving `None`)
    /// while the inner reader owns it for a read, and puts it back once that
    /// read has completed.
    buf: Option<Box<[u8]>>,
    /// Offset in `buf` of the first buffered byte that has not been consumed.
    pos: usize,
    /// Offset one past the last buffered byte in `buf`; `pos == cap` means
    /// there is no buffered data.
    cap: usize,
}
18
/// Size in bytes (8 KiB) of the internal buffer allocated by `BufReader::new`.
const DEFAULT_BUF_SIZE: usize = 8 << 10;
20
21impl<R> BufReader<R> {
22    /// Create BufReader with default buffer size
23    #[inline]
24    pub fn new(inner: R) -> Self {
25        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
26    }
27
28    /// Create BufReader with given buffer size
29    #[inline]
30    pub fn with_capacity(capacity: usize, inner: R) -> Self {
31        let buffer = vec![0; capacity];
32        Self {
33            inner,
34            buf: Some(buffer.into_boxed_slice()),
35            pos: 0,
36            cap: 0,
37        }
38    }
39
40    /// Gets a reference to the underlying reader.
41    ///
42    /// It is inadvisable to directly read from the underlying reader.
43    #[inline]
44    pub const fn get_ref(&self) -> &R {
45        &self.inner
46    }
47
48    /// Gets a mutable reference to the underlying reader.
49    #[inline]
50    pub fn get_mut(&mut self) -> &mut R {
51        &mut self.inner
52    }
53
54    /// Consumes this `BufReader`, returning the underlying reader.
55    ///
56    /// Note that any leftover data in the internal buffer is lost.
57    #[inline]
58    pub fn into_inner(self) -> R {
59        self.inner
60    }
61
62    /// Returns a reference to the internally buffered data.
63    ///
64    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is
65    /// empty.
66    #[inline]
67    pub fn buffer(&self) -> &[u8] {
68        &self.buf.as_ref().expect("unable to take buffer")[self.pos..self.cap]
69    }
70
71    /// Invalidates all data in the internal buffer.
72    #[inline]
73    fn discard_buffer(&mut self) {
74        self.pos = 0;
75        self.cap = 0;
76    }
77}
78
79impl<R: AsyncReadRent> AsyncReadRent for BufReader<R> {
80    async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
81        // If we don't have any buffered data and we're doing a massive read
82        // (larger than our internal buffer), bypass our internal buffer
83        // entirely.
84        let owned_buf = self.buf.as_ref().unwrap();
85        if self.pos == self.cap && buf.bytes_total() >= owned_buf.len() {
86            self.discard_buffer();
87            return self.inner.read(buf).await;
88        }
89
90        let rem = match self.fill_buf().await {
91            Ok(slice) => slice,
92            Err(e) => {
93                return (Err(e), buf);
94            }
95        };
96        let amt = std::cmp::min(rem.len(), buf.bytes_total());
97        unsafe {
98            buf.write_ptr().copy_from_nonoverlapping(rem.as_ptr(), amt);
99            buf.set_init(amt);
100        }
101        self.consume(amt);
102        (Ok(amt), buf)
103    }
104
105    async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
106        let slice = match IoVecWrapperMut::new(buf) {
107            Ok(slice) => slice,
108            Err(buf) => return (Ok(0), buf),
109        };
110
111        let (result, slice) = self.read(slice).await;
112        buf = slice.into_inner();
113        if let Ok(n) = result {
114            unsafe { buf.set_init(n) };
115        }
116        (result, buf)
117    }
118}
119
120impl<R: AsyncReadRent> AsyncBufRead for BufReader<R> {
121    async fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
122        if self.pos == self.cap {
123            // there's no buffered data
124            let buf = self
125                .buf
126                .take()
127                .expect("no buffer available, generated future must be awaited");
128            let (res, buf_) = self.inner.read(buf).await;
129            self.buf = Some(buf_);
130            match res {
131                Ok(n) => {
132                    self.pos = 0;
133                    self.cap = n;
134                    return Ok(unsafe {
135                        // We just put the buf into Option, so it must be Some.
136                        &(self.buf.as_ref().unwrap_unchecked().as_ref())[self.pos..self.cap]
137                    });
138                }
139                Err(e) => {
140                    return Err(e);
141                }
142            }
143        }
144        Ok(&(self
145            .buf
146            .as_ref()
147            .expect("no buffer available, generated future must be awaited")
148            .as_ref())[self.pos..self.cap])
149    }
150
151    fn consume(&mut self, amt: usize) {
152        self.pos = self.cap.min(self.pos + amt);
153    }
154}
155
156impl<R: AsyncReadRent + AsyncWriteRent> AsyncWriteRent for BufReader<R> {
157    #[inline]
158    fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
159        self.inner.write(buf)
160    }
161
162    #[inline]
163    fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
164        self.inner.writev(buf_vec)
165    }
166
167    #[inline]
168    fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
169        self.inner.flush()
170    }
171
172    #[inline]
173    fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
174        self.inner.shutdown()
175    }
176}