ferron_common/util/
monoio_file_stream_no_spawn.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_util::Stream;
8use monoio::fs::File;
9use send_wrapper::SendWrapper;
10
11const MAX_BUFFER_SIZE: usize = 16384;
12
13#[allow(clippy::type_complexity)]
15pub struct MonoioFileStreamNoSpawn {
16 file: Arc<SendWrapper<File>>,
17 current_pos: u64,
18 end: Option<u64>,
19 finished: bool,
20 read_future: Option<Pin<Box<dyn Future<Output = Option<Result<Bytes, std::io::Error>>> + Send + Sync>>>,
21}
22
23impl MonoioFileStreamNoSpawn {
24 pub fn new(file: File, start: Option<u64>, end: Option<u64>) -> Self {
26 Self {
27 file: Arc::new(SendWrapper::new(file)),
28 current_pos: start.unwrap_or(0),
29 end,
30 finished: false,
31 read_future: None,
32 }
33 }
34}
35
36impl Stream for MonoioFileStreamNoSpawn {
37 type Item = Result<Bytes, std::io::Error>;
38
39 #[inline]
40 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41 if self.finished {
42 return Poll::Ready(None);
43 }
44 if self.read_future.is_none() {
45 self.read_future = Some(Box::pin(SendWrapper::new(read_chunk(
46 self.file.clone(),
47 self.current_pos,
48 self.end,
49 ))));
50 }
51 match Pin::new(
52 self
53 .read_future
54 .as_mut()
55 .expect("file stream read future is not initialized"),
56 )
57 .poll(cx)
58 {
59 Poll::Ready(Some(Ok(chunk))) => {
60 let _ = self.read_future.take();
61 self.current_pos += chunk.len() as u64;
62 if let Some(end) = &self.end {
63 if self.current_pos >= *end {
64 self.finished = true;
66 }
67 }
68 Poll::Ready(Some(Ok(chunk)))
69 }
70 Poll::Ready(option) => {
71 let _ = self.read_future.take();
72 if option.is_none() {
73 self.finished = true;
74 }
75 Poll::Ready(option)
76 }
77 Poll::Pending => Poll::Pending,
78 }
79 }
80}
81
82#[inline]
83async fn read_chunk(file: Arc<SendWrapper<File>>, pos: u64, end: Option<u64>) -> Option<Result<Bytes, std::io::Error>> {
84 let buffer_sz = end.map_or(MAX_BUFFER_SIZE, |n| ((n - pos) as usize).min(MAX_BUFFER_SIZE));
85 if buffer_sz == 0 {
86 return None;
87 }
88 let buffer_uninit = Box::new_uninit_slice(buffer_sz);
89 let buffer: Box<[u8]> = unsafe { buffer_uninit.assume_init() };
91 let result = file.read_at(buffer, pos).await;
92 match result {
93 (Ok(n), buffer) => {
94 if n == 0 {
95 None
96 } else {
97 let mut bytes = Bytes::from_owner(buffer);
98 bytes.truncate(n);
99 Some(Ok(bytes))
100 }
101 }
102 (Err(e), _) => Some(Err(e)),
103 }
104}