monoio/io/util/
buf_writer.rs1use std::{future::Future, io};
2
3use crate::{
4 buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapper, Slice},
5 io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt},
6 BufResult,
7};
8
9pub struct BufWriter<W> {
13 inner: W,
14 buf: Option<Box<[u8]>>,
15 cap: usize,
16}
17
18const DEFAULT_BUF_SIZE: usize = 8 * 1024;
19
20impl<W> BufWriter<W> {
21 #[inline]
23 pub fn new(inner: W) -> Self {
24 Self::with_capacity(DEFAULT_BUF_SIZE, inner)
25 }
26
27 #[inline]
29 pub fn with_capacity(capacity: usize, inner: W) -> Self {
30 let buffer = vec![0; capacity];
31 Self {
32 inner,
33 buf: Some(buffer.into_boxed_slice()),
34 cap: 0,
35 }
36 }
37
38 #[inline]
40 pub fn get_ref(&self) -> &W {
41 &self.inner
42 }
43
44 #[inline]
46 pub fn get_mut(&mut self) -> &mut W {
47 &mut self.inner
48 }
49
50 #[inline]
54 pub fn into_inner(self) -> W {
55 self.inner
56 }
57
58 #[inline]
60 pub fn buffer(&self) -> &[u8] {
61 &self.buf.as_ref().expect("unable to take buffer")[..self.cap]
62 }
63
64 #[inline]
66 fn discard_buffer(&mut self) {
67 self.cap = 0;
68 }
69}
70
71impl<W: AsyncWriteRent> BufWriter<W> {
72 async fn flush_buf(&mut self) -> io::Result<()> {
73 if self.cap != 0 {
74 let buf = self
76 .buf
77 .take()
78 .expect("no buffer available, generated future must be awaited");
79 let slice = Slice::new(buf, 0, self.cap);
81 let (ret, slice) = self.inner.write_all(slice).await;
82 self.buf = Some(slice.into_inner());
84 ret?;
85 self.discard_buffer();
86 }
87 Ok(())
88 }
89}
90
91impl<W: AsyncWriteRent> AsyncWriteRent for BufWriter<W> {
92 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
93 let owned_buf = self.buf.as_ref().unwrap();
94 let owned_len = owned_buf.len();
95 let amt = buf.bytes_init();
96
97 if self.cap + amt > owned_len {
98 match self.flush_buf().await {
101 Ok(_) => (),
102 Err(e) => {
103 return (Err(e), buf);
104 }
105 }
106 }
107
108 if amt > owned_len {
115 self.inner.write(buf).await
116 } else {
117 unsafe {
118 let owned_buf = self.buf.as_mut().unwrap();
119 owned_buf
120 .as_mut_ptr()
121 .add(self.cap)
122 .copy_from_nonoverlapping(buf.read_ptr(), amt);
123 }
124 self.cap += amt;
125 (Ok(amt), buf)
126 }
127 }
128
129 async fn writev<T: IoVecBuf>(&mut self, buf: T) -> BufResult<usize, T> {
131 let slice = match IoVecWrapper::new(buf) {
132 Ok(slice) => slice,
133 Err(buf) => return (Ok(0), buf),
134 };
135
136 let (result, slice) = self.write(slice).await;
137 (result, slice.into_inner())
138 }
139
140 async fn flush(&mut self) -> std::io::Result<()> {
141 self.flush_buf().await?;
142 self.inner.flush().await
143 }
144
145 async fn shutdown(&mut self) -> std::io::Result<()> {
146 self.flush_buf().await?;
147 self.inner.shutdown().await
148 }
149}
150
151impl<W: AsyncWriteRent + AsyncReadRent> AsyncReadRent for BufWriter<W> {
152 #[inline]
153 fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
154 self.inner.read(buf)
155 }
156
157 #[inline]
158 fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
159 self.inner.readv(buf)
160 }
161}
162
163impl<W: AsyncWriteRent + AsyncBufRead> AsyncBufRead for BufWriter<W> {
164 #[inline]
165 fn fill_buf(&mut self) -> impl Future<Output = std::io::Result<&[u8]>> {
166 self.inner.fill_buf()
167 }
168
169 #[inline]
170 fn consume(&mut self, amt: usize) {
171 self.inner.consume(amt)
172 }
173}