monoio/io/util/
buf_reader.rs1use std::future::Future;
2
3use crate::{
4 buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut},
5 io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent},
6 BufResult,
7};
8
/// A reader that keeps an in-memory buffer in front of another reader.
///
/// Bytes fetched from `inner` are stored in `buf`; the bytes that have been
/// fetched but not yet handed out are `buf[pos..cap]`.
pub struct BufReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Backing storage. `fill_buf` moves it out of the slot for the duration
    /// of a read on `inner` and puts it back afterwards, so it is `None` only
    /// while such a read is outstanding (or was abandoned).
    buf: Option<Box<[u8]>>,
    /// Index of the first unread byte in `buf`.
    pos: usize,
    /// Index one past the last valid byte in `buf`.
    cap: usize,
}

/// Buffer size used by [`BufReader::new`]: 8 KiB.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

impl<R> BufReader<R> {
    /// Creates a `BufReader` around `inner` with a buffer of
    /// `DEFAULT_BUF_SIZE` bytes.
    #[inline]
    pub fn new(inner: R) -> Self {
        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Creates a `BufReader` around `inner` with a zero-filled buffer of
    /// `capacity` bytes.
    #[inline]
    pub fn with_capacity(capacity: usize, inner: R) -> Self {
        Self {
            inner,
            buf: Some(vec![0u8; capacity].into_boxed_slice()),
            pos: 0,
            cap: 0,
        }
    }

    /// Returns a shared reference to the wrapped reader.
    #[inline]
    pub const fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Returns a mutable reference to the wrapped reader.
    #[inline]
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Consumes the `BufReader` and returns the wrapped reader.
    ///
    /// Bytes still held in the internal buffer are dropped with it.
    #[inline]
    pub fn into_inner(self) -> R {
        self.inner
    }

    /// Returns the bytes that are buffered but not yet consumed.
    ///
    /// This never reads from the wrapped reader. If the backing storage is
    /// currently moved out of the reader, an empty slice is returned instead
    /// of panicking: the storage is only ever moved out when there are no
    /// unread bytes, so "nothing buffered" is the accurate answer.
    #[inline]
    pub fn buffer(&self) -> &[u8] {
        match self.buf.as_deref() {
            Some(storage) => &storage[self.pos..self.cap],
            None => &[],
        }
    }

    /// Marks the internal buffer as empty by resetting both cursors; the
    /// storage itself is kept for reuse.
    #[inline]
    fn discard_buffer(&mut self) {
        self.pos = 0;
        self.cap = 0;
    }
}
78
79impl<R: AsyncReadRent> AsyncReadRent for BufReader<R> {
80 async fn read<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
81 let owned_buf = self.buf.as_ref().unwrap();
85 if self.pos == self.cap && buf.bytes_total() >= owned_buf.len() {
86 self.discard_buffer();
87 return self.inner.read(buf).await;
88 }
89
90 let rem = match self.fill_buf().await {
91 Ok(slice) => slice,
92 Err(e) => {
93 return (Err(e), buf);
94 }
95 };
96 let amt = std::cmp::min(rem.len(), buf.bytes_total());
97 unsafe {
98 buf.write_ptr().copy_from_nonoverlapping(rem.as_ptr(), amt);
99 buf.set_init(amt);
100 }
101 self.consume(amt);
102 (Ok(amt), buf)
103 }
104
105 async fn readv<T: IoVecBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
106 let slice = match IoVecWrapperMut::new(buf) {
107 Ok(slice) => slice,
108 Err(buf) => return (Ok(0), buf),
109 };
110
111 let (result, slice) = self.read(slice).await;
112 buf = slice.into_inner();
113 if let Ok(n) = result {
114 unsafe { buf.set_init(n) };
115 }
116 (result, buf)
117 }
118}
119
120impl<R: AsyncReadRent> AsyncBufRead for BufReader<R> {
121 async fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
122 if self.pos == self.cap {
123 let buf = self
125 .buf
126 .take()
127 .expect("no buffer available, generated future must be awaited");
128 let (res, buf_) = self.inner.read(buf).await;
129 self.buf = Some(buf_);
130 match res {
131 Ok(n) => {
132 self.pos = 0;
133 self.cap = n;
134 return Ok(unsafe {
135 &(self.buf.as_ref().unwrap_unchecked().as_ref())[self.pos..self.cap]
137 });
138 }
139 Err(e) => {
140 return Err(e);
141 }
142 }
143 }
144 Ok(&(self
145 .buf
146 .as_ref()
147 .expect("no buffer available, generated future must be awaited")
148 .as_ref())[self.pos..self.cap])
149 }
150
151 fn consume(&mut self, amt: usize) {
152 self.pos = self.cap.min(self.pos + amt);
153 }
154}
155
156impl<R: AsyncReadRent + AsyncWriteRent> AsyncWriteRent for BufReader<R> {
157 #[inline]
158 fn write<T: IoBuf>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
159 self.inner.write(buf)
160 }
161
162 #[inline]
163 fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> impl Future<Output = BufResult<usize, T>> {
164 self.inner.writev(buf_vec)
165 }
166
167 #[inline]
168 fn flush(&mut self) -> impl Future<Output = std::io::Result<()>> {
169 self.inner.flush()
170 }
171
172 #[inline]
173 fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
174 self.inner.shutdown()
175 }
176}