ferron_common/util/
monoio_file_stream.rs1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use async_channel::Receiver;
5use bytes::{Bytes, BytesMut};
6use futures_util::Stream;
7use monoio::fs::File;
8use tokio_util::sync::CancellationToken;
9
10const MAX_BUFFER_SIZE: usize = 16384;
11const MAX_CHANNEL_CAPACITY: usize = 2;
12
13#[allow(clippy::type_complexity)]
15pub struct MonoioFileStream {
16 rx: Pin<Box<Receiver<Result<Bytes, std::io::Error>>>>,
17 read_cancel: CancellationToken,
18}
19
20impl MonoioFileStream {
21 pub fn new(file: File, start: Option<u64>, end: Option<u64>) -> Self {
23 let (tx, rx) = async_channel::bounded(MAX_CHANNEL_CAPACITY);
24 let read_cancel = CancellationToken::new();
25 let read_cancel_clone = read_cancel.clone();
26 monoio::spawn(async move {
27 let mut current_pos = start.unwrap_or(0);
28 loop {
29 let buffer_sz = end.map_or(MAX_BUFFER_SIZE, |n| ((n - current_pos) as usize).min(MAX_BUFFER_SIZE));
30 if buffer_sz == 0 {
31 break;
32 }
33 let buffer = BytesMut::with_capacity(buffer_sz);
34 let (io_result, mut buffer) = monoio::select! {
35 biased;
36
37 _ = read_cancel_clone.cancelled() => {
38 break;
39 }
40 result = file.read_at(buffer, current_pos) => {
41 result
42 }
43 };
44 if let Ok(n) = io_result.as_ref() {
45 if n == &0 {
46 break;
47 }
48 current_pos += *n as u64;
49 }
50 let is_err = io_result.is_err();
51 if tx
52 .send(io_result.map(move |n| {
53 buffer.truncate(n);
54 buffer.freeze()
55 }))
56 .await
57 .is_err()
58 {
59 return;
60 }
61 if is_err {
62 break;
63 }
64 }
65 });
66 Self {
67 rx: Box::pin(rx),
68 read_cancel,
69 }
70 }
71}
72
73impl Stream for MonoioFileStream {
74 type Item = Result<Bytes, std::io::Error>;
75
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 Pin::new(&mut self.rx).poll_next(cx)
78 }
79}
80
81impl Drop for MonoioFileStream {
82 fn drop(&mut self) {
83 self.rx.close();
84 self.read_cancel.cancel();
85 }
86}