quinn/send_stream.rs
1use std::{
2 future::{Future, poll_fn},
3 io,
4 pin::{Pin, pin},
5 task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use proto::{ClosedStream, ConnectionError, FinishError, StreamId, Written};
10use thiserror::Error;
11
12use crate::{
13 VarInt,
14 connection::{ConnectionRef, State},
15};
16
17/// A stream that can only be used to send data
18///
19/// If dropped, streams that haven't been explicitly [`reset()`] will be implicitly [`finish()`]ed,
20/// continuing to (re)transmit previously written data until it has been fully acknowledged or the
21/// connection is closed.
22///
23/// # Cancellation
24///
25/// A `write` method is said to be *cancel-safe* when dropping its future before the future becomes
26/// ready will always result in no data being written to the stream. This is true of methods which
27/// succeed immediately when any progress is made, and is not true of methods which might need to
28/// perform multiple writes internally before succeeding. Each `write` method documents whether it is
29/// cancel-safe.
30///
31/// [`reset()`]: SendStream::reset
32/// [`finish()`]: SendStream::finish
33#[derive(Debug)]
34pub struct SendStream {
35 conn: ConnectionRef,
36 stream: StreamId,
37 is_0rtt: bool,
38}
39
40impl SendStream {
41 pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
42 Self {
43 conn,
44 stream,
45 is_0rtt,
46 }
47 }
48
49 /// Write a buffer into this stream, returning how many bytes were written
50 ///
51 /// Unless this method errors, it waits until some amount of `buf` can be written into this
52 /// stream, and then writes as much as it can without waiting again. Due to congestion and flow
53 /// control, this may be shorter than `buf.len()`. On success this yields the length of the
54 /// prefix that was written.
55 ///
56 /// # Cancel safety
57 ///
58 /// This method is cancellation safe. If this does not resolve, no bytes were written.
59 pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WriteError> {
60 poll_fn(|cx| self.execute_poll(cx, |s| s.write(buf))).await
61 }
62
63 /// Write a buffer into this stream in its entirety
64 ///
65 /// This method repeatedly calls [`write`](Self::write) until all bytes are written, or an
66 /// error occurs.
67 ///
68 /// # Cancel safety
69 ///
70 /// This method is *not* cancellation safe. Even if this does not resolve, some prefix of `buf`
71 /// may have been written when previously polled.
72 pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), WriteError> {
73 while !buf.is_empty() {
74 let written = self.write(buf).await?;
75 buf = &buf[written..];
76 }
77 Ok(())
78 }
79
80 /// Write a slice of [`Bytes`] into this stream, returning how much was written
81 ///
82 /// Bytes to try to write are provided to this method as an array of cheaply cloneable chunks.
83 /// Unless this method errors, it waits until some amount of those bytes can be written into
84 /// this stream, and then writes as much as it can without waiting again. Due to congestion and
85 /// flow control, this may be less than the total number of bytes.
86 ///
87 /// On success, this method both mutates `bufs` and yields an informative [`Written`] struct
88 /// indicating how much was written:
89 ///
90 /// - [`Bytes`] chunks that were fully written are mutated to be [empty](Bytes::is_empty).
91 /// - If a [`Bytes`] chunk was partially written, it is [split to](Bytes::split_to) contain
92 /// only the suffix of bytes that were not written.
93 /// - The yielded [`Written`] struct indicates how many chunks were fully written as well as
94 /// how many bytes were written.
95 ///
96 /// # Cancel safety
97 ///
98 /// This method is cancellation safe. If this does not resolve, no bytes were written.
99 pub async fn write_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Written, WriteError> {
100 poll_fn(|cx| self.execute_poll(cx, |s| s.write_chunks(bufs))).await
101 }
102
103 /// Write a single [`Bytes`] into this stream in its entirety
104 ///
105 /// Bytes to write are provided to this method as an single cheaply cloneable chunk. This
106 /// method repeatedly calls [`write_chunks`](Self::write_chunks) until all bytes are written,
107 /// or an error occurs.
108 ///
109 /// # Cancel safety
110 ///
111 /// This method is *not* cancellation safe. Even if this does not resolve, some bytes may have
112 /// been written when previously polled.
113 pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> {
114 self.write_all_chunks(&mut [buf]).await?;
115 Ok(())
116 }
117
118 /// Write a slice of [`Bytes`] into this stream in its entirety
119 ///
120 /// Bytes to write are provided to this method as an array of cheaply cloneable chunks. This
121 /// method repeatedly calls [`write_chunks`](Self::write_chunks) until all bytes are written,
122 /// or an error occurs. This method mutates `bufs` by mutating all chunks to be
123 /// [empty](Bytes::is_empty).
124 ///
125 /// # Cancel safety
126 ///
127 /// This method is *not* cancellation safe. Even if this does not resolve, some bytes may have
128 /// been written when previously polled.
129 pub async fn write_all_chunks(&mut self, mut bufs: &mut [Bytes]) -> Result<(), WriteError> {
130 while !bufs.is_empty() {
131 let written = self.write_chunks(bufs).await?;
132 bufs = &mut bufs[written.chunks..];
133 }
134 Ok(())
135 }
136
137 fn execute_poll<F, R>(&mut self, cx: &mut Context, write_fn: F) -> Poll<Result<R, WriteError>>
138 where
139 F: FnOnce(&mut proto::SendStream) -> Result<R, proto::WriteError>,
140 {
141 use proto::WriteError::*;
142 let mut conn = self.conn.state.lock("SendStream::poll_write");
143 if self.is_0rtt {
144 conn.check_0rtt()
145 .map_err(|()| WriteError::ZeroRttRejected)?;
146 }
147 if let Some(ref x) = conn.error {
148 return Poll::Ready(Err(WriteError::ConnectionLost(x.clone())));
149 }
150
151 let result = match write_fn(&mut conn.inner.send_stream(self.stream)) {
152 Ok(result) => result,
153 Err(Blocked) => {
154 conn.blocked_writers.insert(self.stream, cx.waker().clone());
155 return Poll::Pending;
156 }
157 Err(Stopped(error_code)) => {
158 return Poll::Ready(Err(WriteError::Stopped(error_code)));
159 }
160 Err(ClosedStream) => {
161 return Poll::Ready(Err(WriteError::ClosedStream));
162 }
163 };
164
165 conn.wake();
166 Poll::Ready(Ok(result))
167 }
168
169 /// Notify the peer that no more data will ever be written to this stream
170 ///
171 /// It is an error to write to a [`SendStream`] after `finish()`ing it. [`reset()`](Self::reset)
172 /// may still be called after `finish` to abandon transmission of any stream data that might
173 /// still be buffered.
174 ///
175 /// To wait for the peer to receive all buffered stream data, see [`stopped()`](Self::stopped).
176 ///
177 /// May fail if [`finish()`](Self::finish) or [`reset()`](Self::reset) was previously
178 /// called. This error is harmless and serves only to indicate that the caller may have
179 /// incorrect assumptions about the stream's state.
180 pub fn finish(&mut self) -> Result<(), ClosedStream> {
181 let mut conn = self.conn.state.lock("finish");
182 match conn.inner.send_stream(self.stream).finish() {
183 Ok(()) => {
184 conn.wake();
185 Ok(())
186 }
187 Err(FinishError::ClosedStream) => Err(ClosedStream::default()),
188 // Harmless. If the application needs to know about stopped streams at this point, it
189 // should call `stopped`.
190 Err(FinishError::Stopped(_)) => Ok(()),
191 }
192 }
193
194 /// Close the send stream immediately.
195 ///
196 /// No new data can be written after calling this method. Locally buffered data is dropped, and
197 /// previously transmitted data will no longer be retransmitted if lost. If an attempt has
198 /// already been made to finish the stream, the peer may still receive all written data.
199 ///
200 /// May fail if [`finish()`](Self::finish) or [`reset()`](Self::reset) was previously
201 /// called. This error is harmless and serves only to indicate that the caller may have
202 /// incorrect assumptions about the stream's state.
203 pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
204 let mut conn = self.conn.state.lock("SendStream::reset");
205 if self.is_0rtt && conn.check_0rtt().is_err() {
206 return Ok(());
207 }
208 conn.inner.send_stream(self.stream).reset(error_code)?;
209 conn.wake();
210 Ok(())
211 }
212
213 /// Set the priority of the send stream
214 ///
215 /// Every send stream has an initial priority of 0. Locally buffered data from streams with
216 /// higher priority will be transmitted before data from streams with lower priority. Changing
217 /// the priority of a stream with pending data may only take effect after that data has been
218 /// transmitted. Using many different priority levels per connection may have a negative
219 /// impact on performance.
220 pub fn set_priority(&self, priority: i32) -> Result<(), ClosedStream> {
221 let mut conn = self.conn.state.lock("SendStream::set_priority");
222 conn.inner.send_stream(self.stream).set_priority(priority)?;
223 Ok(())
224 }
225
226 /// Get the priority of the send stream
227 pub fn priority(&self) -> Result<i32, ClosedStream> {
228 let mut conn = self.conn.state.lock("SendStream::priority");
229 conn.inner.send_stream(self.stream).priority()
230 }
231
232 /// Completes when the peer stops the stream or reads the stream to completion
233 ///
234 /// Yields `Some` with the stop error code if the peer stops the stream. Yields `None` if the
235 /// local side [`finish()`](Self::finish)es the stream and then the peer acknowledges receipt
236 /// of all stream data (although not necessarily the processing of it), after which the peer
237 /// closing the stream is no longer meaningful.
238 ///
239 /// For a variety of reasons, the peer may not send acknowledgements immediately upon receiving
240 /// data. As such, relying on `stopped` to know when the peer has read a stream to completion
241 /// may introduce more latency than using an application-level response of some sort.
242 pub fn stopped(
243 &self,
244 ) -> impl Future<Output = Result<Option<VarInt>, StoppedError>> + Send + Sync + 'static {
245 let conn = self.conn.clone();
246 let stream = self.stream;
247 let is_0rtt = self.is_0rtt;
248 async move {
249 loop {
250 // The `Notify::notified` future needs to be created while the lock is being held,
251 // otherwise a wakeup could be missed if triggered inbetween releasing the lock
252 // and creating the future.
253 // The lock may only be held in a block without `await`s, otherwise the future
254 // becomes `!Send`. `Notify::notified` is lifetime-bound to `Notify`, therefore
255 // we need to declare `notify` outside of the block, and initialize it inside.
256 let notify;
257 {
258 let mut conn = conn.state.lock("SendStream::stopped");
259 if let Some(output) = send_stream_stopped(&mut conn, stream, is_0rtt) {
260 return output;
261 }
262
263 notify = conn.stopped.entry(stream).or_default().clone();
264 notify.notified()
265 }
266 .await
267 }
268 }
269 }
270
271 /// Get the identity of this stream
272 pub fn id(&self) -> StreamId {
273 self.stream
274 }
275
276 /// Attempt to write bytes from buf into the stream.
277 ///
278 /// On success, returns Poll::Ready(Ok(num_bytes_written)).
279 ///
280 /// If the stream is not ready for writing, the method returns Poll::Pending and arranges
281 /// for the current task (via cx.waker().wake_by_ref()) to receive a notification when the
282 /// stream becomes writable or is closed.
283 pub fn poll_write(
284 self: Pin<&mut Self>,
285 cx: &mut Context,
286 buf: &[u8],
287 ) -> Poll<Result<usize, WriteError>> {
288 pin!(self.get_mut().write(buf)).as_mut().poll(cx)
289 }
290}
291
292/// Check if a send stream is stopped.
293///
294/// Returns `Some` if the stream is stopped or the connection is closed.
295/// Returns `None` if the stream is not stopped.
296fn send_stream_stopped(
297 conn: &mut State,
298 stream: StreamId,
299 is_0rtt: bool,
300) -> Option<Result<Option<VarInt>, StoppedError>> {
301 if is_0rtt && conn.check_0rtt().is_err() {
302 return Some(Err(StoppedError::ZeroRttRejected));
303 }
304 match conn.inner.send_stream(stream).stopped() {
305 Err(ClosedStream { .. }) => Some(Ok(None)),
306 Ok(Some(error_code)) => Some(Ok(Some(error_code))),
307 Ok(None) => conn.error.clone().map(|error| Err(error.into())),
308 }
309}
310
#[cfg(feature = "futures-io")]
impl futures_io::AsyncWrite for SendStream {
    /// Delegates to the inherent `poll_write`, converting its error into an [`io::Error`]
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        let outcome = self.poll_write(cx, buf);
        outcome.map_err(io::Error::from)
    }

    /// Completes immediately with success; no work is performed on flush
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
        let flushed: io::Result<()> = Ok(());
        Poll::Ready(flushed)
    }

    /// Finishes the stream and completes immediately with the converted result
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
        let finished = self.get_mut().finish();
        Poll::Ready(finished.map_err(Into::into))
    }
}
325
326impl tokio::io::AsyncWrite for SendStream {
327 fn poll_write(
328 self: Pin<&mut Self>,
329 cx: &mut Context<'_>,
330 buf: &[u8],
331 ) -> Poll<io::Result<usize>> {
332 self.poll_write(cx, buf).map_err(Into::into)
333 }
334
335 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
336 Poll::Ready(Ok(()))
337 }
338
339 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
340 Poll::Ready(self.get_mut().finish().map_err(Into::into))
341 }
342}
343
344impl Drop for SendStream {
345 fn drop(&mut self) {
346 let mut conn = self.conn.state.lock("SendStream::drop");
347
348 // clean up any previously registered wakers
349 conn.blocked_writers.remove(&self.stream);
350
351 if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
352 return;
353 }
354 match conn.inner.send_stream(self.stream).finish() {
355 Ok(()) => conn.wake(),
356 Err(FinishError::Stopped(reason)) => {
357 if conn.inner.send_stream(self.stream).reset(reason).is_ok() {
358 conn.wake();
359 }
360 }
361 // Already finished or reset, which is fine.
362 Err(FinishError::ClosedStream) => {}
363 }
364 }
365}
366
367/// Errors that arise from writing to a stream
368#[derive(Debug, Error, Clone, PartialEq, Eq)]
369pub enum WriteError {
370 /// The peer is no longer accepting data on this stream
371 ///
372 /// Carries an application-defined error code.
373 #[error("sending stopped by peer: error {0}")]
374 Stopped(VarInt),
375 /// The connection was lost
376 #[error("connection lost")]
377 ConnectionLost(#[from] ConnectionError),
378 /// The stream has already been finished or reset
379 #[error("closed stream")]
380 ClosedStream,
381 /// This was a 0-RTT stream and the server rejected it
382 ///
383 /// Can only occur on clients for 0-RTT streams, which can be opened using
384 /// [`Connecting::into_0rtt()`].
385 ///
386 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
387 #[error("0-RTT rejected")]
388 ZeroRttRejected,
389}
390
391impl From<ClosedStream> for WriteError {
392 #[inline]
393 fn from(_: ClosedStream) -> Self {
394 Self::ClosedStream
395 }
396}
397
398impl From<StoppedError> for WriteError {
399 fn from(x: StoppedError) -> Self {
400 match x {
401 StoppedError::ConnectionLost(e) => Self::ConnectionLost(e),
402 StoppedError::ZeroRttRejected => Self::ZeroRttRejected,
403 }
404 }
405}
406
407impl From<WriteError> for io::Error {
408 fn from(x: WriteError) -> Self {
409 use WriteError::*;
410 let kind = match x {
411 Stopped(_) | ZeroRttRejected => io::ErrorKind::ConnectionReset,
412 ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
413 };
414 Self::new(kind, x)
415 }
416}
417
418/// Errors that arise while monitoring for a send stream stop from the peer
419#[derive(Debug, Error, Clone, PartialEq, Eq)]
420pub enum StoppedError {
421 /// The connection was lost
422 #[error("connection lost")]
423 ConnectionLost(#[from] ConnectionError),
424 /// This was a 0-RTT stream and the server rejected it
425 ///
426 /// Can only occur on clients for 0-RTT streams, which can be opened using
427 /// [`Connecting::into_0rtt()`].
428 ///
429 /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
430 #[error("0-RTT rejected")]
431 ZeroRttRejected,
432}
433
434impl From<StoppedError> for io::Error {
435 fn from(x: StoppedError) -> Self {
436 use StoppedError::*;
437 let kind = match x {
438 ZeroRttRejected => io::ErrorKind::ConnectionReset,
439 ConnectionLost(_) => io::ErrorKind::NotConnected,
440 };
441 Self::new(kind, x)
442 }
443}