quinn/
connection.rs

use std::{
    any::Any,
    fmt,
    future::Future,
    io,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker, ready},
};

use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
use tracing::{Instrument, Span, debug_span};

use crate::{
    ConnectionEvent, Duration, Instant, VarInt,
    mutex::Mutex,
    recv_stream::RecvStream,
    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
    send_stream::SendStream,
    udp_transmit,
};
use proto::{
    ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, Side, StreamEvent,
    StreamId, congestion::Controller,
};

/// In-progress connection attempt future
#[derive(Debug)]
pub struct Connecting {
    conn: Option<ConnectionRef>,
    connected: oneshot::Receiver<bool>,
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}

impl Connecting {
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: proto::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();
        let conn = ConnectionRef::new(
            handle,
            conn,
            endpoint_events,
            conn_events,
            on_handshake_data_send,
            on_connected_send,
            socket,
            runtime.clone(),
        );

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    tracing::error!("I/O error: {e}");
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
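    ///
    /// ## Example
    ///
    /// A minimal sketch of attempting 0-RTT on an outgoing connection; the endpoint, server
    /// address, and server name below are placeholders:
    ///
    /// ```no_run
    /// # async fn example(endpoint: quinn::Endpoint) -> Result<(), Box<dyn std::error::Error>> {
    /// let connecting = endpoint.connect("127.0.0.1:4433".parse()?, "example.com")?;
    /// match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // Streams opened here carry 0-RTT data and must be safe to replay
    ///         let mut send = connection.open_uni().await?;
    ///         send.write_all(b"idempotent request").await?;
    ///         let accepted = zero_rtt_accepted.await;
    ///         println!("0-RTT accepted by the server: {accepted}");
    ///     }
    ///     Err(connecting) => {
    ///         // No resumption state was available; fall back to a full handshake
    ///         let _connection = connecting.await?;
    ///     }
    /// }
    /// # Ok(()) }
    /// ```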
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");

        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            let conn = self.conn.take().unwrap();
            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
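    ///
    /// A sketch of reading the negotiated ALPN protocol under the default `rustls` session
    /// (assumes the connection was configured with rustls):
    ///
    /// ```no_run
    /// # async fn example(mut connecting: quinn::Connecting) -> Result<(), Box<dyn std::error::Error>> {
    /// let data = connecting.handshake_data().await?;
    /// if let Ok(data) = data.downcast::<quinn::crypto::rustls::HandshakeData>() {
    ///     // `protocol` holds the ALPN protocol negotiated during the handshake, if any
    ///     println!("ALPN: {:?}", data.protocol);
    /// }
    /// # Ok(()) }
    /// ```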
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                inner
                    .error
                    .clone()
                    .expect("spurious handshake data ready notification")
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> SocketAddr {
        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
        conn_ref.state.lock("remote_address").inner.remote_address()
    }
}

impl Future for Connecting {
    type Output = Result<Connection, ConnectionError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.connected).poll(cx).map(|_| {
            let conn = self.conn.take().unwrap();
            let inner = conn.state.lock("connecting");
            if inner.connected {
                drop(inner);
                Ok(Connection(conn))
            } else {
                Err(inner
                    .error
                    .clone()
                    .expect("connected signaled without connection success or error"))
            }
        })
    }
}

/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);

impl Future for ZeroRttAccepted {
    type Output = bool;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
    }
}

/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events between the
/// `Connection` API object, the `Endpoint` task, and the stream-related interfaces. It also
/// keeps track of outstanding timeouts for the `Connection`.
230///
231/// If the connection encounters an error condition, this future will yield an error. It will
232/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
233/// connection-related futures, this waits for the draining period to complete to ensure that
234/// packets still in flight from the peer are handled gracefully.
235#[must_use = "connection drivers must be spawned for their connections to function"]
236#[derive(Debug)]
237struct ConnectionDriver(ConnectionRef);
238
239impl Future for ConnectionDriver {
240    type Output = Result<(), io::Error>;
241
242    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
243        let conn = &mut *self.0.state.lock("poll");
244
245        let span = debug_span!("drive", id = conn.handle.0);
246        let _guard = span.enter();
247
248        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
249            conn.terminate(e, &self.0.shared);
250            return Poll::Ready(Ok(()));
251        }
252        let mut keep_going = conn.drive_transmit(cx)?;
253        // If a timer expires, there might be more to transmit. When we transmit something, we
254        // might need to reset a timer. Hence, we must loop until neither happens.
255        keep_going |= conn.drive_timer(cx);
256        conn.forward_endpoint_events();
257        conn.forward_app_events(&self.0.shared);
258
259        if !conn.inner.is_drained() {
260            if keep_going {
261                // If the connection hasn't processed all tasks, schedule it again
262                cx.waker().wake_by_ref();
263            } else {
264                conn.driver = Some(cx.waker().clone());
265            }
266            return Poll::Pending;
267        }
268        if conn.error.is_none() {
269            unreachable!("drained connections always have an error");
270        }
271        Poll::Ready(Ok(()))
272    }
273}
274
275/// A QUIC connection.
276///
/// If all references to a connection (including every clone of the `Connection` handle and the
/// various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);

impl Connection {
    /// Initiate a new outgoing unidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used.
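    ///
    /// A minimal sketch (`finish` is assumed to be the current synchronous API):
    ///
    /// ```no_run
    /// # async fn example(connection: quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut send = connection.open_uni().await?;
    /// send.write_all(b"hello").await?; // the peer learns of the stream on first use
    /// send.finish()?;
    /// # Ok(()) }
    /// ```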
    pub fn open_uni(&self) -> OpenUni<'_> {
        OpenUni {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
        }
    }

    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
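    ///
    /// A minimal sketch; note the write before expecting the peer to accept the stream:
    ///
    /// ```no_run
    /// # async fn example(connection: quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let (mut send, mut recv) = connection.open_bi().await?;
    /// send.write_all(b"request").await?; // makes the stream visible to the peer
    /// let response = recv.read_to_end(64 * 1024).await?;
    /// println!("got {} bytes", response.len());
    /// # Ok(()) }
    /// ```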
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }

    /// Accept the next incoming uni-directional stream
    pub fn accept_uni(&self) -> AcceptUni<'_> {
        AcceptUni {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
        }
    }

    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to [`accept_bi()`]. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: crate::Connection::accept_bi
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
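    ///
    /// A sketch of a typical accept loop (`finish` is assumed to be the current synchronous API):
    ///
    /// ```no_run
    /// # async fn example(connection: quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// while let Ok((mut send, mut recv)) = connection.accept_bi().await {
    ///     // Read the peer's request, bounded to 64 KiB
    ///     let request = recv.read_to_end(64 * 1024).await?;
    ///     send.write_all(&request).await?; // echo the request back
    ///     send.finish()?;
    /// }
    /// # Ok(()) }
    /// ```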
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
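    ///
    /// A minimal sketch of a receive loop:
    ///
    /// ```no_run
    /// # async fn example(connection: quinn::Connection) {
    /// while let Ok(datagram) = connection.read_datagram().await {
    ///     println!("received {} bytes", datagram.len());
    /// }
    /// # }
    /// ```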
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            notify: self.0.shared.datagram_received.notified(),
        }
    }

    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
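    ///
    /// A sketch of watching for shutdown from a separate task (assumes a tokio runtime):
    ///
    /// ```no_run
    /// # fn example(connection: quinn::Connection) {
    /// tokio::spawn(async move {
    ///     let reason = connection.closed().await;
    ///     println!("connection closed: {reason}");
    /// });
    /// # }
    /// ```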
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }

    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    /// The sending side cannot guarantee that all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`], the remote endpoint may drop any data it has received but not yet
    /// delivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
    /// [`close()`]: Connection::close
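    ///
    /// A sketch of the graceful-shutdown pattern described above (the error code is
    /// application-defined):
    ///
    /// ```no_run
    /// # async fn example(endpoint: quinn::Endpoint, connection: quinn::Connection) {
    /// connection.close(0u32.into(), b"done");
    /// // Give the final CONNECTION_CLOSE frame a chance to reach the peer
    /// endpoint.wait_idle().await;
    /// # }
    /// ```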
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
    ///
    /// Previously queued datagrams which are still unsent may be discarded to make space for this
    /// datagram, in order of oldest to newest.
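    ///
    /// A sketch of size-checked sending (the payload here is illustrative):
    ///
    /// ```no_run
    /// # fn example(connection: &quinn::Connection) -> Result<(), quinn::SendDatagramError> {
    /// let payload = bytes::Bytes::from_static(b"ping");
    /// if connection.max_datagram_size().map_or(false, |max| payload.len() <= max) {
    ///     connection.send_datagram(payload)?;
    /// }
    /// # Ok(()) }
    /// ```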
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use proto::SendDatagramError::*;
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }

    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }

    /// The side of the connection (client or server)
    pub fn side(&self) -> Side {
        self.0.state.lock("side").inner.side()
    }

    /// The peer's UDP address
    ///
    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
    /// switching to a cellular internet connection.
    pub fn remote_address(&self) -> SocketAddr {
        self.0.state.lock("remote_address").inner.remote_address()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self) -> Duration {
        self.0.state.lock("rtt").inner.rtt()
    }

    /// Returns connection statistics
    pub fn stats(&self) -> ConnectionStats {
        self.0.state.lock("stats").inner.stats()
    }

    /// Current state of the congestion control algorithm, for debugging purposes
    pub fn congestion_state(&self) -> Box<dyn Controller> {
        self.0
            .state
            .lock("congestion_state")
            .inner
            .congestion_state()
            .clone_box()
    }

    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }

    /// Cryptographic identity of the peer
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("peer_identity")
            .inner
            .crypto_session()
            .peer_identity()
    }

    /// A stable identifier for this connection
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection.
    pub fn stable_id(&self) -> usize {
        self.0.stable_id()
    }

    /// Update traffic keys spontaneously
    ///
    /// This primarily exists for testing purposes.
    pub fn force_key_update(&self) {
        self.0
            .state
            .lock("force_key_update")
            .inner
            .force_key_update()
    }

    /// Derive keying material from this connection's TLS session secrets.
    ///
    /// When both peers call this method with the same `label` and `context`
    /// arguments and `output` buffers of equal length, they will get the
    /// same sequence of bytes in `output`. These bytes are cryptographically
    /// strong and pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
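    ///
    /// A sketch of deriving a 32-byte key (the label and context are application-chosen):
    ///
    /// ```no_run
    /// # fn example(connection: &quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let mut key = [0u8; 32];
    /// connection.export_keying_material(&mut key, b"my-application", b"context")?;
    /// # Ok(()) }
    /// ```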
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
        self.0
            .state
            .lock("export_keying_material")
            .inner
            .crypto_session()
            .export_keying_material(output, label, context)
    }

    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// See [`proto::TransportConfig::send_window()`]
    pub fn set_send_window(&self, send_window: u64) {
        let mut conn = self.0.state.lock("set_send_window");
        conn.inner.set_send_window(send_window);
        conn.wake();
    }

    /// See [`proto::TransportConfig::receive_window()`]
    pub fn set_receive_window(&self, receive_window: VarInt) {
        let mut conn = self.0.state.lock("set_receive_window");
        conn.inner.set_receive_window(receive_window);
        conn.wake();
    }

    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }
}

pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenUni<'_> {
    type Output = Result<SendStream, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptUni<'_> {
    type Output = Result<RecvStream, ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        if let Some(x) = state.inner.datagrams().recv() {
            return Poll::Ready(Ok(x));
        } else if let Some(ref e) = state.error {
            return Poll::Ready(Err(e.clone()));
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        conn: &'a ConnectionRef,
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use proto::SendDatagramError::*;
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

impl ConnectionRef {
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: ConnectionHandle,
        conn: proto::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        on_handshake_data: oneshot::Sender<()>,
        on_connected: oneshot::Sender<bool>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        Self(Arc::new(ConnectionInner {
            state: Mutex::new(State {
                inner: conn,
                driver: None,
                handle,
                on_handshake_data: Some(on_handshake_data),
                on_connected: Some(on_connected),
                connected: false,
                timer: None,
                timer_deadline: None,
                conn_events,
                endpoint_events,
                blocked_writers: FxHashMap::default(),
                blocked_readers: FxHashMap::default(),
                stopped: FxHashMap::default(),
                error: None,
                ref_count: 0,
                io_poller: socket.clone().create_io_poller(),
                socket,
                runtime,
                send_buffer: Vec::new(),
                buffered_transmit: None,
            }),
            shared: Shared::default(),
        }))
    }

    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}

impl Clone for ConnectionRef {
    fn clone(&self) -> Self {
        self.state.lock("clone").ref_count += 1;
        Self(self.0.clone())
    }
}

impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}

impl std::ops::Deref for ConnectionRef {
    type Target = ConnectionInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionInner {
    pub(crate) state: Mutex<State>,
    pub(crate) shared: Shared,
}

#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    datagram_received: Notify,
    datagrams_unblocked: Notify,
    closed: Notify,
}

pub(crate) struct State {
    pub(crate) inner: proto::Connection,
    driver: Option<Waker>,
    handle: ConnectionHandle,
    on_handshake_data: Option<oneshot::Sender<()>>,
    on_connected: Option<oneshot::Sender<bool>>,
    connected: bool,
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    socket: Arc<dyn AsyncUdpSocket>,
    io_poller: Pin<Box<dyn UdpPoller>>,
    runtime: Arc<dyn Runtime>,
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<proto::Transmit>,
}

impl State {
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

        let max_datagrams = self
            .socket
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => t.size.div_ceil(s), // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
                // Retry after a future wakeup
                self.buffered_transmit = Some(t);
                return Ok(false);
            }

            let len = t.size;
            let retry = match self
                .socket
                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
            {
                Ok(()) => false,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
                Err(e) => return Err(e),
            };
            if retry {
                // We thought the socket was writable, but it wasn't. Retry so that either another
                // `poll_writable` call determines that the socket is indeed not writable and
                // registers us for a wakeup, or the send succeeds if this really was just a
                // transient failure.
                self.buffered_transmit = Some(t);
                continue;
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }

    fn forward_endpoint_events(&mut self) {
        while let Some(event) = self.inner.poll_endpoint_events() {
            // If the endpoint driver is gone, noop.
            let _ = self.endpoint_events.send((self.handle, event));
        }
    }

    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
                    self.socket = socket;
                    self.io_poller = self.socket.clone().create_io_poller();
                    self.inner.local_address_changed();
                }
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    self.inner.handle_event(event);
                }
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                Poll::Ready(None) => {
                    return Err(ConnectionError::TransportError(proto::TransportError {
                        code: proto::TransportErrorCode::INTERNAL_ERROR,
                        frame: None,
                        reason: "endpoint driver future was dropped".to_string(),
                    }));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }

    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use proto::Event::*;
            match event {
                HandshakeDataReady => {
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all_notify(&mut self.stopped);
                    }
                }
                ConnectionLost { reason } => {
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    wake_stream_notify(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
            }
        }
    }

    fn drive_timer(&mut self, cx: &mut Context) -> bool {
        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
        // timer is registered with the runtime (and check whether it's already
        // expired).
        match self.inner.poll_timeout() {
            Some(deadline) => {
                if let Some(delay) = &mut self.timer {
                    // There is no need to reset the tokio timer if the deadline
                    // did not change
                    if self
                        .timer_deadline
                        .map(|current_deadline| current_deadline != deadline)
                        .unwrap_or(true)
                    {
                        delay.as_mut().reset(deadline);
                    }
                } else {
                    self.timer = Some(self.runtime.new_timer(deadline));
                }
                // Store the actual expiration time of the timer
                self.timer_deadline = Some(deadline);
            }
            None => {
                self.timer_deadline = None;
                return false;
            }
        }

        if self.timer_deadline.is_none() {
            return false;
        }

        let delay = self
            .timer
            .as_mut()
            .expect("timer must exist in this state")
            .as_mut();
        if delay.poll(cx).is_pending() {
            // Since there wasn't a timeout event, there is nothing new
            // for the connection to do
            return false;
        }

        // A timer expired, so the caller needs to check for
        // new transmits, which might cause new timers to be set.
        self.inner.handle_timeout(self.runtime.now());
        self.timer_deadline = None;
        true
    }

    /// Wake up a blocked `Driver` task to process I/O
    pub(crate) fn wake(&mut self) {
        if let Some(x) = self.driver.take() {
            x.wake();
        }
    }

    /// Used to wake up all blocked futures when the connection becomes closed for any reason
    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
        self.error = Some(reason.clone());
        if let Some(x) = self.on_handshake_data.take() {
            let _ = x.send(());
        }
        wake_all(&mut self.blocked_writers);
        wake_all(&mut self.blocked_readers);
        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
        shared.datagram_received.notify_waiters();
        shared.datagrams_unblocked.notify_waiters();
        if let Some(x) = self.on_connected.take() {
            let _ = x.send(false);
        }
        wake_all_notify(&mut self.stopped);
        shared.closed.notify_waiters();
    }

    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        self.wake();
    }

    /// Close for a reason other than the application's explicit request
    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
        self.close(0u32.into(), Bytes::new(), shared);
    }

    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
        if self.inner.is_handshaking()
            || self.inner.accepted_0rtt()
            || self.inner.side().is_server()
        {
            Ok(())
        } else {
            Err(())
        }
    }
}

impl Drop for State {
    fn drop(&mut self) {
        if !self.inner.is_drained() {
            // Ensure the endpoint can tidy up
            let _ = self
                .endpoint_events
                .send((self.handle, proto::EndpointEvent::drained()));
        }
    }
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("State").field("inner", &self.inner).finish()
    }
}

fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
    if let Some(waker) = wakers.remove(&stream_id) {
        waker.wake();
    }
}

fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
    wakers.drain().for_each(|(_, waker)| waker.wake())
}

fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    if let Some(notify) = wakers.remove(&stream_id) {
        notify.notify_waiters()
    }
}

fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
    wakers
        .drain()
        .for_each(|(_, notify)| notify.notify_waiters())
}

/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}

/// The maximum number of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;