tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
90pub struct File {
91 std: Arc<StdFile>,
92 inner: Mutex<Inner>,
93 max_buf_size: usize,
94}
95
96struct Inner {
97 state: State,
98
99 /// Errors from writes/flushes are returned in write/flush calls. If a write
100 /// error is observed while performing a read, it is saved until the next
101 /// write / flush call.
102 last_write_err: Option<io::ErrorKind>,
103
104 pos: u64,
105}
106
107#[derive(Debug)]
108enum State {
109 Idle(Option<Buf>),
110 Busy(JoinHandle<(Operation, Buf)>),
111}
112
/// Outcome of a finished blocking task, tagged by the kind of work it did.
#[derive(Debug)]
enum Operation {
    /// A read into the buffer, with the result of that read.
    Read(io::Result<usize>),
    /// A write of the buffer to the file.
    Write(io::Result<()>),
    /// A seek, with the resulting position.
    Seek(io::Result<u64>),
}
119
120impl File {
121 /// Attempts to open a file in read-only mode.
122 ///
123 /// See [`OpenOptions`] for more details.
124 ///
125 /// # Errors
126 ///
127 /// This function will return an error if called from outside of the Tokio
128 /// runtime or if path does not already exist. Other errors may also be
129 /// returned according to `OpenOptions::open`.
130 ///
131 /// # Examples
132 ///
133 /// ```no_run
134 /// use tokio::fs::File;
135 /// use tokio::io::AsyncReadExt;
136 ///
137 /// # async fn dox() -> std::io::Result<()> {
138 /// let mut file = File::open("foo.txt").await?;
139 ///
140 /// let mut contents = vec![];
141 /// file.read_to_end(&mut contents).await?;
142 ///
143 /// println!("len = {}", contents.len());
144 /// # Ok(())
145 /// # }
146 /// ```
147 ///
148 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149 ///
150 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153 Self::options().read(true).open(path).await
154 }
155
156 /// Opens a file in write-only mode.
157 ///
158 /// This function will create a file if it does not exist, and will truncate
159 /// it if it does.
160 ///
161 /// See [`OpenOptions`] for more details.
162 ///
163 /// # Errors
164 ///
165 /// Results in an error if called from outside of the Tokio runtime or if
166 /// the underlying [`create`] call results in an error.
167 ///
168 /// [`create`]: std::fs::File::create
169 ///
170 /// # Examples
171 ///
172 /// ```no_run
173 /// use tokio::fs::File;
174 /// use tokio::io::AsyncWriteExt;
175 ///
176 /// # async fn dox() -> std::io::Result<()> {
177 /// let mut file = File::create("foo.txt").await?;
178 /// file.write_all(b"hello, world!").await?;
179 /// # Ok(())
180 /// # }
181 /// ```
182 ///
183 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
184 ///
185 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
186 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
187 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
188 Self::options()
189 .write(true)
190 .create(true)
191 .truncate(true)
192 .open(path)
193 .await
194 }
195
196 /// Opens a file in read-write mode.
197 ///
198 /// This function will create a file if it does not exist, or return an error
199 /// if it does. This way, if the call succeeds, the file returned is guaranteed
200 /// to be new.
201 ///
202 /// This option is useful because it is atomic. Otherwise between checking
203 /// whether a file exists and creating a new one, the file may have been
204 /// created by another process (a TOCTOU race condition / attack).
205 ///
206 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
207 ///
208 /// See [`OpenOptions`] for more details.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use tokio::fs::File;
214 /// use tokio::io::AsyncWriteExt;
215 ///
216 /// # async fn dox() -> std::io::Result<()> {
217 /// let mut file = File::create_new("foo.txt").await?;
218 /// file.write_all(b"hello, world!").await?;
219 /// # Ok(())
220 /// # }
221 /// ```
222 ///
223 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
224 ///
225 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
226 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
227 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
228 Self::options()
229 .read(true)
230 .write(true)
231 .create_new(true)
232 .open(path)
233 .await
234 }
235
236 /// Returns a new [`OpenOptions`] object.
237 ///
238 /// This function returns a new `OpenOptions` object that you can use to
239 /// open or create a file with specific options if `open()` or `create()`
240 /// are not appropriate.
241 ///
242 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
243 /// readable code. Instead of
244 /// `OpenOptions::new().append(true).open("example.log")`,
245 /// you can write `File::options().append(true).open("example.log")`. This
246 /// also avoids the need to import `OpenOptions`.
247 ///
248 /// See the [`OpenOptions::new`] function for more details.
249 ///
250 /// # Examples
251 ///
252 /// ```no_run
253 /// use tokio::fs::File;
254 /// use tokio::io::AsyncWriteExt;
255 ///
256 /// # async fn dox() -> std::io::Result<()> {
257 /// let mut f = File::options().append(true).open("example.log").await?;
258 /// f.write_all(b"new line\n").await?;
259 /// # Ok(())
260 /// # }
261 /// ```
262 #[must_use]
263 pub fn options() -> OpenOptions {
264 OpenOptions::new()
265 }
266
267 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
268 ///
269 /// # Examples
270 ///
271 /// ```no_run
272 /// // This line could block. It is not recommended to do this on the Tokio
273 /// // runtime.
274 /// let std_file = std::fs::File::open("foo.txt").unwrap();
275 /// let file = tokio::fs::File::from_std(std_file);
276 /// ```
277 pub fn from_std(std: StdFile) -> File {
278 File {
279 std: Arc::new(std),
280 inner: Mutex::new(Inner {
281 state: State::Idle(Some(Buf::with_capacity(0))),
282 last_write_err: None,
283 pos: 0,
284 }),
285 max_buf_size: DEFAULT_MAX_BUF_SIZE,
286 }
287 }
288
289 /// Attempts to sync all OS-internal metadata to disk.
290 ///
291 /// This function will attempt to ensure that all in-core data reaches the
292 /// filesystem before returning.
293 ///
294 /// # Examples
295 ///
296 /// ```no_run
297 /// use tokio::fs::File;
298 /// use tokio::io::AsyncWriteExt;
299 ///
300 /// # async fn dox() -> std::io::Result<()> {
301 /// let mut file = File::create("foo.txt").await?;
302 /// file.write_all(b"hello, world!").await?;
303 /// file.sync_all().await?;
304 /// # Ok(())
305 /// # }
306 /// ```
307 ///
308 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
309 ///
310 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
311 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
312 pub async fn sync_all(&self) -> io::Result<()> {
313 let mut inner = self.inner.lock().await;
314 inner.complete_inflight().await;
315
316 let std = self.std.clone();
317 asyncify(move || std.sync_all()).await
318 }
319
320 /// This function is similar to `sync_all`, except that it may not
321 /// synchronize file metadata to the filesystem.
322 ///
323 /// This is intended for use cases that must synchronize content, but don't
324 /// need the metadata on disk. The goal of this method is to reduce disk
325 /// operations.
326 ///
327 /// Note that some platforms may simply implement this in terms of `sync_all`.
328 ///
329 /// # Examples
330 ///
331 /// ```no_run
332 /// use tokio::fs::File;
333 /// use tokio::io::AsyncWriteExt;
334 ///
335 /// # async fn dox() -> std::io::Result<()> {
336 /// let mut file = File::create("foo.txt").await?;
337 /// file.write_all(b"hello, world!").await?;
338 /// file.sync_data().await?;
339 /// # Ok(())
340 /// # }
341 /// ```
342 ///
343 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
344 ///
345 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
346 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
347 pub async fn sync_data(&self) -> io::Result<()> {
348 let mut inner = self.inner.lock().await;
349 inner.complete_inflight().await;
350
351 let std = self.std.clone();
352 asyncify(move || std.sync_data()).await
353 }
354
/// Truncates or extends the underlying file, updating the size of this file to become `size`.
///
/// If `size` is less than the current file's size, then the file will be
/// shrunk. If it is greater than the current file's size, then the file
/// will be extended to `size` and have all of the intermediate data filled in
/// with 0s.
361 ///
362 /// # Errors
363 ///
364 /// This function will return an error if the file is not opened for
365 /// writing.
366 ///
367 /// # Examples
368 ///
369 /// ```no_run
370 /// use tokio::fs::File;
371 /// use tokio::io::AsyncWriteExt;
372 ///
373 /// # async fn dox() -> std::io::Result<()> {
374 /// let mut file = File::create("foo.txt").await?;
375 /// file.write_all(b"hello, world!").await?;
376 /// file.set_len(10).await?;
377 /// # Ok(())
378 /// # }
379 /// ```
380 ///
381 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
382 ///
383 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
384 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
385 pub async fn set_len(&self, size: u64) -> io::Result<()> {
386 let mut inner = self.inner.lock().await;
387 inner.complete_inflight().await;
388
389 let mut buf = match inner.state {
390 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
391 _ => unreachable!(),
392 };
393
394 let seek = if !buf.is_empty() {
395 Some(SeekFrom::Current(buf.discard_read()))
396 } else {
397 None
398 };
399
400 let std = self.std.clone();
401
402 inner.state = State::Busy(spawn_blocking(move || {
403 let res = if let Some(seek) = seek {
404 (&*std).seek(seek).and_then(|_| std.set_len(size))
405 } else {
406 std.set_len(size)
407 }
408 .map(|()| 0); // the value is discarded later
409
410 // Return the result as a seek
411 (Operation::Seek(res), buf)
412 }));
413
414 let (op, buf) = match inner.state {
415 State::Idle(_) => unreachable!(),
416 State::Busy(ref mut rx) => rx.await?,
417 };
418
419 inner.state = State::Idle(Some(buf));
420
421 match op {
422 Operation::Seek(res) => res.map(|pos| {
423 inner.pos = pos;
424 }),
425 _ => unreachable!(),
426 }
427 }
428
429 /// Queries metadata about the underlying file.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use tokio::fs::File;
435 ///
436 /// # async fn dox() -> std::io::Result<()> {
437 /// let file = File::open("foo.txt").await?;
438 /// let metadata = file.metadata().await?;
439 ///
440 /// println!("{:?}", metadata);
441 /// # Ok(())
442 /// # }
443 /// ```
444 pub async fn metadata(&self) -> io::Result<Metadata> {
445 let std = self.std.clone();
446 asyncify(move || std.metadata()).await
447 }
448
449 /// Creates a new `File` instance that shares the same underlying file handle
450 /// as the existing `File` instance. Reads, writes, and seeks will affect both
451 /// File instances simultaneously.
452 ///
453 /// # Examples
454 ///
455 /// ```no_run
456 /// use tokio::fs::File;
457 ///
458 /// # async fn dox() -> std::io::Result<()> {
459 /// let file = File::open("foo.txt").await?;
460 /// let file_clone = file.try_clone().await?;
461 /// # Ok(())
462 /// # }
463 /// ```
464 pub async fn try_clone(&self) -> io::Result<File> {
465 self.inner.lock().await.complete_inflight().await;
466 let std = self.std.clone();
467 let std_file = asyncify(move || std.try_clone()).await?;
468 let mut file = File::from_std(std_file);
469 file.set_max_buf_size(self.max_buf_size);
470 Ok(file)
471 }
472
473 /// Destructures `File` into a [`std::fs::File`]. This function is
474 /// async to allow any in-flight operations to complete.
475 ///
476 /// Use `File::try_into_std` to attempt conversion immediately.
477 ///
478 /// # Examples
479 ///
480 /// ```no_run
481 /// use tokio::fs::File;
482 ///
483 /// # async fn dox() -> std::io::Result<()> {
484 /// let tokio_file = File::open("foo.txt").await?;
485 /// let std_file = tokio_file.into_std().await;
486 /// # Ok(())
487 /// # }
488 /// ```
489 pub async fn into_std(mut self) -> StdFile {
490 self.inner.get_mut().complete_inflight().await;
491 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
492 }
493
494 /// Tries to immediately destructure `File` into a [`std::fs::File`].
495 ///
496 /// # Errors
497 ///
498 /// This function will return an error containing the file if some
499 /// operation is in-flight.
500 ///
501 /// # Examples
502 ///
503 /// ```no_run
504 /// use tokio::fs::File;
505 ///
506 /// # async fn dox() -> std::io::Result<()> {
507 /// let tokio_file = File::open("foo.txt").await?;
508 /// let std_file = tokio_file.try_into_std().unwrap();
509 /// # Ok(())
510 /// # }
511 /// ```
512 #[allow(clippy::result_large_err)]
513 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
514 match Arc::try_unwrap(self.std) {
515 Ok(file) => Ok(file),
516 Err(std_file_arc) => {
517 self.std = std_file_arc;
518 Err(self)
519 }
520 }
521 }
522
523 /// Changes the permissions on the underlying file.
524 ///
525 /// # Platform-specific behavior
526 ///
/// This function currently corresponds to the `fchmod` function on Unix and
/// the `SetFileInformationByHandle` function on Windows. Note that this
/// [may change in the future][changes].
530 ///
531 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
532 ///
533 /// # Errors
534 ///
/// This function will return an error if the user lacks permission to change
/// attributes on the underlying file. It may also return an error in other
/// OS-specific unspecified cases.
538 ///
539 /// # Examples
540 ///
541 /// ```no_run
542 /// use tokio::fs::File;
543 ///
544 /// # async fn dox() -> std::io::Result<()> {
545 /// let file = File::open("foo.txt").await?;
546 /// let mut perms = file.metadata().await?.permissions();
547 /// perms.set_readonly(true);
548 /// file.set_permissions(perms).await?;
549 /// # Ok(())
550 /// # }
551 /// ```
552 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
553 let std = self.std.clone();
554 asyncify(move || std.set_permissions(perm)).await
555 }
556
557 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
558 ///
559 /// Although Tokio uses a sensible default value for this buffer size, this function would be
560 /// useful for changing that default depending on the situation.
561 ///
562 /// # Examples
563 ///
564 /// ```no_run
565 /// use tokio::fs::File;
566 /// use tokio::io::AsyncWriteExt;
567 ///
568 /// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
570 ///
571 /// // Set maximum buffer size to 8 MiB
572 /// file.set_max_buf_size(8 * 1024 * 1024);
573 ///
574 /// let mut buf = vec![1; 1024 * 1024 * 1024];
575 ///
576 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
577 /// file.write_all(&mut buf).await?;
578 /// # Ok(())
579 /// # }
580 /// ```
581 pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
582 self.max_buf_size = max_buf_size;
583 }
584
585 /// Get the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
586 pub fn max_buf_size(&self) -> usize {
587 self.max_buf_size
588 }
589}
590
591impl AsyncRead for File {
592 fn poll_read(
593 self: Pin<&mut Self>,
594 cx: &mut Context<'_>,
595 dst: &mut ReadBuf<'_>,
596 ) -> Poll<io::Result<()>> {
597 ready!(crate::trace::trace_leaf(cx));
598
599 let me = self.get_mut();
600 let inner = me.inner.get_mut();
601
602 loop {
603 match inner.state {
604 State::Idle(ref mut buf_cell) => {
605 let mut buf = buf_cell.take().unwrap();
606
607 if !buf.is_empty() || dst.remaining() == 0 {
608 buf.copy_to(dst);
609 *buf_cell = Some(buf);
610 return Poll::Ready(Ok(()));
611 }
612
613 let std = me.std.clone();
614
615 let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
616 inner.state = State::Busy(spawn_blocking(move || {
617 // SAFETY: the `Read` implementation of `std` does not
618 // read from the buffer it is borrowing and correctly
619 // reports the length of the data written into the buffer.
620 let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
621 (Operation::Read(res), buf)
622 }));
623 }
624 State::Busy(ref mut rx) => {
625 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
626
627 match op {
628 Operation::Read(Ok(_)) => {
629 buf.copy_to(dst);
630 inner.state = State::Idle(Some(buf));
631 return Poll::Ready(Ok(()));
632 }
633 Operation::Read(Err(e)) => {
634 assert!(buf.is_empty());
635
636 inner.state = State::Idle(Some(buf));
637 return Poll::Ready(Err(e));
638 }
639 Operation::Write(Ok(())) => {
640 assert!(buf.is_empty());
641 inner.state = State::Idle(Some(buf));
642 continue;
643 }
644 Operation::Write(Err(e)) => {
645 assert!(inner.last_write_err.is_none());
646 inner.last_write_err = Some(e.kind());
647 inner.state = State::Idle(Some(buf));
648 }
649 Operation::Seek(result) => {
650 assert!(buf.is_empty());
651 inner.state = State::Idle(Some(buf));
652 if let Ok(pos) = result {
653 inner.pos = pos;
654 }
655 continue;
656 }
657 }
658 }
659 }
660 }
661 }
662}
663
664impl AsyncSeek for File {
665 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
666 let me = self.get_mut();
667 let inner = me.inner.get_mut();
668
669 match inner.state {
670 State::Busy(_) => Err(io::Error::new(
671 io::ErrorKind::Other,
672 "other file operation is pending, call poll_complete before start_seek",
673 )),
674 State::Idle(ref mut buf_cell) => {
675 let mut buf = buf_cell.take().unwrap();
676
677 // Factor in any unread data from the buf
678 if !buf.is_empty() {
679 let n = buf.discard_read();
680
681 if let SeekFrom::Current(ref mut offset) = pos {
682 *offset += n;
683 }
684 }
685
686 let std = me.std.clone();
687
688 inner.state = State::Busy(spawn_blocking(move || {
689 let res = (&*std).seek(pos);
690 (Operation::Seek(res), buf)
691 }));
692 Ok(())
693 }
694 }
695 }
696
697 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
698 ready!(crate::trace::trace_leaf(cx));
699 let inner = self.inner.get_mut();
700
701 loop {
702 match inner.state {
703 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
704 State::Busy(ref mut rx) => {
705 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
706 inner.state = State::Idle(Some(buf));
707
708 match op {
709 Operation::Read(_) => {}
710 Operation::Write(Err(e)) => {
711 assert!(inner.last_write_err.is_none());
712 inner.last_write_err = Some(e.kind());
713 }
714 Operation::Write(_) => {}
715 Operation::Seek(res) => {
716 if let Ok(pos) = res {
717 inner.pos = pos;
718 }
719 return Poll::Ready(res);
720 }
721 }
722 }
723 }
724 }
725 }
726}
727
728impl AsyncWrite for File {
729 fn poll_write(
730 self: Pin<&mut Self>,
731 cx: &mut Context<'_>,
732 src: &[u8],
733 ) -> Poll<io::Result<usize>> {
734 ready!(crate::trace::trace_leaf(cx));
735 let me = self.get_mut();
736 let inner = me.inner.get_mut();
737
738 if let Some(e) = inner.last_write_err.take() {
739 return Poll::Ready(Err(e.into()));
740 }
741
742 loop {
743 match inner.state {
744 State::Idle(ref mut buf_cell) => {
745 let mut buf = buf_cell.take().unwrap();
746
747 let seek = if !buf.is_empty() {
748 Some(SeekFrom::Current(buf.discard_read()))
749 } else {
750 None
751 };
752
753 let n = buf.copy_from(src, me.max_buf_size);
754 let std = me.std.clone();
755
756 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
757 let res = if let Some(seek) = seek {
758 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
759 } else {
760 buf.write_to(&mut &*std)
761 };
762
763 (Operation::Write(res), buf)
764 })
765 .ok_or_else(|| {
766 io::Error::new(io::ErrorKind::Other, "background task failed")
767 })?;
768
769 inner.state = State::Busy(blocking_task_join_handle);
770
771 return Poll::Ready(Ok(n));
772 }
773 State::Busy(ref mut rx) => {
774 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
775 inner.state = State::Idle(Some(buf));
776
777 match op {
778 Operation::Read(_) => {
779 // We don't care about the result here. The fact
780 // that the cursor has advanced will be reflected in
781 // the next iteration of the loop
782 continue;
783 }
784 Operation::Write(res) => {
785 // If the previous write was successful, continue.
786 // Otherwise, error.
787 res?;
788 continue;
789 }
790 Operation::Seek(_) => {
791 // Ignore the seek
792 continue;
793 }
794 }
795 }
796 }
797 }
798 }
799
800 fn poll_write_vectored(
801 self: Pin<&mut Self>,
802 cx: &mut Context<'_>,
803 bufs: &[io::IoSlice<'_>],
804 ) -> Poll<Result<usize, io::Error>> {
805 ready!(crate::trace::trace_leaf(cx));
806 let me = self.get_mut();
807 let inner = me.inner.get_mut();
808
809 if let Some(e) = inner.last_write_err.take() {
810 return Poll::Ready(Err(e.into()));
811 }
812
813 loop {
814 match inner.state {
815 State::Idle(ref mut buf_cell) => {
816 let mut buf = buf_cell.take().unwrap();
817
818 let seek = if !buf.is_empty() {
819 Some(SeekFrom::Current(buf.discard_read()))
820 } else {
821 None
822 };
823
824 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
825 let std = me.std.clone();
826
827 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
828 let res = if let Some(seek) = seek {
829 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
830 } else {
831 buf.write_to(&mut &*std)
832 };
833
834 (Operation::Write(res), buf)
835 })
836 .ok_or_else(|| {
837 io::Error::new(io::ErrorKind::Other, "background task failed")
838 })?;
839
840 inner.state = State::Busy(blocking_task_join_handle);
841
842 return Poll::Ready(Ok(n));
843 }
844 State::Busy(ref mut rx) => {
845 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
846 inner.state = State::Idle(Some(buf));
847
848 match op {
849 Operation::Read(_) => {
850 // We don't care about the result here. The fact
851 // that the cursor has advanced will be reflected in
852 // the next iteration of the loop
853 continue;
854 }
855 Operation::Write(res) => {
856 // If the previous write was successful, continue.
857 // Otherwise, error.
858 res?;
859 continue;
860 }
861 Operation::Seek(_) => {
862 // Ignore the seek
863 continue;
864 }
865 }
866 }
867 }
868 }
869 }
870
871 fn is_write_vectored(&self) -> bool {
872 true
873 }
874
875 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
876 ready!(crate::trace::trace_leaf(cx));
877 let inner = self.inner.get_mut();
878 inner.poll_flush(cx)
879 }
880
881 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
882 ready!(crate::trace::trace_leaf(cx));
883 self.poll_flush(cx)
884 }
885}
886
887impl From<StdFile> for File {
888 fn from(std: StdFile) -> Self {
889 Self::from_std(std)
890 }
891}
892
893impl fmt::Debug for File {
894 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
895 fmt.debug_struct("tokio::fs::File")
896 .field("std", &self.std)
897 .finish()
898 }
899}
900
901#[cfg(unix)]
902impl std::os::unix::io::AsRawFd for File {
903 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
904 self.std.as_raw_fd()
905 }
906}
907
908#[cfg(unix)]
909impl std::os::unix::io::AsFd for File {
910 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
911 unsafe {
912 std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
913 }
914 }
915}
916
917#[cfg(unix)]
918impl std::os::unix::io::FromRawFd for File {
919 unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
920 StdFile::from_raw_fd(fd).into()
921 }
922}
923
924cfg_windows! {
925 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
926
927 impl AsRawHandle for File {
928 fn as_raw_handle(&self) -> RawHandle {
929 self.std.as_raw_handle()
930 }
931 }
932
933 impl AsHandle for File {
934 fn as_handle(&self) -> BorrowedHandle<'_> {
935 unsafe {
936 BorrowedHandle::borrow_raw(
937 AsRawHandle::as_raw_handle(self),
938 )
939 }
940 }
941 }
942
943 impl FromRawHandle for File {
944 unsafe fn from_raw_handle(handle: RawHandle) -> Self {
945 StdFile::from_raw_handle(handle).into()
946 }
947 }
948}
949
950impl Inner {
951 async fn complete_inflight(&mut self) {
952 use std::future::poll_fn;
953
954 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
955 }
956
957 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
958 ready!(crate::trace::trace_leaf(cx));
959 match self.poll_flush(cx) {
960 Poll::Ready(Err(e)) => {
961 self.last_write_err = Some(e.kind());
962 Poll::Ready(())
963 }
964 Poll::Ready(Ok(())) => Poll::Ready(()),
965 Poll::Pending => Poll::Pending,
966 }
967 }
968
969 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
970 if let Some(e) = self.last_write_err.take() {
971 return Poll::Ready(Err(e.into()));
972 }
973
974 let (op, buf) = match self.state {
975 State::Idle(_) => return Poll::Ready(Ok(())),
976 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
977 };
978
979 // The buffer is not used here
980 self.state = State::Idle(Some(buf));
981
982 match op {
983 Operation::Read(_) => Poll::Ready(Ok(())),
984 Operation::Write(res) => Poll::Ready(res),
985 Operation::Seek(_) => Poll::Ready(Ok(())),
986 }
987 }
988}
989
990#[cfg(test)]
991mod tests;