ferron_common/util/
monoio_file_stream_no_spawn.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_util::Stream;
8use monoio::fs::File;
9use send_wrapper::SendWrapper;
10
11const MAX_BUFFER_SIZE: usize = 16384;
12
13/// A wrapper over Monoio's `File` that implements a `Stream` trait and doesn't spawn a background task.
14#[allow(clippy::type_complexity)]
15pub struct MonoioFileStreamNoSpawn {
16  file: Arc<SendWrapper<File>>,
17  current_pos: u64,
18  end: Option<u64>,
19  finished: bool,
20  read_future: Option<Pin<Box<dyn Future<Output = Option<Result<Bytes, std::io::Error>>> + Send + Sync>>>,
21}
22
23impl MonoioFileStreamNoSpawn {
24  /// Creates a new stream from Monoio's `File`, with specified start and end positions
25  pub fn new(file: File, start: Option<u64>, end: Option<u64>) -> Self {
26    Self {
27      file: Arc::new(SendWrapper::new(file)),
28      current_pos: start.unwrap_or(0),
29      end,
30      finished: false,
31      read_future: None,
32    }
33  }
34}
35
36impl Stream for MonoioFileStreamNoSpawn {
37  type Item = Result<Bytes, std::io::Error>;
38
39  #[inline]
40  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41    if self.finished {
42      return Poll::Ready(None);
43    }
44    if self.read_future.is_none() {
45      self.read_future = Some(Box::pin(SendWrapper::new(read_chunk(
46        self.file.clone(),
47        self.current_pos,
48        self.end,
49      ))));
50    }
51    match Pin::new(
52      self
53        .read_future
54        .as_mut()
55        .expect("file stream read future is not initialized"),
56    )
57    .poll(cx)
58    {
59      Poll::Ready(Some(Ok(chunk))) => {
60        let _ = self.read_future.take();
61        self.current_pos += chunk.len() as u64;
62        if let Some(end) = &self.end {
63          if self.current_pos >= *end {
64            // EOF
65            self.finished = true;
66          }
67        }
68        Poll::Ready(Some(Ok(chunk)))
69      }
70      Poll::Ready(option) => {
71        let _ = self.read_future.take();
72        if option.is_none() {
73          self.finished = true;
74        }
75        Poll::Ready(option)
76      }
77      Poll::Pending => Poll::Pending,
78    }
79  }
80}
81
82#[inline]
83async fn read_chunk(file: Arc<SendWrapper<File>>, pos: u64, end: Option<u64>) -> Option<Result<Bytes, std::io::Error>> {
84  let buffer_sz = end.map_or(MAX_BUFFER_SIZE, |n| ((n - pos) as usize).min(MAX_BUFFER_SIZE));
85  if buffer_sz == 0 {
86    return None;
87  }
88  let buffer_uninit = Box::new_uninit_slice(buffer_sz);
89  // Safety: The buffer is a boxed slice of uninitialized `u8` values. `u8` is a primitive type.
90  let buffer: Box<[u8]> = unsafe { buffer_uninit.assume_init() };
91  let result = file.read_at(buffer, pos).await;
92  match result {
93    (Ok(n), buffer) => {
94      if n == 0 {
95        None
96      } else {
97        let mut bytes = Bytes::from_owner(buffer);
98        bytes.truncate(n);
99        Some(Ok(bytes))
100      }
101    }
102    (Err(e), _) => Some(Err(e)),
103  }
104}