ferron_common/util/
monoio_file_stream.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use async_channel::Receiver;
5use bytes::{Bytes, BytesMut};
6use futures_util::Stream;
7use monoio::fs::File;
8use tokio_util::sync::CancellationToken;
9
/// Upper bound, in bytes, on the buffer allocated for a single file read (16 KiB).
const MAX_BUFFER_SIZE: usize = 16 * 1024;
/// Capacity of the bounded channel between the read task and the stream.
const MAX_CHANNEL_CAPACITY: usize = 2;
12
13/// A wrapper over Monoio's `File` that implements a `Stream` trait.
14#[allow(clippy::type_complexity)]
15pub struct MonoioFileStream {
16  rx: Pin<Box<Receiver<Result<Bytes, std::io::Error>>>>,
17  read_cancel: CancellationToken,
18}
19
20impl MonoioFileStream {
21  /// Creates a new stream from Monoio's `File`, with specified start and end positions
22  pub fn new(file: File, start: Option<u64>, end: Option<u64>) -> Self {
23    let (tx, rx) = async_channel::bounded(MAX_CHANNEL_CAPACITY);
24    let read_cancel = CancellationToken::new();
25    let read_cancel_clone = read_cancel.clone();
26    monoio::spawn(async move {
27      let mut current_pos = start.unwrap_or(0);
28      loop {
29        let buffer_sz = end.map_or(MAX_BUFFER_SIZE, |n| ((n - current_pos) as usize).min(MAX_BUFFER_SIZE));
30        if buffer_sz == 0 {
31          break;
32        }
33        let buffer = BytesMut::with_capacity(buffer_sz);
34        let (io_result, mut buffer) = monoio::select! {
35          biased;
36
37          _ = read_cancel_clone.cancelled() => {
38            break;
39          }
40          result = file.read_at(buffer, current_pos) => {
41            result
42          }
43        };
44        if let Ok(n) = io_result.as_ref() {
45          if n == &0 {
46            break;
47          }
48          current_pos += *n as u64;
49        }
50        let is_err = io_result.is_err();
51        if tx
52          .send(io_result.map(move |n| {
53            buffer.truncate(n);
54            buffer.freeze()
55          }))
56          .await
57          .is_err()
58        {
59          return;
60        }
61        if is_err {
62          break;
63        }
64      }
65    });
66    Self {
67      rx: Box::pin(rx),
68      read_cancel,
69    }
70  }
71}
72
73impl Stream for MonoioFileStream {
74  type Item = Result<Bytes, std::io::Error>;
75
76  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77    Pin::new(&mut self.rx).poll_next(cx)
78  }
79}
80
81impl Drop for MonoioFileStream {
82  fn drop(&mut self) {
83    self.rx.close();
84    self.read_cancel.cancel();
85  }
86}