monoio/io/util/
copy.rs

1#![allow(unused)]
2
3use std::io;
4
5use crate::io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt};
6#[cfg(unix)]
7use crate::net::unix::new_pipe;
8
/// Chunk size in bytes (64 KiB) shared by the copy helpers in this file:
/// it is the capacity of the intermediate buffer allocated by `copy` and the
/// maximum length requested per splice into the pipe by `zero_copy`.
9const BUF_SIZE: usize = 64 * 1024;
10
11/// Copy data from reader to writer.
12pub async fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result<u64>
13where
14    R: AsyncReadRent + ?Sized,
15    W: AsyncWriteRent + ?Sized,
16{
17    let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE);
18    let mut transferred: u64 = 0;
19
20    'r: loop {
21        let (read_res, mut buf_read) = reader.read(buf).await;
22        match read_res {
23            Ok(0) => {
24                // read closed
25                break;
26            }
27            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
28                // retry
29                buf = buf_read;
30                continue;
31            }
32            Err(e) => {
33                // should return error
34                return Err(e);
35            }
36            Ok(_) => {
37                // go write data
38            }
39        }
40
41        'w: loop {
42            let (write_res, buf_) = writer.write_all(buf_read).await;
43            match write_res {
44                Ok(0) => {
45                    // write closed
46                    return Err(io::Error::new(
47                        io::ErrorKind::WriteZero,
48                        "write zero byte into writer",
49                    ));
50                }
51                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
52                    // retry
53                    buf_read = buf_;
54                    continue 'w;
55                }
56                Err(e) => {
57                    // should return error
58                    return Err(e);
59                }
60                Ok(n) => {
61                    // go read data
62                    transferred += n as u64;
63                    buf = buf_;
64                    break;
65                }
66            }
67        }
68    }
69
70    Ok(transferred)
71}
72
/// Copy with splice.
///
/// Creates an intermediate pipe with `new_pipe`, then repeatedly splices up
/// to `BUF_SIZE` bytes from `reader` into the pipe and drains exactly that
/// many bytes from the pipe into `writer` before requesting the next chunk.
/// The loop ends when splicing from `reader` reports zero bytes.
///
/// Returns the total number of bytes spliced out of `reader`.
///
/// # Errors
///
/// Returns any error produced while creating the pipe or by either splice
/// call. If `writer` accepts zero bytes while data is still pending in the
/// pipe, an [`io::ErrorKind::WriteZero`] error is returned instead of
/// retrying forever.
#[cfg(all(target_os = "linux", feature = "splice"))]
pub async fn zero_copy<SRC: crate::io::as_fd::AsReadFd, DST: crate::io::as_fd::AsWriteFd>(
    reader: &mut SRC,
    writer: &mut DST,
) -> io::Result<u64> {
    use crate::{
        driver::op::Op,
        io::splice::{SpliceDestination, SpliceSource},
    };

    let (mut pr, mut pw) = new_pipe()?;
    let mut transferred: u64 = 0;
    loop {
        let mut to_write = reader.splice_to_pipe(&mut pw, BUF_SIZE as u32).await?;
        if to_write == 0 {
            // Source reported no more data.
            break;
        }
        transferred += to_write as u64;
        // Drain everything that was just placed in the pipe; a single splice
        // out of the pipe may move fewer bytes than requested.
        while to_write > 0 {
            let written = writer.splice_from_pipe(&mut pr, to_write).await?;
            if written == 0 {
                // No progress is possible; without this check the loop would
                // spin forever because `to_write` would never decrease.
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "write zero byte into writer",
                ));
            }
            to_write -= written;
        }
    }
    Ok(transferred)
}