monoio/io/util/buf_writer.rs

use std::{future::Future, io};

use crate::{
    buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapper, Slice},
    io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt},
    BufResult,
};

/// BufWriter is a writer with an internal buffer. It implements AsyncWriteRent
/// and buffers output before handing it to the inner io. If the inner io also
/// implements AsyncReadRent or AsyncBufRead, those implementations are
/// delegated to it unchanged.
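///
/// A minimal usage sketch (not compiled here; it assumes a monoio runtime and
/// uses `monoio::net::TcpStream` only as an example inner writer):
///
/// ```ignore
/// use monoio::io::{AsyncWriteRent, AsyncWriteRentExt, BufWriter};
/// use monoio::net::TcpStream;
///
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
/// let mut writer = BufWriter::new(stream);
/// // Small writes are staged in the internal buffer.
/// let (res, _buf) = writer.write_all(b"hello".to_vec()).await;
/// res?;
/// // Buffered bytes reach the inner stream when the buffer fills or on flush.
/// writer.flush().await?;
/// ```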
pub struct BufWriter<W> {
    inner: W,
    buf: Option<Box<[u8]>>,
    cap: usize,
}

const DEFAULT_BUF_SIZE: usize = 8 * 1024;

impl<W> BufWriter<W> {
    /// Create BufWriter with default buffer size
    #[inline]
    pub fn new(inner: W) -> Self {
        Self::with_capacity(DEFAULT_BUF_SIZE, inner)
    }

    /// Create BufWriter with given buffer size
    #[inline]
    pub fn with_capacity(capacity: usize, inner: W) -> Self {
        let buffer = vec![0; capacity];
        Self {
            inner,
            buf: Some(buffer.into_boxed_slice()),
            cap: 0,
        }
    }

    /// Gets a reference to the underlying writer.
    #[inline]
    pub fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    #[inline]
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Consumes this `BufWriter`, returning the underlying writer.
    ///
    /// Note that any leftover data in the internal buffer is lost.
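    ///
    /// A small sketch of the intended call order (the `writer` variable is
    /// hypothetical and the snippet is not compiled here): flush before
    /// unwrapping so buffered data is not dropped.
    ///
    /// ```ignore
    /// writer.flush().await?;
    /// let inner = writer.into_inner();
    /// ```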
    #[inline]
    pub fn into_inner(self) -> W {
        self.inner
    }

    /// Returns a reference to the internally buffered data.
    #[inline]
    pub fn buffer(&self) -> &[u8] {
        &self.buf.as_ref().expect("unable to take buffer")[..self.cap]
    }

    /// Invalidates all data in the internal buffer.
    #[inline]
    fn discard_buffer(&mut self) {
        self.cap = 0;
    }
}

impl<W: AsyncWriteRent> BufWriter<W> {
    async fn flush_buf(&mut self) -> io::Result<()> {
        if self.cap != 0 {
            // there is some data left inside internal buf
            let buf = self
                .buf
                .take()
                .expect("no buffer available, generated future must be awaited");
            // move buf to slice and write_all
            let slice = Slice::new(buf, 0, self.cap);
            let (ret, slice) = self.inner.write_all(slice).await;
            // move it back and return
            self.buf = Some(slice.into_inner());
            ret?;
            self.discard_buffer();
        }
        Ok(())
    }
}

impl<W: AsyncWriteRent> AsyncWriteRent for BufWriter<W> {
    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
        let owned_buf = self.buf.as_ref().unwrap();
        let owned_len = owned_buf.len();
        let amt = buf.bytes_init();

        if self.cap + amt > owned_len {
            // Buf cannot be copied directly into OwnedBuf;
            // we must flush OwnedBuf first.
            match self.flush_buf().await {
                Ok(_) => (),
                Err(e) => {
                    return (Err(e), buf);
                }
            }
        }

        // Now there are two possible situations:
        // 1. OwnedBuf has data and self.cap + amt <= owned_len,
        //    which means buf can be copied into OwnedBuf.
        // 2. OwnedBuf is empty. If buf fits into OwnedBuf we copy it;
        //    otherwise we write it to the inner io directly (OwnedBuf
        //    must already be empty in that case, so no data is reordered).
        if amt > owned_len {
            self.inner.write(buf).await
        } else {
            unsafe {
                let owned_buf = self.buf.as_mut().unwrap();
                owned_buf
                    .as_mut_ptr()
                    .add(self.cap)
                    .copy_from_nonoverlapping(buf.read_ptr(), amt);
            }
            self.cap += amt;
            (Ok(amt), buf)
        }
    }

    // TODO: implement it as real io_vec
    async fn writev<T: IoVecBuf>(&mut self, buf: T) -> BufResult<usize, T> {
        let slice = match IoVecWrapper::new(buf) {
            Ok(slice) => slice,
            // Nothing usable to write: report a zero-length write.
            Err(buf) => return (Ok(0), buf),
        };

        let (result, slice) = self.write(slice).await;
        (result, slice.into_inner())
    }

    async fn flush(&mut self) -> std::io::Result<()> {
        self.flush_buf().await?;
        self.inner.flush().await
    }

    async fn shutdown(&mut self) -> std::io::Result<()> {
        self.flush_buf().await?;
        self.inner.shutdown().await
    }
}

// Reads are not buffered here; they are delegated directly to the inner io.
impl<W: AsyncWriteRent + AsyncReadRent> AsyncReadRent for BufWriter<W> {
    #[inline]
    fn read<T: IoBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
        self.inner.read(buf)
    }

    #[inline]
    fn readv<T: IoVecBufMut>(&mut self, buf: T) -> impl Future<Output = BufResult<usize, T>> {
        self.inner.readv(buf)
    }
}

impl<W: AsyncWriteRent + AsyncBufRead> AsyncBufRead for BufWriter<W> {
    #[inline]
    fn fill_buf(&mut self) -> impl Future<Output = std::io::Result<&[u8]>> {
        self.inner.fill_buf()
    }

    #[inline]
    fn consume(&mut self, amt: usize) {
        self.inner.consume(amt)
    }
}