aws_smithy_types/byte_stream/bytestream_util.rs
1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::body::SdkBody;
7use crate::byte_stream::{error::Error, error::ErrorKind, ByteStream};
8use std::cmp::min;
9use std::future::Future;
10use std::path::PathBuf;
11use std::pin::Pin;
12use tokio::fs::File;
13use tokio::io::{self, AsyncReadExt, AsyncSeekExt};
14use tokio_util::io::ReaderStream;
15
16// TODO(https://github.com/smithy-lang/smithy-rs/issues/1925)
17// Feature gating this now would break the
18// `cargo check --no-default-features --features rt-tokio` test.
19// #[cfg(feature = "http-body-0-4-x")]
20mod http_body_0_4_x;
21
22#[cfg(feature = "http-body-1-x")]
23mod http_body_1_x;
24
// Tokio's `ReaderStream` uses a 4KB buffer by default; mirror that here.
const DEFAULT_BUFFER_SIZE: usize = 4 * 1024;
// Unless told otherwise, files are read from their very first byte.
const DEFAULT_OFFSET: u64 = 0;
29
30/// An HTTP Body designed to wrap files
31///
32/// PathBody is a three-phase HTTP body designed to wrap files with three specific features:
33/// 1. The underlying file is wrapped with StreamReader to implement HTTP body
34/// 2. It can be constructed directly from a path so it's easy to use during retries
35/// 3. Provide size hint
36struct PathBody {
37 state: State,
38 // The number of bytes to read
39 length: u64,
40 buffer_size: usize,
41 // The byte-offset to start reading from
42 offset: Option<u64>,
43}
44
45impl PathBody {
46 fn from_path(path_buf: PathBuf, length: u64, buffer_size: usize, offset: Option<u64>) -> Self {
47 PathBody {
48 state: State::Unloaded(path_buf),
49 length,
50 buffer_size,
51 offset,
52 }
53 }
54
55 fn from_file(file: File, length: u64, buffer_size: usize) -> Self {
56 PathBody {
57 state: State::Loaded {
58 stream: ReaderStream::with_capacity(file.take(length), buffer_size),
59 bytes_left: length,
60 },
61 length,
62 buffer_size,
63 // The file used to create this `PathBody` should have already had an offset applied
64 offset: None,
65 }
66 }
67}
68
69/// Builder for creating [`ByteStreams`](ByteStream) from a file/path, with full control over advanced options.
70///
71/// ```no_run
72/// # #[cfg(feature = "rt-tokio")]
73/// # {
74/// use aws_smithy_types::byte_stream::{ByteStream, Length};
75/// use std::path::Path;
76/// struct GetObjectInput {
77/// body: ByteStream
78/// }
79///
80/// async fn bytestream_from_file() -> GetObjectInput {
81/// let bytestream = ByteStream::read_from()
82/// .path("docs/some-large-file.csv")
83/// // Specify the size of the buffer used to read the file (in bytes, default is 4096)
84/// .buffer_size(32_784)
85/// // Specify the length of the file used (skips an additional call to retrieve the size)
86/// .length(Length::UpTo(123_456))
87/// .build()
88/// .await
89/// .expect("valid path");
90/// GetObjectInput { body: bytestream }
91/// }
92/// # }
93/// ```
94#[allow(missing_debug_implementations)]
95pub struct FsBuilder {
96 file: Option<File>,
97 path: Option<PathBuf>,
98 length: Option<Length>,
99 buffer_size: usize,
100 offset: Option<u64>,
101}
102
103impl Default for FsBuilder {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
/// The length (in bytes) to read. Determines whether or not a short read counts as an error.
///
/// This is a small plain-data type, so it implements the common comparison, copy and debug
/// traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Length {
    /// Read this number of bytes exactly. Returns an error if the file is smaller than expected.
    Exact(u64),
    /// Read up to this number of bytes. May read less than the specified amount if the file
    /// is smaller than expected.
    UpTo(u64),
}
118
119impl FsBuilder {
120 /// Create a new [`FsBuilder`] (using a default read buffer of 4096 bytes).
121 ///
122 /// You must then call either [`file`](FsBuilder::file) or [`path`](FsBuilder::path) to specify what to read from.
123 pub fn new() -> Self {
124 Self {
125 buffer_size: DEFAULT_BUFFER_SIZE,
126 file: None,
127 length: None,
128 offset: None,
129 path: None,
130 }
131 }
132
133 /// Sets the path to read from.
134 ///
135 /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will be retryable.
136 /// The returned ByteStream will provide a size hint when used as an HTTP body.
137 /// If the request fails, the read will begin again by reloading the file handle.
138 pub fn path(mut self, path: impl AsRef<std::path::Path>) -> Self {
139 self.path = Some(path.as_ref().to_path_buf());
140 self
141 }
142
143 /// Sets the file to read from.
144 ///
145 /// NOTE: The resulting ByteStream (after calling [build](FsBuilder::build)) will not be a retryable ByteStream.
146 /// For a ByteStream that can be retried in the case of upstream failures, use [`FsBuilder::path`](FsBuilder::path).
147 pub fn file(mut self, file: File) -> Self {
148 self.file = Some(file);
149 self
150 }
151
152 /// Specify the length to read (in bytes).
153 ///
154 /// By pre-specifying the length, this API skips an additional call to retrieve the size from file-system metadata.
155 ///
156 /// When used in conjunction with [`offset`](FsBuilder::offset), allows for reading a single "chunk" of a file.
157 pub fn length(mut self, length: Length) -> Self {
158 self.length = Some(length);
159 self
160 }
161
162 /// Specify the size of the buffer used to read the file (in bytes).
163 ///
164 /// Increasing the read buffer capacity to higher values than the default (4096 bytes) can result in a large reduction
165 /// in CPU usage, at the cost of memory increase.
166 pub fn buffer_size(mut self, buffer_size: usize) -> Self {
167 self.buffer_size = buffer_size;
168 self
169 }
170
171 /// Specify the offset to start reading from (in bytes)
172 ///
173 /// When used in conjunction with [`length`](FsBuilder::length), allows for reading a single "chunk" of a file.
174 pub fn offset(mut self, offset: u64) -> Self {
175 self.offset = Some(offset);
176 self
177 }
178
179 /// Returns a [`ByteStream`] from this builder.
180 pub async fn build(self) -> Result<ByteStream, Error> {
181 if self.path.is_some() && self.file.is_some() {
182 panic!("The 'file' and 'path' options on an FsBuilder are mutually exclusive but both were set. Please set only one")
183 };
184
185 let buffer_size = self.buffer_size;
186 let offset = self.offset.unwrap_or(DEFAULT_OFFSET);
187 // Checking the file length like this does have a cost, but the benefit is that we can
188 // notify users when file/chunk is smaller than expected.
189 let file_length = self.get_file_size().await?;
190 if offset > file_length {
191 return Err(ErrorKind::OffsetLargerThanFileSize.into());
192 }
193
194 let remaining_file_length = file_length - offset;
195 let length = match self.length {
196 Some(Length::Exact(length)) => {
197 if length > remaining_file_length {
198 return Err(ErrorKind::LengthLargerThanFileSizeMinusReadOffset.into());
199 }
200 length
201 }
202 Some(Length::UpTo(length)) => min(length, remaining_file_length),
203 None => remaining_file_length,
204 };
205
206 if let Some(path) = self.path {
207 let body_loader = move || {
208 // If an offset was provided, seeking will be handled in `PathBody::poll_data` each
209 // time the file is loaded.
210 SdkBody::from_body_0_4_internal(PathBody::from_path(
211 path.clone(),
212 length,
213 buffer_size,
214 self.offset,
215 ))
216 };
217
218 Ok(ByteStream::new(SdkBody::retryable(body_loader)))
219 } else if let Some(mut file) = self.file {
220 // When starting from a `File`, we need to do our own seeking
221 if offset != 0 {
222 let _s = file.seek(io::SeekFrom::Start(offset)).await?;
223 }
224
225 let body =
226 SdkBody::from_body_0_4_internal(PathBody::from_file(file, length, buffer_size));
227
228 Ok(ByteStream::new(body))
229 } else {
230 panic!("FsBuilder constructed without a file or a path")
231 }
232 }
233
234 async fn get_file_size(&self) -> Result<u64, Error> {
235 Ok(match self.path.as_ref() {
236 Some(path) => tokio::fs::metadata(path).await,
237 // If it's not path-based then it's file-based
238 None => self.file.as_ref().unwrap().metadata().await,
239 }
240 .map(|metadata| metadata.len())?)
241 }
242}
243
244enum State {
245 Unloaded(PathBuf),
246 Loading(Pin<Box<dyn Future<Output = io::Result<File>> + Send + Sync + 'static>>),
247 Loaded {
248 stream: ReaderStream<io::Take<File>>,
249 bytes_left: u64,
250 },
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// `Length::UpTo` must clamp the size hint to the file size and otherwise honor the request.
    #[tokio::test]
    async fn length_up_to_should_work() {
        const FILE_LEN: usize = 1000;

        // Each case is (requested upper bound, expected size hint).
        let scenarios = [
            // up to less than `FILE_LEN`
            (FILE_LEN / 2, FILE_LEN / 2),
            // up to equal to `FILE_LEN`
            (FILE_LEN, FILE_LEN),
            // up to greater than `FILE_LEN`
            (FILE_LEN * 2, FILE_LEN),
        ];

        for (requested, expected) in scenarios {
            // A fresh file per scenario, filled with `FILE_LEN` zero bytes.
            let mut temp = NamedTempFile::new().unwrap();
            let contents = vec![0; FILE_LEN];
            temp.write_all(contents.as_slice()).unwrap();

            let builder = FsBuilder::new()
                .path(temp.path())
                .length(Length::UpTo(requested as u64));
            let byte_stream = builder.build().await.unwrap();

            let (lower, upper) = byte_stream.size_hint();
            assert_eq!(lower, upper.unwrap());
            assert_eq!(expected as u64, lower);
        }
    }
}