monoio/io/
async_write_rent_ext.rs

1use std::future::Future;
2
3use crate::{
4    buf::{IoBuf, IoVecBuf, Slice},
5    io::AsyncWriteRent,
6    BufResult,
7};
8
9/// AsyncWriteRentExt
10pub trait AsyncWriteRentExt {
11    /// Write all
12    fn write_all<T: IoBuf + 'static>(
13        &mut self,
14        buf: T,
15    ) -> impl Future<Output = BufResult<usize, T>>;
16
17    /// Write vectored all
18    fn write_vectored_all<T: IoVecBuf + 'static>(
19        &mut self,
20        buf: T,
21    ) -> impl Future<Output = BufResult<usize, T>>;
22}
23
24impl<A> AsyncWriteRentExt for A
25where
26    A: AsyncWriteRent + ?Sized,
27{
28    async fn write_all<T: IoBuf + 'static>(&mut self, mut buf: T) -> BufResult<usize, T> {
29        let len = buf.bytes_init();
30        let mut written = 0;
31        while written < len {
32            let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) };
33            let (result, buf_slice) = self.write(buf_slice).await;
34            buf = buf_slice.into_inner();
35            match result {
36                Ok(0) => {
37                    return (
38                        Err(std::io::Error::new(
39                            std::io::ErrorKind::WriteZero,
40                            "failed to write whole buffer",
41                        )),
42                        buf,
43                    )
44                }
45                Ok(n) => written += n,
46                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
47                Err(e) => return (Err(e), buf),
48            }
49        }
50        (Ok(written), buf)
51    }
52
53    async fn write_vectored_all<T: IoVecBuf + 'static>(&mut self, buf: T) -> BufResult<usize, T> {
54        let mut meta = crate::buf::read_vec_meta(&buf);
55        let len = meta.len();
56        let mut written = 0;
57
58        while written < len {
59            let (res, meta_) = self.writev(meta).await;
60            meta = meta_;
61            match res {
62                Ok(0) => {
63                    return (
64                        Err(std::io::Error::new(
65                            std::io::ErrorKind::WriteZero,
66                            "failed to write whole buffer",
67                        )),
68                        buf,
69                    )
70                }
71                Ok(n) => {
72                    written += n;
73                    meta.consume(n);
74                }
75                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
76                Err(e) => return (Err(e), buf),
77            }
78        }
79        (Ok(written), buf)
80    }
81}