aws_smithy_types/byte_stream/
bytestream_util.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::body::SdkBody;
7use crate::byte_stream::{error::Error, error::ErrorKind, ByteStream};
8use std::cmp::min;
9use std::future::Future;
10use std::path::PathBuf;
11use std::pin::Pin;
12use tokio::fs::File;
13use tokio::io::{self, AsyncReadExt, AsyncSeekExt};
14use tokio_util::io::ReaderStream;
15
16// TODO(https://github.com/smithy-lang/smithy-rs/issues/1925)
17//     Feature gating this now would break the
18//     `cargo check --no-default-features --features rt-tokio` test.
19// #[cfg(feature = "http-body-0-4-x")]
20mod http_body_0_4_x;
21
22#[cfg(feature = "http-body-1-x")]
23mod http_body_1_x;
24
// Read-buffer capacity used when the caller does not pick one: 4 KiB, which corresponds to the
// default buffer size used by Tokio's `ReaderStream`.
const DEFAULT_BUFFER_SIZE: usize = 4 * 1024;
// Unless an offset is requested, files are read from their very first byte.
const DEFAULT_OFFSET: u64 = 0;
29
30/// An HTTP Body designed to wrap files
31///
32/// PathBody is a three-phase HTTP body designed to wrap files with three specific features:
33/// 1. The underlying file is wrapped with StreamReader to implement HTTP body
34/// 2. It can be constructed directly from a path so it's easy to use during retries
35/// 3. Provide size hint
36struct PathBody {
37    state: State,
38    // The number of bytes to read
39    length: u64,
40    buffer_size: usize,
41    // The byte-offset to start reading from
42    offset: Option<u64>,
43}
44
45impl PathBody {
46    fn from_path(path_buf: PathBuf, length: u64, buffer_size: usize, offset: Option<u64>) -> Self {
47        PathBody {
48            state: State::Unloaded(path_buf),
49            length,
50            buffer_size,
51            offset,
52        }
53    }
54
55    fn from_file(file: File, length: u64, buffer_size: usize) -> Self {
56        PathBody {
57            state: State::Loaded {
58                stream: ReaderStream::with_capacity(file.take(length), buffer_size),
59                bytes_left: length,
60            },
61            length,
62            buffer_size,
63            // The file used to create this `PathBody` should have already had an offset applied
64            offset: None,
65        }
66    }
67}
68
69/// Builder for creating [`ByteStreams`](ByteStream) from a file/path, with full control over advanced options.
70///
71/// ```no_run
72/// # #[cfg(feature = "rt-tokio")]
73/// # {
74/// use aws_smithy_types::byte_stream::{ByteStream, Length};
75/// use std::path::Path;
76/// struct GetObjectInput {
77///     body: ByteStream
78/// }
79///
80/// async fn bytestream_from_file() -> GetObjectInput {
81///     let bytestream = ByteStream::read_from()
82///         .path("docs/some-large-file.csv")
83///         // Specify the size of the buffer used to read the file (in bytes, default is 4096)
84///         .buffer_size(32_784)
85///         // Specify the length of the file used (skips an additional call to retrieve the size)
86///         .length(Length::UpTo(123_456))
87///         .build()
88///         .await
89///         .expect("valid path");
90///     GetObjectInput { body: bytestream }
91/// }
92/// # }
93/// ```
94#[allow(missing_debug_implementations)]
95pub struct FsBuilder {
96    file: Option<File>,
97    path: Option<PathBuf>,
98    length: Option<Length>,
99    buffer_size: usize,
100    offset: Option<u64>,
101}
102
103impl Default for FsBuilder {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
/// The length (in bytes) to read. Determines whether or not a short read counts as an error.
///
/// The enum is plain data (a tagged `u64`), so it implements the common value traits and can be
/// logged, copied and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Length {
    /// Read this number of bytes exactly. Returns an error if the file is smaller than expected.
    Exact(u64),
    /// Read up to this number of bytes. May read less than the specified amount if the file
    /// is smaller than expected.
    UpTo(u64),
}
118
119impl FsBuilder {
120    /// Create a new [`FsBuilder`] (using a default read buffer of 4096 bytes).
121    ///
122    /// You must then call either [`file`](FsBuilder::file) or [`path`](FsBuilder::path) to specify what to read from.
123    pub fn new() -> Self {
124        Self {
125            buffer_size: DEFAULT_BUFFER_SIZE,
126            file: None,
127            length: None,
128            offset: None,
129            path: None,
130        }
131    }
132
133    /// Sets the path to read from.
134    ///
135    /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will be retryable.
136    /// The returned ByteStream will provide a size hint when used as an HTTP body.
137    /// If the request fails, the read will begin again by reloading the file handle.
138    pub fn path(mut self, path: impl AsRef<std::path::Path>) -> Self {
139        self.path = Some(path.as_ref().to_path_buf());
140        self
141    }
142
143    /// Sets the file to read from.
144    ///
145    /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will not be a retryable ByteStream.
146    /// For a ByteStream that can be retried in the case of upstream failures, use [`FsBuilder::path`](FsBuilder::path).
147    pub fn file(mut self, file: File) -> Self {
148        self.file = Some(file);
149        self
150    }
151
152    /// Specify the length to read (in bytes).
153    ///
154    /// By pre-specifying the length, this API skips an additional call to retrieve the size from file-system metadata.
155    ///
156    /// When used in conjunction with [`offset`](FsBuilder::offset), allows for reading a single "chunk" of a file.
157    pub fn length(mut self, length: Length) -> Self {
158        self.length = Some(length);
159        self
160    }
161
162    /// Specify the size of the buffer used to read the file (in bytes).
163    ///
164    /// Increasing the read buffer capacity to higher values than the default (4096 bytes) can result in a large reduction
165    /// in CPU usage, at the cost of memory increase.
166    pub fn buffer_size(mut self, buffer_size: usize) -> Self {
167        self.buffer_size = buffer_size;
168        self
169    }
170
171    /// Specify the offset to start reading from (in bytes)
172    ///
173    /// When used in conjunction with [`length`](FsBuilder::length), allows for reading a single "chunk" of a file.
174    pub fn offset(mut self, offset: u64) -> Self {
175        self.offset = Some(offset);
176        self
177    }
178
179    /// Returns a [`ByteStream`] from this builder.
180    pub async fn build(self) -> Result<ByteStream, Error> {
181        if self.path.is_some() && self.file.is_some() {
182            panic!("The 'file' and 'path' options on an FsBuilder are mutually exclusive but both were set. Please set only one")
183        };
184
185        let buffer_size = self.buffer_size;
186        let offset = self.offset.unwrap_or(DEFAULT_OFFSET);
187        // Checking the file length like this does have a cost, but the benefit is that we can
188        // notify users when file/chunk is smaller than expected.
189        let file_length = self.get_file_size().await?;
190        if offset > file_length {
191            return Err(ErrorKind::OffsetLargerThanFileSize.into());
192        }
193
194        let remaining_file_length = file_length - offset;
195        let length = match self.length {
196            Some(Length::Exact(length)) => {
197                if length > remaining_file_length {
198                    return Err(ErrorKind::LengthLargerThanFileSizeMinusReadOffset.into());
199                }
200                length
201            }
202            Some(Length::UpTo(length)) => min(length, remaining_file_length),
203            None => remaining_file_length,
204        };
205
206        if let Some(path) = self.path {
207            let body_loader = move || {
208                // If an offset was provided, seeking will be handled in `PathBody::poll_data` each
209                // time the file is loaded.
210                SdkBody::from_body_0_4_internal(PathBody::from_path(
211                    path.clone(),
212                    length,
213                    buffer_size,
214                    self.offset,
215                ))
216            };
217
218            Ok(ByteStream::new(SdkBody::retryable(body_loader)))
219        } else if let Some(mut file) = self.file {
220            // When starting from a `File`, we need to do our own seeking
221            if offset != 0 {
222                let _s = file.seek(io::SeekFrom::Start(offset)).await?;
223            }
224
225            let body =
226                SdkBody::from_body_0_4_internal(PathBody::from_file(file, length, buffer_size));
227
228            Ok(ByteStream::new(body))
229        } else {
230            panic!("FsBuilder constructed without a file or a path")
231        }
232    }
233
234    async fn get_file_size(&self) -> Result<u64, Error> {
235        Ok(match self.path.as_ref() {
236            Some(path) => tokio::fs::metadata(path).await,
237            // If it's not path-based then it's file-based
238            None => self.file.as_ref().unwrap().metadata().await,
239        }
240        .map(|metadata| metadata.len())?)
241    }
242}
243
244enum State {
245    Unloaded(PathBuf),
246    Loading(Pin<Box<dyn Future<Output = io::Result<File>> + Send + Sync + 'static>>),
247    Loaded {
248        stream: ReaderStream<io::Take<File>>,
249        bytes_left: u64,
250    },
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Size (in bytes) of the zero-filled fixture file used by every scenario.
    const FILE_LEN: usize = 1000;

    /// Writes `FILE_LEN` zero bytes to a fresh temp file, builds a path-backed stream limited
    /// with `Length::UpTo(limit)`, and returns the stream's size hint.
    ///
    /// The temp file stays alive until the size hint has been read.
    async fn size_hint_for_up_to(limit: usize) -> (u64, Option<u64>) {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(vec![0; FILE_LEN].as_slice()).unwrap();

        let byte_stream = FsBuilder::new()
            .path(file.path())
            .length(Length::UpTo(limit as u64))
            .build()
            .await
            .unwrap();

        byte_stream.size_hint()
    }

    #[tokio::test]
    async fn length_up_to_should_work() {
        // (requested limit, expected size hint)
        let scenarios = [
            // up to less than `FILE_LEN`
            (FILE_LEN / 2, FILE_LEN / 2),
            // up to equal to `FILE_LEN`
            (FILE_LEN, FILE_LEN),
            // up to greater than `FILE_LEN`
            (FILE_LEN * 2, FILE_LEN),
        ];

        for (limit, expected) in scenarios {
            let (lower, upper) = size_hint_for_up_to(limit).await;
            assert_eq!(lower, upper.unwrap());
            assert_eq!(expected as u64, lower);
        }
    }
}