quinn/connection.rs
1use std::{
2 any::Any,
3 fmt,
4 future::Future,
5 io,
6 net::{IpAddr, SocketAddr},
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll, Waker, ready},
10};
11
12use bytes::Bytes;
13use pin_project_lite::pin_project;
14use rustc_hash::FxHashMap;
15use thiserror::Error;
16use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
17use tracing::{Instrument, Span, debug_span};
18
19use crate::{
20 ConnectionEvent, Duration, Instant, VarInt,
21 mutex::Mutex,
22 recv_stream::RecvStream,
23 runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
24 send_stream::SendStream,
25 udp_transmit,
26};
27use proto::{
28 ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, Side, StreamEvent,
29 StreamId, congestion::Controller,
30};
31
/// In-progress connection attempt future
///
/// Resolves to a [`Connection`] when the attempt succeeds, or to the recorded
/// [`ConnectionError`] when it fails.
#[derive(Debug)]
pub struct Connecting {
    // Handle to the shared connection state. Taken (left as `None`) when this future yields
    // `Ready` or when `into_0rtt` succeeds, which is why the accessors unwrap it.
    conn: Option<ConnectionRef>,
    // Receiving half of the "connected" oneshot created in `new`. Polled by the `Future` impl,
    // or handed over to `ZeroRttAccepted` by `into_0rtt`.
    connected: oneshot::Receiver<bool>,
    // Receiving half of the handshake-data oneshot. Taken on the first call to
    // `handshake_data`, so it is awaited at most once.
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}
39
40impl Connecting {
41 pub(crate) fn new(
42 handle: ConnectionHandle,
43 conn: proto::Connection,
44 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
45 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
46 socket: Arc<dyn AsyncUdpSocket>,
47 runtime: Arc<dyn Runtime>,
48 ) -> Self {
49 let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
50 let (on_connected_send, on_connected_recv) = oneshot::channel();
51 let conn = ConnectionRef::new(
52 handle,
53 conn,
54 endpoint_events,
55 conn_events,
56 on_handshake_data_send,
57 on_connected_send,
58 socket,
59 runtime.clone(),
60 );
61
62 let driver = ConnectionDriver(conn.clone());
63 runtime.spawn(Box::pin(
64 async {
65 if let Err(e) = driver.await {
66 tracing::error!("I/O error: {e}");
67 }
68 }
69 .instrument(Span::current()),
70 ));
71
72 Self {
73 conn: Some(conn),
74 connected: on_connected_recv,
75 handshake_data_ready: Some(on_handshake_data_recv),
76 }
77 }
78
79 /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
80 ///
81 /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
82 /// If so, the returned [`Connection`] can be used to send application data without waiting for
83 /// the rest of the handshake to complete, at the cost of weakened cryptographic security
84 /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
85 /// complete, at which point subsequently opened streams and written data will have full
86 /// cryptographic protection.
87 ///
88 /// ## Outgoing
89 ///
90 /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
91 /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
92 /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
93 /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
94 /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
95 /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
96 /// If it was rejected, the existence of streams opened and other application data sent prior
97 /// to the handshake completing will not be conveyed to the remote application, and local
98 /// operations on them will return `ZeroRttRejected` errors.
99 ///
100 /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
101 /// relevant resumption state to be stored in the server, which servers may limit or lose for
102 /// various reasons including not persisting resumption state across server restarts.
103 ///
104 /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
105 /// implementation's docs for 0-RTT pitfalls.
106 ///
107 /// ## Incoming
108 ///
109 /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
110 /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
111 ///
112 /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
113 /// implementation's docs for 0-RTT pitfalls.
114 ///
115 /// ## Security
116 ///
117 /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
118 /// replay attacks, and should therefore never invoke non-idempotent operations.
119 ///
120 /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
121 /// before TLS client authentication has occurred, and should therefore not be used to send
122 /// data for which client authentication is being used.
123 pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
124 // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
125 // have to release it explicitly before returning `self` by value.
126 let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");
127
128 let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
129 drop(conn);
130
131 if is_ok {
132 let conn = self.conn.take().unwrap();
133 Ok((Connection(conn), ZeroRttAccepted(self.connected)))
134 } else {
135 Err(self)
136 }
137 }
138
139 /// Parameters negotiated during the handshake
140 ///
141 /// The dynamic type returned is determined by the configured
142 /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
143 /// be [`downcast`](Box::downcast) to a
144 /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
145 pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
146 // Taking &mut self allows us to use a single oneshot channel rather than dealing with
147 // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
148 // simple.
149 if let Some(x) = self.handshake_data_ready.take() {
150 let _ = x.await;
151 }
152 let conn = self.conn.as_ref().unwrap();
153 let inner = conn.state.lock("handshake");
154 inner
155 .inner
156 .crypto_session()
157 .handshake_data()
158 .ok_or_else(|| {
159 inner
160 .error
161 .clone()
162 .expect("spurious handshake data ready notification")
163 })
164 }
165
166 /// The local IP address which was used when the peer established
167 /// the connection
168 ///
169 /// This can be different from the address the endpoint is bound to, in case
170 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
171 ///
172 /// This will return `None` for clients, or when the platform does not expose this
173 /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
174 /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
175 ///
176 /// Will panic if called after `poll` has returned `Ready`.
177 pub fn local_ip(&self) -> Option<IpAddr> {
178 let conn = self.conn.as_ref().unwrap();
179 let inner = conn.state.lock("local_ip");
180
181 inner.inner.local_ip()
182 }
183
184 /// The peer's UDP address
185 ///
186 /// Will panic if called after `poll` has returned `Ready`.
187 pub fn remote_address(&self) -> SocketAddr {
188 let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
189 conn_ref.state.lock("remote_address").inner.remote_address()
190 }
191}
192
193impl Future for Connecting {
194 type Output = Result<Connection, ConnectionError>;
195 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
196 Pin::new(&mut self.connected).poll(cx).map(|_| {
197 let conn = self.conn.take().unwrap();
198 let inner = conn.state.lock("connecting");
199 if inner.connected {
200 drop(inner);
201 Ok(Connection(conn))
202 } else {
203 Err(inner
204 .error
205 .clone()
206 .expect("connected signaled without connection success or error"))
207 }
208 })
209 }
210}
211
/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
///
/// Wraps the "connected" oneshot receiver handed over by [`Connecting::into_0rtt`].
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
217
218impl Future for ZeroRttAccepted {
219 type Output = bool;
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
221 Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
222 }
223}
224
/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// An I/O error raised while transmitting makes this future yield that error. A failure while
/// processing connection events terminates the connection and yields `Ok(())`, as does a
/// connection that has fully drained. Unlike other connection-related futures, this waits for
/// the draining period to complete to ensure that packets still in flight from the peer are
/// handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);
238
impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    /// Run one iteration of the connection's event loop
    ///
    /// Processes connection events, transmits, services the timer, and forwards endpoint and
    /// application events, all while holding the state lock. Returns `Pending` until the
    /// connection has drained.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The guard lives for the whole call, so the iteration runs under the state lock.
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        // A failure here terminates the connection; the driver itself still finishes with `Ok`.
        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        // I/O errors from transmitting are propagated to the caller via `?`.
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                // Nothing left to do right now: store the waker in the shared state.
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        // Invariant check: a drained connection must have recorded why it ended.
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}
274
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection. Clones are counted through
/// the wrapped `ConnectionRef`, whose `Clone` impl increments a reference count kept in the
/// shared state.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
292
293impl Connection {
294 /// Initiate a new outgoing unidirectional stream.
295 ///
296 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
297 /// consequence, the peer won't be notified that a stream has been opened until the stream is
298 /// actually used.
299 pub fn open_uni(&self) -> OpenUni<'_> {
300 OpenUni {
301 conn: &self.0,
302 notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
303 }
304 }
305
306 /// Initiate a new outgoing bidirectional stream.
307 ///
308 /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
309 /// consequence, the peer won't be notified that a stream has been opened until the stream is
310 /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
311 /// anything to [`SendStream`] will never succeed.
312 ///
313 /// [`open_bi()`]: crate::Connection::open_bi
314 /// [`SendStream`]: crate::SendStream
315 /// [`RecvStream`]: crate::RecvStream
316 pub fn open_bi(&self) -> OpenBi<'_> {
317 OpenBi {
318 conn: &self.0,
319 notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
320 }
321 }
322
323 /// Accept the next incoming uni-directional stream
324 pub fn accept_uni(&self) -> AcceptUni<'_> {
325 AcceptUni {
326 conn: &self.0,
327 notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
328 }
329 }
330
331 /// Accept the next incoming bidirectional stream
332 ///
333 /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
334 /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
335 /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
336 ///
337 /// [`accept_bi()`]: crate::Connection::accept_bi
338 /// [`open_bi()`]: crate::Connection::open_bi
339 /// [`SendStream`]: crate::SendStream
340 /// [`RecvStream`]: crate::RecvStream
341 pub fn accept_bi(&self) -> AcceptBi<'_> {
342 AcceptBi {
343 conn: &self.0,
344 notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
345 }
346 }
347
348 /// Receive an application datagram
349 pub fn read_datagram(&self) -> ReadDatagram<'_> {
350 ReadDatagram {
351 conn: &self.0,
352 notify: self.0.shared.datagram_received.notified(),
353 }
354 }
355
356 /// Wait for the connection to be closed for any reason
357 ///
358 /// Despite the return type's name, closed connections are often not an error condition at the
359 /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
360 /// and [`ConnectionError::ApplicationClosed`].
361 pub async fn closed(&self) -> ConnectionError {
362 {
363 let conn = self.0.state.lock("closed");
364 if let Some(error) = conn.error.as_ref() {
365 return error.clone();
366 }
367 // Construct the future while the lock is held to ensure we can't miss a wakeup if
368 // the `Notify` is signaled immediately after we release the lock. `await` it after
369 // the lock guard is out of scope.
370 self.0.shared.closed.notified()
371 }
372 .await;
373 self.0
374 .state
375 .lock("closed")
376 .error
377 .as_ref()
378 .expect("closed without an error")
379 .clone()
380 }
381
382 /// If the connection is closed, the reason why.
383 ///
384 /// Returns `None` if the connection is still open.
385 pub fn close_reason(&self) -> Option<ConnectionError> {
386 self.0.state.lock("close_reason").error.clone()
387 }
388
389 /// Close the connection immediately.
390 ///
391 /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
392 /// more data is sent to the peer and the peer may drop buffered data upon receiving
393 /// the CONNECTION_CLOSE frame.
394 ///
395 /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
396 ///
397 /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
398 /// is preserved in full, it should be kept under 1KiB.
399 ///
400 /// # Gracefully closing a connection
401 ///
402 /// Only the peer last receiving application data can be certain that all data is
403 /// delivered. The only reliable action it can then take is to close the connection,
404 /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
405 /// frame is very likely if both endpoints stay online long enough, and
406 /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
407 /// remote peer will time out the connection, provided that the idle timeout is not
408 /// disabled.
409 ///
410 /// The sending side can not guarantee all stream data is delivered to the remote
411 /// application. It only knows the data is delivered to the QUIC stack of the remote
412 /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
413 /// [`close()`] the remote endpoint may drop any data it received but is as yet
414 /// undelivered to the application, including data that was acknowledged as received to
415 /// the local endpoint.
416 ///
417 /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
418 /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
419 /// [`close()`]: Connection::close
420 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
421 let conn = &mut *self.0.state.lock("close");
422 conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
423 }
424
425 /// Transmit `data` as an unreliable, unordered application datagram
426 ///
427 /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
428 /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
429 /// dictated by the peer.
430 ///
431 /// Previously queued datagrams which are still unsent may be discarded to make space for this
432 /// datagram, in order of oldest to newest.
433 pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
434 let conn = &mut *self.0.state.lock("send_datagram");
435 if let Some(ref x) = conn.error {
436 return Err(SendDatagramError::ConnectionLost(x.clone()));
437 }
438 use proto::SendDatagramError::*;
439 match conn.inner.datagrams().send(data, true) {
440 Ok(()) => {
441 conn.wake();
442 Ok(())
443 }
444 Err(e) => Err(match e {
445 Blocked(..) => unreachable!(),
446 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
447 Disabled => SendDatagramError::Disabled,
448 TooLarge => SendDatagramError::TooLarge,
449 }),
450 }
451 }
452
453 /// Transmit `data` as an unreliable, unordered application datagram
454 ///
455 /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
456 /// conditions, which effectively prioritizes old datagrams over new datagrams.
457 ///
458 /// See [`send_datagram()`] for details.
459 ///
460 /// [`send_datagram()`]: Connection::send_datagram
461 pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
462 SendDatagram {
463 conn: &self.0,
464 data: Some(data),
465 notify: self.0.shared.datagrams_unblocked.notified(),
466 }
467 }
468
469 /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
470 ///
471 /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
472 ///
473 /// This may change over the lifetime of a connection according to variation in the path MTU
474 /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
475 /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
476 ///
477 /// Not necessarily the maximum size of received datagrams.
478 ///
479 /// [`send_datagram()`]: Connection::send_datagram
480 pub fn max_datagram_size(&self) -> Option<usize> {
481 self.0
482 .state
483 .lock("max_datagram_size")
484 .inner
485 .datagrams()
486 .max_size()
487 }
488
489 /// Bytes available in the outgoing datagram buffer
490 ///
491 /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
492 /// at most this size is guaranteed not to cause older datagrams to be dropped.
493 pub fn datagram_send_buffer_space(&self) -> usize {
494 self.0
495 .state
496 .lock("datagram_send_buffer_space")
497 .inner
498 .datagrams()
499 .send_buffer_space()
500 }
501
502 /// The side of the connection (client or server)
503 pub fn side(&self) -> Side {
504 self.0.state.lock("side").inner.side()
505 }
506
507 /// The peer's UDP address
508 ///
509 /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
510 /// switching to a cellular internet connection.
511 pub fn remote_address(&self) -> SocketAddr {
512 self.0.state.lock("remote_address").inner.remote_address()
513 }
514
515 /// The local IP address which was used when the peer established
516 /// the connection
517 ///
518 /// This can be different from the address the endpoint is bound to, in case
519 /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
520 ///
521 /// This will return `None` for clients, or when the platform does not expose this
522 /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
523 /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
524 pub fn local_ip(&self) -> Option<IpAddr> {
525 self.0.state.lock("local_ip").inner.local_ip()
526 }
527
528 /// Current best estimate of this connection's latency (round-trip-time)
529 pub fn rtt(&self) -> Duration {
530 self.0.state.lock("rtt").inner.rtt()
531 }
532
533 /// Returns connection statistics
534 pub fn stats(&self) -> ConnectionStats {
535 self.0.state.lock("stats").inner.stats()
536 }
537
538 /// Current state of the congestion control algorithm, for debugging purposes
539 pub fn congestion_state(&self) -> Box<dyn Controller> {
540 self.0
541 .state
542 .lock("congestion_state")
543 .inner
544 .congestion_state()
545 .clone_box()
546 }
547
548 /// Parameters negotiated during the handshake
549 ///
550 /// Guaranteed to return `Some` on fully established connections or after
551 /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
552 /// the returned value.
553 ///
554 /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
555 pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
556 self.0
557 .state
558 .lock("handshake_data")
559 .inner
560 .crypto_session()
561 .handshake_data()
562 }
563
564 /// Cryptographic identity of the peer
565 ///
566 /// The dynamic type returned is determined by the configured
567 /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
568 /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
569 pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
570 self.0
571 .state
572 .lock("peer_identity")
573 .inner
574 .crypto_session()
575 .peer_identity()
576 }
577
578 /// A stable identifier for this connection
579 ///
580 /// Peer addresses and connection IDs can change, but this value will remain
581 /// fixed for the lifetime of the connection.
582 pub fn stable_id(&self) -> usize {
583 self.0.stable_id()
584 }
585
586 /// Update traffic keys spontaneously
587 ///
588 /// This primarily exists for testing purposes.
589 pub fn force_key_update(&self) {
590 self.0
591 .state
592 .lock("force_key_update")
593 .inner
594 .force_key_update()
595 }
596
597 /// Derive keying material from this connection's TLS session secrets.
598 ///
599 /// When both peers call this method with the same `label` and `context`
600 /// arguments and `output` buffers of equal length, they will get the
601 /// same sequence of bytes in `output`. These bytes are cryptographically
602 /// strong and pseudorandom, and are suitable for use as keying material.
603 ///
604 /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
605 pub fn export_keying_material(
606 &self,
607 output: &mut [u8],
608 label: &[u8],
609 context: &[u8],
610 ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
611 self.0
612 .state
613 .lock("export_keying_material")
614 .inner
615 .crypto_session()
616 .export_keying_material(output, label, context)
617 }
618
619 /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
620 ///
621 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
622 /// `count`s increase both minimum and worst-case memory consumption.
623 pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
624 let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
625 conn.inner.set_max_concurrent_streams(Dir::Uni, count);
626 // May need to send MAX_STREAMS to make progress
627 conn.wake();
628 }
629
630 /// See [`proto::TransportConfig::send_window()`]
631 pub fn set_send_window(&self, send_window: u64) {
632 let mut conn = self.0.state.lock("set_send_window");
633 conn.inner.set_send_window(send_window);
634 conn.wake();
635 }
636
637 /// See [`proto::TransportConfig::receive_window()`]
638 pub fn set_receive_window(&self, receive_window: VarInt) {
639 let mut conn = self.0.state.lock("set_receive_window");
640 conn.inner.set_receive_window(receive_window);
641 conn.wake();
642 }
643
644 /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
645 ///
646 /// No streams may be opened by the peer unless fewer than `count` are already open. Large
647 /// `count`s increase both minimum and worst-case memory consumption.
648 pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
649 let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
650 conn.inner.set_max_concurrent_streams(Dir::Bi, count);
651 // May need to send MAX_STREAMS to make progress
652 conn.wake();
653 }
654}
655
pin_project! {
    /// Future produced by [`Connection::open_uni`]
    ///
    /// Resolves to a [`SendStream`] once a stream could be opened, or to the connection's
    /// error.
    pub struct OpenUni<'a> {
        // Connection the stream is opened on.
        conn: &'a ConnectionRef,
        // Pending notification on the unidirectional `stream_budget_available` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
664
665impl Future for OpenUni<'_> {
666 type Output = Result<SendStream, ConnectionError>;
667 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
668 let this = self.project();
669 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
670 Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
671 }
672}
673
pin_project! {
    /// Future produced by [`Connection::open_bi`]
    ///
    /// Resolves to a ([`SendStream`], [`RecvStream`]) pair once a stream could be opened, or to
    /// the connection's error.
    pub struct OpenBi<'a> {
        // Connection the stream is opened on.
        conn: &'a ConnectionRef,
        // Pending notification on the bidirectional `stream_budget_available` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
682
683impl Future for OpenBi<'_> {
684 type Output = Result<(SendStream, RecvStream), ConnectionError>;
685 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
686 let this = self.project();
687 let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
688
689 Poll::Ready(Ok((
690 SendStream::new(conn.clone(), id, is_0rtt),
691 RecvStream::new(conn, id, is_0rtt),
692 )))
693 }
694}
695
696fn poll_open<'a>(
697 ctx: &mut Context<'_>,
698 conn: &'a ConnectionRef,
699 mut notify: Pin<&mut Notified<'a>>,
700 dir: Dir,
701) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
702 let mut state = conn.state.lock("poll_open");
703 if let Some(ref e) = state.error {
704 return Poll::Ready(Err(e.clone()));
705 } else if let Some(id) = state.inner.streams().open(dir) {
706 let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
707 drop(state); // Release the lock so clone can take it
708 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
709 }
710 loop {
711 match notify.as_mut().poll(ctx) {
712 // `state` lock ensures we didn't race with readiness
713 Poll::Pending => return Poll::Pending,
714 // Spurious wakeup, get a new future
715 Poll::Ready(()) => {
716 notify.set(conn.shared.stream_budget_available[dir as usize].notified())
717 }
718 }
719 }
720}
721
pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    ///
    /// Resolves to a [`RecvStream`] once an incoming stream is available, or to the
    /// connection's error.
    pub struct AcceptUni<'a> {
        // Connection the stream is accepted from.
        conn: &'a ConnectionRef,
        // Pending notification on the unidirectional `stream_incoming` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
730
731impl Future for AcceptUni<'_> {
732 type Output = Result<RecvStream, ConnectionError>;
733
734 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
735 let this = self.project();
736 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
737 Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
738 }
739}
740
pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    ///
    /// Resolves to a ([`SendStream`], [`RecvStream`]) pair once an incoming stream is
    /// available, or to the connection's error.
    pub struct AcceptBi<'a> {
        // Connection the stream is accepted from.
        conn: &'a ConnectionRef,
        // Pending notification on the bidirectional `stream_incoming` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
749
750impl Future for AcceptBi<'_> {
751 type Output = Result<(SendStream, RecvStream), ConnectionError>;
752
753 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
754 let this = self.project();
755 let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
756 Poll::Ready(Ok((
757 SendStream::new(conn.clone(), id, is_0rtt),
758 RecvStream::new(conn, id, is_0rtt),
759 )))
760 }
761}
762
763fn poll_accept<'a>(
764 ctx: &mut Context<'_>,
765 conn: &'a ConnectionRef,
766 mut notify: Pin<&mut Notified<'a>>,
767 dir: Dir,
768) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
769 let mut state = conn.state.lock("poll_accept");
770 // Check for incoming streams before checking `state.error` so that already-received streams,
771 // which are necessarily finite, can be drained from a closed connection.
772 if let Some(id) = state.inner.streams().accept(dir) {
773 let is_0rtt = state.inner.is_handshaking();
774 state.wake(); // To send additional stream ID credit
775 drop(state); // Release the lock so clone can take it
776 return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
777 } else if let Some(ref e) = state.error {
778 return Poll::Ready(Err(e.clone()));
779 }
780 loop {
781 match notify.as_mut().poll(ctx) {
782 // `state` lock ensures we didn't race with readiness
783 Poll::Pending => return Poll::Pending,
784 // Spurious wakeup, get a new future
785 Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
786 }
787 }
788}
789
pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    ///
    /// Resolves to the next received datagram, or to the connection's error once no buffered
    /// datagrams remain.
    pub struct ReadDatagram<'a> {
        // Connection the datagram is read from.
        conn: &'a ConnectionRef,
        // Pending notification on the `datagram_received` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
798
799impl Future for ReadDatagram<'_> {
800 type Output = Result<Bytes, ConnectionError>;
801 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
802 let mut this = self.project();
803 let mut state = this.conn.state.lock("ReadDatagram::poll");
804 // Check for buffered datagrams before checking `state.error` so that already-received
805 // datagrams, which are necessarily finite, can be drained from a closed connection.
806 if let Some(x) = state.inner.datagrams().recv() {
807 return Poll::Ready(Ok(x));
808 } else if let Some(ref e) = state.error {
809 return Poll::Ready(Err(e.clone()));
810 }
811 loop {
812 match this.notify.as_mut().poll(ctx) {
813 // `state` lock ensures we didn't race with readiness
814 Poll::Pending => return Poll::Pending,
815 // Spurious wakeup, get a new future
816 Poll::Ready(()) => this
817 .notify
818 .set(this.conn.shared.datagram_received.notified()),
819 }
820 }
821 }
822}
823
pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    ///
    /// Resolves once the datagram has been queued for sending, or with the reason it could not
    /// be.
    pub struct SendDatagram<'a> {
        // Connection the datagram is sent on.
        conn: &'a ConnectionRef,
        // The datagram to send. Taken for each send attempt and put back if the attempt is
        // blocked.
        data: Option<Bytes>,
        // Pending notification on the `datagrams_unblocked` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
833
834impl Future for SendDatagram<'_> {
835 type Output = Result<(), SendDatagramError>;
836 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
837 let mut this = self.project();
838 let mut state = this.conn.state.lock("SendDatagram::poll");
839 if let Some(ref e) = state.error {
840 return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
841 }
842 use proto::SendDatagramError::*;
843 match state
844 .inner
845 .datagrams()
846 .send(this.data.take().unwrap(), false)
847 {
848 Ok(()) => {
849 state.wake();
850 Poll::Ready(Ok(()))
851 }
852 Err(e) => Poll::Ready(Err(match e {
853 Blocked(data) => {
854 this.data.replace(data);
855 loop {
856 match this.notify.as_mut().poll(ctx) {
857 Poll::Pending => return Poll::Pending,
858 // Spurious wakeup, get a new future
859 Poll::Ready(()) => this
860 .notify
861 .set(this.conn.shared.datagrams_unblocked.notified()),
862 }
863 }
864 }
865 UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
866 Disabled => SendDatagramError::Disabled,
867 TooLarge => SendDatagramError::TooLarge,
868 })),
869 }
870 }
871}
872
/// Handle to the `ConnectionInner` shared by everything that touches one connection
///
/// Cloning and dropping a handle adjust `State::ref_count`; see the `Clone` and `Drop` impls.
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
875
876impl ConnectionRef {
877 #[allow(clippy::too_many_arguments)]
878 fn new(
879 handle: ConnectionHandle,
880 conn: proto::Connection,
881 endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
882 conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
883 on_handshake_data: oneshot::Sender<()>,
884 on_connected: oneshot::Sender<bool>,
885 socket: Arc<dyn AsyncUdpSocket>,
886 runtime: Arc<dyn Runtime>,
887 ) -> Self {
888 Self(Arc::new(ConnectionInner {
889 state: Mutex::new(State {
890 inner: conn,
891 driver: None,
892 handle,
893 on_handshake_data: Some(on_handshake_data),
894 on_connected: Some(on_connected),
895 connected: false,
896 timer: None,
897 timer_deadline: None,
898 conn_events,
899 endpoint_events,
900 blocked_writers: FxHashMap::default(),
901 blocked_readers: FxHashMap::default(),
902 stopped: FxHashMap::default(),
903 error: None,
904 ref_count: 0,
905 io_poller: socket.clone().create_io_poller(),
906 socket,
907 runtime,
908 send_buffer: Vec::new(),
909 buffered_transmit: None,
910 }),
911 shared: Shared::default(),
912 }))
913 }
914
915 fn stable_id(&self) -> usize {
916 &*self.0 as *const _ as usize
917 }
918}
919
920impl Clone for ConnectionRef {
921 fn clone(&self) -> Self {
922 self.state.lock("clone").ref_count += 1;
923 Self(self.0.clone())
924 }
925}
926
927impl Drop for ConnectionRef {
928 fn drop(&mut self) {
929 let conn = &mut *self.state.lock("drop");
930 if let Some(x) = conn.ref_count.checked_sub(1) {
931 conn.ref_count = x;
932 if x == 0 && !conn.inner.is_closed() {
933 // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
934 // not, we can't do any harm. If there were any streams being opened, then either
935 // the connection will be closed for an unrelated reason or a fresh reference will
936 // be constructed for the newly opened stream.
937 conn.implicit_close(&self.shared);
938 }
939 }
940 }
941}
942
943impl std::ops::Deref for ConnectionRef {
944 type Target = ConnectionInner;
945 fn deref(&self) -> &Self::Target {
946 &self.0
947 }
948}
949
/// Data shared by every `ConnectionRef` that points at the same connection
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    /// Mutable connection state; every access goes through this mutex
    pub(crate) state: Mutex<State>,
    /// Notifiers stored outside the `state` mutex
    pub(crate) shared: Shared,
}
955
/// Notifiers used to wake tasks waiting on connection-level events
///
/// The two-element arrays are indexed by `Dir as usize`.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    /// Notified on a `DatagramReceived` event and when the connection terminates
    datagram_received: Notify,
    /// Notified on a `DatagramsUnblocked` event and when the connection terminates
    datagrams_unblocked: Notify,
    /// Notified when the connection terminates
    closed: Notify,
}
967
/// Mutable per-connection state, kept behind the mutex in `ConnectionInner`
pub(crate) struct State {
    /// The underlying `proto::Connection`
    pub(crate) inner: proto::Connection,
    /// Waker that `wake` takes and wakes to rouse the driver task, if one is stored
    driver: Option<Waker>,
    /// Handle sent along with every event forwarded to the endpoint
    handle: ConnectionHandle,
    /// Fired on `HandshakeDataReady`, or when the connection terminates before that
    on_handshake_data: Option<oneshot::Sender<()>>,
    /// Sent `accepted_0rtt()` on `Connected`, or `false` if the connection terminates first
    on_connected: Option<oneshot::Sender<bool>>,
    /// Set once the `Connected` event has been observed
    connected: bool,
    /// Timer created by `drive_timer` on first use and reset when the deadline changes
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    /// Deadline most recently applied to `timer`; cleared when no timeout is pending and after
    /// the timer fires
    timer_deadline: Option<Instant>,
    /// Incoming events for this connection, drained by `process_conn_events`
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    /// Channel on which this connection reports events to the endpoint
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Per-stream wakers, woken by `forward_app_events` and `terminate`
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    /// Per-stream wakers, woken by `forward_app_events` and `terminate`
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    /// Per-stream notifiers, signalled on `Finished`/`Stopped` events and on termination
    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    /// Socket used for sending; replaced when a `Rebind` event arrives
    socket: Arc<dyn AsyncUdpSocket>,
    /// Writability poller created from `socket`; recreated whenever `socket` is replaced
    io_poller: Pin<Box<dyn UdpPoller>>,
    /// Source of the current time and of new timers
    runtime: Arc<dyn Runtime>,
    /// Scratch buffer that `drive_transmit` clears and refills for each new transmit
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<proto::Transmit>,
}
993
994impl State {
995 fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
996 let now = self.runtime.now();
997 let mut transmits = 0;
998
999 let max_datagrams = self
1000 .socket
1001 .max_transmit_segments()
1002 .min(MAX_TRANSMIT_SEGMENTS);
1003
1004 loop {
1005 // Retry the last transmit, or get a new one.
1006 let t = match self.buffered_transmit.take() {
1007 Some(t) => t,
1008 None => {
1009 self.send_buffer.clear();
1010 self.send_buffer.reserve(self.inner.current_mtu() as usize);
1011 match self
1012 .inner
1013 .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1014 {
1015 Some(t) => {
1016 transmits += match t.segment_size {
1017 None => 1,
1018 Some(s) => t.size.div_ceil(s), // round up
1019 };
1020 t
1021 }
1022 None => break,
1023 }
1024 }
1025 };
1026
1027 if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
1028 // Retry after a future wakeup
1029 self.buffered_transmit = Some(t);
1030 return Ok(false);
1031 }
1032
1033 let len = t.size;
1034 let retry = match self
1035 .socket
1036 .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
1037 {
1038 Ok(()) => false,
1039 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
1040 Err(e) => return Err(e),
1041 };
1042 if retry {
1043 // We thought the socket was writable, but it wasn't. Retry so that either another
1044 // `poll_writable` call determines that the socket is indeed not writable and
1045 // registers us for a wakeup, or the send succeeds if this really was just a
1046 // transient failure.
1047 self.buffered_transmit = Some(t);
1048 continue;
1049 }
1050
1051 if transmits >= MAX_TRANSMIT_DATAGRAMS {
1052 // TODO: What isn't ideal here yet is that if we don't poll all
1053 // datagrams that could be sent we don't go into the `app_limited`
1054 // state and CWND continues to grow until we get here the next time.
1055 // See https://github.com/quinn-rs/quinn/issues/1126
1056 return Ok(true);
1057 }
1058 }
1059
1060 Ok(false)
1061 }
1062
1063 fn forward_endpoint_events(&mut self) {
1064 while let Some(event) = self.inner.poll_endpoint_events() {
1065 // If the endpoint driver is gone, noop.
1066 let _ = self.endpoint_events.send((self.handle, event));
1067 }
1068 }
1069
1070 /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1071 fn process_conn_events(
1072 &mut self,
1073 shared: &Shared,
1074 cx: &mut Context,
1075 ) -> Result<(), ConnectionError> {
1076 loop {
1077 match self.conn_events.poll_recv(cx) {
1078 Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
1079 self.socket = socket;
1080 self.io_poller = self.socket.clone().create_io_poller();
1081 self.inner.local_address_changed();
1082 }
1083 Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1084 self.inner.handle_event(event);
1085 }
1086 Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1087 self.close(error_code, reason, shared);
1088 }
1089 Poll::Ready(None) => {
1090 return Err(ConnectionError::TransportError(proto::TransportError {
1091 code: proto::TransportErrorCode::INTERNAL_ERROR,
1092 frame: None,
1093 reason: "endpoint driver future was dropped".to_string(),
1094 }));
1095 }
1096 Poll::Pending => {
1097 return Ok(());
1098 }
1099 }
1100 }
1101 }
1102
1103 fn forward_app_events(&mut self, shared: &Shared) {
1104 while let Some(event) = self.inner.poll() {
1105 use proto::Event::*;
1106 match event {
1107 HandshakeDataReady => {
1108 if let Some(x) = self.on_handshake_data.take() {
1109 let _ = x.send(());
1110 }
1111 }
1112 Connected => {
1113 self.connected = true;
1114 if let Some(x) = self.on_connected.take() {
1115 // We don't care if the on-connected future was dropped
1116 let _ = x.send(self.inner.accepted_0rtt());
1117 }
1118 if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1119 // Wake up rejected 0-RTT streams so they can fail immediately with
1120 // `ZeroRttRejected` errors.
1121 wake_all(&mut self.blocked_writers);
1122 wake_all(&mut self.blocked_readers);
1123 wake_all_notify(&mut self.stopped);
1124 }
1125 }
1126 ConnectionLost { reason } => {
1127 self.terminate(reason, shared);
1128 }
1129 Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1130 Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1131 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1132 }
1133 Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1134 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1135 }
1136 DatagramReceived => {
1137 shared.datagram_received.notify_waiters();
1138 }
1139 DatagramsUnblocked => {
1140 shared.datagrams_unblocked.notify_waiters();
1141 }
1142 Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1143 Stream(StreamEvent::Available { dir }) => {
1144 // Might mean any number of streams are ready, so we wake up everyone
1145 shared.stream_budget_available[dir as usize].notify_waiters();
1146 }
1147 Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1148 Stream(StreamEvent::Stopped { id, .. }) => {
1149 wake_stream_notify(id, &mut self.stopped);
1150 wake_stream(id, &mut self.blocked_writers);
1151 }
1152 }
1153 }
1154 }
1155
1156 fn drive_timer(&mut self, cx: &mut Context) -> bool {
1157 // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1158 // timer is registered with the runtime (and check whether it's already
1159 // expired).
1160 match self.inner.poll_timeout() {
1161 Some(deadline) => {
1162 if let Some(delay) = &mut self.timer {
1163 // There is no need to reset the tokio timer if the deadline
1164 // did not change
1165 if self
1166 .timer_deadline
1167 .map(|current_deadline| current_deadline != deadline)
1168 .unwrap_or(true)
1169 {
1170 delay.as_mut().reset(deadline);
1171 }
1172 } else {
1173 self.timer = Some(self.runtime.new_timer(deadline));
1174 }
1175 // Store the actual expiration time of the timer
1176 self.timer_deadline = Some(deadline);
1177 }
1178 None => {
1179 self.timer_deadline = None;
1180 return false;
1181 }
1182 }
1183
1184 if self.timer_deadline.is_none() {
1185 return false;
1186 }
1187
1188 let delay = self
1189 .timer
1190 .as_mut()
1191 .expect("timer must exist in this state")
1192 .as_mut();
1193 if delay.poll(cx).is_pending() {
1194 // Since there wasn't a timeout event, there is nothing new
1195 // for the connection to do
1196 return false;
1197 }
1198
1199 // A timer expired, so the caller needs to check for
1200 // new transmits, which might cause new timers to be set.
1201 self.inner.handle_timeout(self.runtime.now());
1202 self.timer_deadline = None;
1203 true
1204 }
1205
1206 /// Wake up a blocked `Driver` task to process I/O
1207 pub(crate) fn wake(&mut self) {
1208 if let Some(x) = self.driver.take() {
1209 x.wake();
1210 }
1211 }
1212
1213 /// Used to wake up all blocked futures when the connection becomes closed for any reason
1214 fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1215 self.error = Some(reason.clone());
1216 if let Some(x) = self.on_handshake_data.take() {
1217 let _ = x.send(());
1218 }
1219 wake_all(&mut self.blocked_writers);
1220 wake_all(&mut self.blocked_readers);
1221 shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1222 shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1223 shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1224 shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1225 shared.datagram_received.notify_waiters();
1226 shared.datagrams_unblocked.notify_waiters();
1227 if let Some(x) = self.on_connected.take() {
1228 let _ = x.send(false);
1229 }
1230 wake_all_notify(&mut self.stopped);
1231 shared.closed.notify_waiters();
1232 }
1233
1234 fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1235 self.inner.close(self.runtime.now(), error_code, reason);
1236 self.terminate(ConnectionError::LocallyClosed, shared);
1237 self.wake();
1238 }
1239
1240 /// Close for a reason other than the application's explicit request
1241 pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1242 self.close(0u32.into(), Bytes::new(), shared);
1243 }
1244
1245 pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1246 if self.inner.is_handshaking()
1247 || self.inner.accepted_0rtt()
1248 || self.inner.side().is_server()
1249 {
1250 Ok(())
1251 } else {
1252 Err(())
1253 }
1254 }
1255}
1256
1257impl Drop for State {
1258 fn drop(&mut self) {
1259 if !self.inner.is_drained() {
1260 // Ensure the endpoint can tidy up
1261 let _ = self
1262 .endpoint_events
1263 .send((self.handle, proto::EndpointEvent::drained()));
1264 }
1265 }
1266}
1267
1268impl fmt::Debug for State {
1269 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1270 f.debug_struct("State").field("inner", &self.inner).finish()
1271 }
1272}
1273
1274fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1275 if let Some(waker) = wakers.remove(&stream_id) {
1276 waker.wake();
1277 }
1278}
1279
1280fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1281 wakers.drain().for_each(|(_, waker)| waker.wake())
1282}
1283
1284fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1285 if let Some(notify) = wakers.remove(&stream_id) {
1286 notify.notify_waiters()
1287 }
1288}
1289
1290fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1291 wakers
1292 .drain()
1293 .for_each(|(_, notify)| notify.notify_waiters())
1294}
1295
/// Errors that can arise when sending a datagram
///
/// This is the error type of the [`SendDatagram`] future. There is no variant for a blocked
/// send: that future waits to be unblocked rather than reporting it.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    ///
    /// Carries the [`ConnectionError`] that was recorded for the connection.
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}
1315
/// The maximum number of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between. Once this many datagrams
/// have been produced, `drive_transmit` returns `Ok(true)`.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1321
/// The maximum number of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise. `drive_transmit` uses the smaller of this
/// value and the socket's `max_transmit_segments()`.
const MAX_TRANSMIT_SEGMENTS: usize = 10;