quinn_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6    mtud::MtuDiscovery,
7    pacing::Pacer,
8    spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12#[cfg(feature = "qlog")]
13use qlog::events::quic::MetricsUpdated;
14
15/// Description of a particular network path
16pub(super) struct PathData {
17    pub(super) remote: SocketAddr,
18    pub(super) rtt: RttEstimator,
19    /// Whether we're enabling ECN on outgoing packets
20    pub(super) sending_ecn: bool,
21    /// Congestion controller state
22    pub(super) congestion: Box<dyn congestion::Controller>,
23    /// Pacing state
24    pub(super) pacing: Pacer,
25    pub(super) challenge: Option<u64>,
26    pub(super) challenge_pending: bool,
27    /// Whether we're certain the peer can both send and receive on this address
28    ///
29    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
30    /// migration. Always true for clients.
31    pub(super) validated: bool,
32    /// Total size of all UDP datagrams sent on this path
33    pub(super) total_sent: u64,
34    /// Total size of all UDP datagrams received on this path
35    pub(super) total_recvd: u64,
36    /// The state of the MTU discovery process
37    pub(super) mtud: MtuDiscovery,
38    /// Packet number of the first packet sent after an RTT sample was collected on this path
39    ///
40    /// Used in persistent congestion determination.
41    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
42    pub(super) in_flight: InFlight,
43    /// Number of the first packet sent on this path
44    ///
45    /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
46    /// a packet was sent on a later path.
47    first_packet: Option<u64>,
48
49    /// Snapshot of the qlog recovery metrics
50    #[cfg(feature = "qlog")]
51    recovery_metrics: RecoveryMetrics,
52
53    /// Tag uniquely identifying a path in a connection
54    generation: u64,
55}
56
57impl PathData {
58    pub(super) fn new(
59        remote: SocketAddr,
60        allow_mtud: bool,
61        peer_max_udp_payload_size: Option<u16>,
62        generation: u64,
63        now: Instant,
64        config: &TransportConfig,
65    ) -> Self {
66        let congestion = config
67            .congestion_controller_factory
68            .clone()
69            .build(now, config.get_initial_mtu());
70        Self {
71            remote,
72            rtt: RttEstimator::new(config.initial_rtt),
73            sending_ecn: true,
74            pacing: Pacer::new(
75                config.initial_rtt,
76                congestion.initial_window(),
77                config.get_initial_mtu(),
78                now,
79            ),
80            congestion,
81            challenge: None,
82            challenge_pending: false,
83            validated: false,
84            total_sent: 0,
85            total_recvd: 0,
86            mtud: config
87                .mtu_discovery_config
88                .as_ref()
89                .filter(|_| allow_mtud)
90                .map_or(
91                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
92                    |mtud_config| {
93                        MtuDiscovery::new(
94                            config.get_initial_mtu(),
95                            config.min_mtu,
96                            peer_max_udp_payload_size,
97                            mtud_config.clone(),
98                        )
99                    },
100                ),
101            first_packet_after_rtt_sample: None,
102            in_flight: InFlight::new(),
103            first_packet: None,
104            #[cfg(feature = "qlog")]
105            recovery_metrics: RecoveryMetrics::default(),
106            generation,
107        }
108    }
109
110    pub(super) fn from_previous(
111        remote: SocketAddr,
112        prev: &Self,
113        generation: u64,
114        now: Instant,
115    ) -> Self {
116        let congestion = prev.congestion.clone_box();
117        let smoothed_rtt = prev.rtt.get();
118        Self {
119            remote,
120            rtt: prev.rtt,
121            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
122            sending_ecn: true,
123            congestion,
124            challenge: None,
125            challenge_pending: false,
126            validated: false,
127            total_sent: 0,
128            total_recvd: 0,
129            mtud: prev.mtud.clone(),
130            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
131            in_flight: InFlight::new(),
132            first_packet: None,
133            #[cfg(feature = "qlog")]
134            recovery_metrics: prev.recovery_metrics.clone(),
135            generation,
136        }
137    }
138
139    /// Resets RTT, congestion control and MTU states.
140    ///
141    /// This is useful when it is known the underlying path has changed.
142    pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
143        self.rtt = RttEstimator::new(config.initial_rtt);
144        self.congestion = config
145            .congestion_controller_factory
146            .clone()
147            .build(now, config.get_initial_mtu());
148        self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
149    }
150
151    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
152    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
153    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
154        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
155    }
156
157    /// Returns the path's current MTU
158    pub(super) fn current_mtu(&self) -> u16 {
159        self.mtud.current_mtu()
160    }
161
162    /// Account for transmission of `packet` with number `pn` in `space`
163    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
164        self.in_flight.insert(&packet);
165        if self.first_packet.is_none() {
166            self.first_packet = Some(pn);
167        }
168        if let Some(forgotten) = space.sent(pn, packet) {
169            self.remove_in_flight(&forgotten);
170        }
171    }
172
173    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
174    /// `false` if `pn` was sent before this path was established.
175    pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
176        if packet.path_generation != self.generation {
177            return false;
178        }
179        self.in_flight.remove(packet);
180        true
181    }
182
183    #[cfg(feature = "qlog")]
184    pub(super) fn qlog_recovery_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
185        let controller_metrics = self.congestion.metrics();
186
187        let metrics = RecoveryMetrics {
188            min_rtt: Some(self.rtt.min),
189            smoothed_rtt: Some(self.rtt.get()),
190            latest_rtt: Some(self.rtt.latest),
191            rtt_variance: Some(self.rtt.var),
192            pto_count: Some(pto_count),
193            bytes_in_flight: Some(self.in_flight.bytes),
194            packets_in_flight: Some(self.in_flight.ack_eliciting),
195
196            congestion_window: Some(controller_metrics.congestion_window),
197            ssthresh: controller_metrics.ssthresh,
198            pacing_rate: controller_metrics.pacing_rate,
199        };
200
201        let event = metrics.to_qlog_event(&self.recovery_metrics);
202        self.recovery_metrics = metrics;
203        event
204    }
205
206    pub(super) fn generation(&self) -> u64 {
207        self.generation
208    }
209}
210
211/// Congestion metrics as described in [`recovery_metrics_updated`].
212///
213/// [`recovery_metrics_updated`]: https://datatracker.ietf.org/doc/html/draft-ietf-quic-qlog-quic-events.html#name-recovery_metrics_updated
214#[cfg(feature = "qlog")]
215#[derive(Default, Clone, PartialEq)]
216#[non_exhaustive]
217struct RecoveryMetrics {
218    pub min_rtt: Option<Duration>,
219    pub smoothed_rtt: Option<Duration>,
220    pub latest_rtt: Option<Duration>,
221    pub rtt_variance: Option<Duration>,
222    pub pto_count: Option<u32>,
223    pub bytes_in_flight: Option<u64>,
224    pub packets_in_flight: Option<u64>,
225    pub congestion_window: Option<u64>,
226    pub ssthresh: Option<u64>,
227    pub pacing_rate: Option<u64>,
228}
229
230#[cfg(feature = "qlog")]
231impl RecoveryMetrics {
232    /// Retain only values that have been updated since the last snapshot.
233    fn retain_updated(&self, previous: &Self) -> Self {
234        macro_rules! keep_if_changed {
235            ($name:ident) => {
236                if previous.$name == self.$name {
237                    None
238                } else {
239                    self.$name
240                }
241            };
242        }
243
244        Self {
245            min_rtt: keep_if_changed!(min_rtt),
246            smoothed_rtt: keep_if_changed!(smoothed_rtt),
247            latest_rtt: keep_if_changed!(latest_rtt),
248            rtt_variance: keep_if_changed!(rtt_variance),
249            pto_count: keep_if_changed!(pto_count),
250            bytes_in_flight: keep_if_changed!(bytes_in_flight),
251            packets_in_flight: keep_if_changed!(packets_in_flight),
252            congestion_window: keep_if_changed!(congestion_window),
253            ssthresh: keep_if_changed!(ssthresh),
254            pacing_rate: keep_if_changed!(pacing_rate),
255        }
256    }
257
258    /// Emit a `MetricsUpdated` event containing only updated values
259    fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
260        let updated = self.retain_updated(previous);
261
262        if updated == Self::default() {
263            return None;
264        }
265
266        Some(MetricsUpdated {
267            min_rtt: updated.min_rtt.map(|rtt| rtt.as_secs_f32()),
268            smoothed_rtt: updated.smoothed_rtt.map(|rtt| rtt.as_secs_f32()),
269            latest_rtt: updated.latest_rtt.map(|rtt| rtt.as_secs_f32()),
270            rtt_variance: updated.rtt_variance.map(|rtt| rtt.as_secs_f32()),
271            pto_count: updated
272                .pto_count
273                .map(|count| count.try_into().unwrap_or(u16::MAX)),
274            bytes_in_flight: updated.bytes_in_flight,
275            packets_in_flight: updated.packets_in_flight,
276            congestion_window: updated.congestion_window,
277            ssthresh: updated.ssthresh,
278            pacing_rate: updated.pacing_rate,
279        })
280    }
281}
282
283/// RTT estimation for a particular network path
284#[derive(Copy, Clone)]
285pub struct RttEstimator {
286    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
287    latest: Duration,
288    /// The smoothed RTT of the connection, computed as described in RFC6298
289    smoothed: Option<Duration>,
290    /// The RTT variance, computed as described in RFC6298
291    var: Duration,
292    /// The minimum RTT seen in the connection, ignoring ack delay.
293    min: Duration,
294}
295
296impl RttEstimator {
297    fn new(initial_rtt: Duration) -> Self {
298        Self {
299            latest: initial_rtt,
300            smoothed: None,
301            var: initial_rtt / 2,
302            min: initial_rtt,
303        }
304    }
305
306    /// The current best RTT estimation.
307    pub fn get(&self) -> Duration {
308        self.smoothed.unwrap_or(self.latest)
309    }
310
311    /// Conservative estimate of RTT
312    ///
313    /// Takes the maximum of smoothed and latest RTT, as recommended
314    /// in 6.1.2 of the recovery spec (draft 29).
315    pub fn conservative(&self) -> Duration {
316        self.get().max(self.latest)
317    }
318
319    /// Minimum RTT registered so far for this estimator.
320    pub fn min(&self) -> Duration {
321        self.min
322    }
323
324    // PTO computed as described in RFC9002#6.2.1
325    pub(crate) fn pto_base(&self) -> Duration {
326        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
327    }
328
329    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
330        self.latest = rtt;
331        // min_rtt ignores ack delay.
332        self.min = cmp::min(self.min, self.latest);
333        // Based on RFC6298.
334        if let Some(smoothed) = self.smoothed {
335            let adjusted_rtt = if self.min + ack_delay <= self.latest {
336                self.latest - ack_delay
337            } else {
338                self.latest
339            };
340            let var_sample = if smoothed > adjusted_rtt {
341                smoothed - adjusted_rtt
342            } else {
343                adjusted_rtt - smoothed
344            };
345            self.var = (3 * self.var + var_sample) / 4;
346            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
347        } else {
348            self.smoothed = Some(self.latest);
349            self.var = self.latest / 2;
350            self.min = self.latest;
351        }
352    }
353}
354
355#[derive(Default)]
356pub(crate) struct PathResponses {
357    pending: Vec<PathResponse>,
358}
359
360impl PathResponses {
361    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
362        /// Arbitrary permissive limit to prevent abuse
363        const MAX_PATH_RESPONSES: usize = 16;
364        let response = PathResponse {
365            packet,
366            token,
367            remote,
368        };
369        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
370        if let Some(existing) = existing {
371            // Update a queued response
372            if existing.packet <= packet {
373                *existing = response;
374            }
375            return;
376        }
377        if self.pending.len() < MAX_PATH_RESPONSES {
378            self.pending.push(response);
379        } else {
380            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
381            // older challenges.
382            trace!("ignoring excessive PATH_CHALLENGE");
383        }
384    }
385
386    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
387        let response = *self.pending.last()?;
388        if response.remote == remote {
389            // We don't bother searching further because we expect that the on-path response will
390            // get drained in the immediate future by a call to `pop_on_path`
391            return None;
392        }
393        self.pending.pop();
394        Some((response.token, response.remote))
395    }
396
397    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
398        let response = *self.pending.last()?;
399        if response.remote != remote {
400            // We don't bother searching further because we expect that the off-path response will
401            // get drained in the immediate future by a call to `pop_off_path`
402            return None;
403        }
404        self.pending.pop();
405        Some(response.token)
406    }
407
408    pub(crate) fn is_empty(&self) -> bool {
409        self.pending.is_empty()
410    }
411}
412
413#[derive(Copy, Clone)]
414struct PathResponse {
415    /// The packet number the corresponding PATH_CHALLENGE was received in
416    packet: u64,
417    token: u64,
418    /// The address the corresponding PATH_CHALLENGE was received from
419    remote: SocketAddr,
420}
421
422/// Summary statistics of packets that have been sent on a particular path, but which have not yet
423/// been acked or deemed lost
424pub(super) struct InFlight {
425    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
426    ///
427    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
428    /// count towards this to ensure congestion control does not impede congestion feedback.
429    pub(super) bytes: u64,
430    /// Number of packets in flight containing frames other than ACK and PADDING
431    ///
432    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
433    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
434    /// also be nonzero.
435    pub(super) ack_eliciting: u64,
436}
437
438impl InFlight {
439    fn new() -> Self {
440        Self {
441            bytes: 0,
442            ack_eliciting: 0,
443        }
444    }
445
446    fn insert(&mut self, packet: &SentPacket) {
447        self.bytes += u64::from(packet.size);
448        self.ack_eliciting += u64::from(packet.ack_eliciting);
449    }
450
451    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
452    fn remove(&mut self, packet: &SentPacket) {
453        self.bytes -= u64::from(packet.size);
454        self.ack_eliciting -= u64::from(packet.ack_eliciting);
455    }
456}