1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6 mtud::MtuDiscovery,
7 pacing::Pacer,
8 spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12#[cfg(feature = "qlog")]
13use qlog::events::quic::MetricsUpdated;
14
/// State tracked for a single network path to a remote address
pub(super) struct PathData {
    /// Remote socket address this path state belongs to
    pub(super) remote: SocketAddr,
    /// Round-trip time estimator for this path
    pub(super) rtt: RttEstimator,
    /// ECN flag for this path; both constructors start with it set to `true`
    // NOTE(review): presumably controls ECN marking of outgoing packets; confirm against the
    // send path.
    pub(super) sending_ecn: bool,
    /// Congestion controller, built from the transport config's controller factory
    pub(super) congestion: Box<dyn congestion::Controller>,
    /// Pacing state, seeded from the RTT estimate, congestion window and MTU
    pub(super) pacing: Pacer,
    /// Optional 64-bit challenge value; `None` on a freshly constructed path
    // NOTE(review): presumably the token of an outstanding PATH_CHALLENGE; confirm with callers.
    pub(super) challenge: Option<u64>,
    /// Challenge-related flag; `false` on a freshly constructed path
    // NOTE(review): presumably set while a challenge still has to be transmitted; confirm.
    pub(super) challenge_pending: bool,
    /// While `false`, `anti_amplification_blocked` limits sending to three times the number of
    /// bytes received on this path
    pub(super) validated: bool,
    /// Byte counter for data sent on this path, used by the anti-amplification check
    pub(super) total_sent: u64,
    /// Byte counter for data received on this path, used by the anti-amplification check
    pub(super) total_recvd: u64,
    /// MTU discovery state; also the source of `current_mtu`
    pub(super) mtud: MtuDiscovery,
    /// Packet space and packet number pair; `None` on a new path, inherited on migration
    // NOTE(review): by its name, the first packet sent after the latest RTT sample; confirm.
    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
    /// Counters for packets recorded via `sent` and not yet removed again
    pub(super) in_flight: InFlight,
    /// Packet number passed to the first `sent` call on this path, if any
    first_packet: Option<u64>,

    /// Metrics snapshot from the previous `qlog_recovery_metrics` call, used to report only
    /// the values that changed
    #[cfg(feature = "qlog")]
    recovery_metrics: RecoveryMetrics,

    /// Tag compared against `SentPacket::path_generation` to tell whether a packet was
    /// accounted on this path
    generation: u64,
}
56
57impl PathData {
58 pub(super) fn new(
59 remote: SocketAddr,
60 allow_mtud: bool,
61 peer_max_udp_payload_size: Option<u16>,
62 generation: u64,
63 now: Instant,
64 config: &TransportConfig,
65 ) -> Self {
66 let congestion = config
67 .congestion_controller_factory
68 .clone()
69 .build(now, config.get_initial_mtu());
70 Self {
71 remote,
72 rtt: RttEstimator::new(config.initial_rtt),
73 sending_ecn: true,
74 pacing: Pacer::new(
75 config.initial_rtt,
76 congestion.initial_window(),
77 config.get_initial_mtu(),
78 now,
79 ),
80 congestion,
81 challenge: None,
82 challenge_pending: false,
83 validated: false,
84 total_sent: 0,
85 total_recvd: 0,
86 mtud: config
87 .mtu_discovery_config
88 .as_ref()
89 .filter(|_| allow_mtud)
90 .map_or(
91 MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
92 |mtud_config| {
93 MtuDiscovery::new(
94 config.get_initial_mtu(),
95 config.min_mtu,
96 peer_max_udp_payload_size,
97 mtud_config.clone(),
98 )
99 },
100 ),
101 first_packet_after_rtt_sample: None,
102 in_flight: InFlight::new(),
103 first_packet: None,
104 #[cfg(feature = "qlog")]
105 recovery_metrics: RecoveryMetrics::default(),
106 generation,
107 }
108 }
109
110 pub(super) fn from_previous(
111 remote: SocketAddr,
112 prev: &Self,
113 generation: u64,
114 now: Instant,
115 ) -> Self {
116 let congestion = prev.congestion.clone_box();
117 let smoothed_rtt = prev.rtt.get();
118 Self {
119 remote,
120 rtt: prev.rtt,
121 pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
122 sending_ecn: true,
123 congestion,
124 challenge: None,
125 challenge_pending: false,
126 validated: false,
127 total_sent: 0,
128 total_recvd: 0,
129 mtud: prev.mtud.clone(),
130 first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
131 in_flight: InFlight::new(),
132 first_packet: None,
133 #[cfg(feature = "qlog")]
134 recovery_metrics: prev.recovery_metrics.clone(),
135 generation,
136 }
137 }
138
139 pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
143 self.rtt = RttEstimator::new(config.initial_rtt);
144 self.congestion = config
145 .congestion_controller_factory
146 .clone()
147 .build(now, config.get_initial_mtu());
148 self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
149 }
150
151 pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
154 !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
155 }
156
157 pub(super) fn current_mtu(&self) -> u16 {
159 self.mtud.current_mtu()
160 }
161
162 pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
164 self.in_flight.insert(&packet);
165 if self.first_packet.is_none() {
166 self.first_packet = Some(pn);
167 }
168 if let Some(forgotten) = space.sent(pn, packet) {
169 self.remove_in_flight(&forgotten);
170 }
171 }
172
173 pub(super) fn remove_in_flight(&mut self, packet: &SentPacket) -> bool {
176 if packet.path_generation != self.generation {
177 return false;
178 }
179 self.in_flight.remove(packet);
180 true
181 }
182
183 #[cfg(feature = "qlog")]
184 pub(super) fn qlog_recovery_metrics(&mut self, pto_count: u32) -> Option<MetricsUpdated> {
185 let controller_metrics = self.congestion.metrics();
186
187 let metrics = RecoveryMetrics {
188 min_rtt: Some(self.rtt.min),
189 smoothed_rtt: Some(self.rtt.get()),
190 latest_rtt: Some(self.rtt.latest),
191 rtt_variance: Some(self.rtt.var),
192 pto_count: Some(pto_count),
193 bytes_in_flight: Some(self.in_flight.bytes),
194 packets_in_flight: Some(self.in_flight.ack_eliciting),
195
196 congestion_window: Some(controller_metrics.congestion_window),
197 ssthresh: controller_metrics.ssthresh,
198 pacing_rate: controller_metrics.pacing_rate,
199 };
200
201 let event = metrics.to_qlog_event(&self.recovery_metrics);
202 self.recovery_metrics = metrics;
203 event
204 }
205
206 pub(super) fn generation(&self) -> u64 {
207 self.generation
208 }
209}
210
/// Snapshot of the recovery-related values reported through qlog.
///
/// Every field is optional: `retain_updated` reuses this type to describe a delta, where
/// `None` means the value is unchanged from the previous snapshot.
#[cfg(feature = "qlog")]
#[derive(Default, Clone, PartialEq)]
#[non_exhaustive]
struct RecoveryMetrics {
    /// Minimum RTT held by the path's RTT estimator
    pub min_rtt: Option<Duration>,
    /// RTT estimate as returned by `RttEstimator::get`
    pub smoothed_rtt: Option<Duration>,
    /// Latest RTT held by the path's RTT estimator
    pub latest_rtt: Option<Duration>,
    /// RTT variance held by the path's RTT estimator
    pub rtt_variance: Option<Duration>,
    /// PTO count supplied by the caller of `qlog_recovery_metrics`
    pub pto_count: Option<u32>,
    /// Byte counter of the path's in-flight state
    pub bytes_in_flight: Option<u64>,
    /// Ack-eliciting packet counter of the path's in-flight state
    pub packets_in_flight: Option<u64>,
    /// Congestion window reported by the congestion controller's metrics
    pub congestion_window: Option<u64>,
    /// Slow-start threshold reported by the congestion controller's metrics, if any
    pub ssthresh: Option<u64>,
    /// Pacing rate reported by the congestion controller's metrics, if any
    pub pacing_rate: Option<u64>,
}
229
#[cfg(feature = "qlog")]
impl RecoveryMetrics {
    /// Returns a copy of `self` in which every field equal to its counterpart in `previous`
    /// has been cleared to `None`.
    fn retain_updated(&self, previous: &Self) -> Self {
        // Keeps `current` only when it differs from `before`.
        fn changed<T: PartialEq>(current: Option<T>, before: Option<T>) -> Option<T> {
            if current == before { None } else { current }
        }

        Self {
            min_rtt: changed(self.min_rtt, previous.min_rtt),
            smoothed_rtt: changed(self.smoothed_rtt, previous.smoothed_rtt),
            latest_rtt: changed(self.latest_rtt, previous.latest_rtt),
            rtt_variance: changed(self.rtt_variance, previous.rtt_variance),
            pto_count: changed(self.pto_count, previous.pto_count),
            bytes_in_flight: changed(self.bytes_in_flight, previous.bytes_in_flight),
            packets_in_flight: changed(self.packets_in_flight, previous.packets_in_flight),
            congestion_window: changed(self.congestion_window, previous.congestion_window),
            ssthresh: changed(self.ssthresh, previous.ssthresh),
            pacing_rate: changed(self.pacing_rate, previous.pacing_rate),
        }
    }

    /// Converts the difference between `self` and `previous` into a qlog event.
    ///
    /// Returns `None` when no field changed. Durations are reported as `f32` seconds, and a
    /// PTO count that does not fit in `u16` is reported as `u16::MAX`.
    fn to_qlog_event(&self, previous: &Self) -> Option<MetricsUpdated> {
        let delta = self.retain_updated(previous);

        let nothing_changed = delta == Self::default();
        if nothing_changed {
            return None;
        }

        let as_seconds = |value: Option<Duration>| value.map(|rtt| rtt.as_secs_f32());
        let pto_count = delta
            .pto_count
            .map(|count| u16::try_from(count).unwrap_or(u16::MAX));

        Some(MetricsUpdated {
            min_rtt: as_seconds(delta.min_rtt),
            smoothed_rtt: as_seconds(delta.smoothed_rtt),
            latest_rtt: as_seconds(delta.latest_rtt),
            rtt_variance: as_seconds(delta.rtt_variance),
            pto_count,
            bytes_in_flight: delta.bytes_in_flight,
            packets_in_flight: delta.packets_in_flight,
            congestion_window: delta.congestion_window,
            ssthresh: delta.ssthresh,
            pacing_rate: delta.pacing_rate,
        })
    }
}
282
283#[derive(Copy, Clone)]
285pub struct RttEstimator {
286 latest: Duration,
288 smoothed: Option<Duration>,
290 var: Duration,
292 min: Duration,
294}
295
296impl RttEstimator {
297 fn new(initial_rtt: Duration) -> Self {
298 Self {
299 latest: initial_rtt,
300 smoothed: None,
301 var: initial_rtt / 2,
302 min: initial_rtt,
303 }
304 }
305
306 pub fn get(&self) -> Duration {
308 self.smoothed.unwrap_or(self.latest)
309 }
310
311 pub fn conservative(&self) -> Duration {
316 self.get().max(self.latest)
317 }
318
319 pub fn min(&self) -> Duration {
321 self.min
322 }
323
324 pub(crate) fn pto_base(&self) -> Duration {
326 self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
327 }
328
329 pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
330 self.latest = rtt;
331 self.min = cmp::min(self.min, self.latest);
333 if let Some(smoothed) = self.smoothed {
335 let adjusted_rtt = if self.min + ack_delay <= self.latest {
336 self.latest - ack_delay
337 } else {
338 self.latest
339 };
340 let var_sample = if smoothed > adjusted_rtt {
341 smoothed - adjusted_rtt
342 } else {
343 adjusted_rtt - smoothed
344 };
345 self.var = (3 * self.var + var_sample) / 4;
346 self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
347 } else {
348 self.smoothed = Some(self.latest);
349 self.var = self.latest / 2;
350 self.min = self.latest;
351 }
352 }
353}
354
/// Queue of responses owed to received path challenges, holding at most one entry per remote
/// address
#[derive(Default)]
pub(crate) struct PathResponses {
    /// Queued responses; the `pop_*` methods only ever inspect and remove the last element
    pending: Vec<PathResponse>,
}
359
360impl PathResponses {
361 pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
362 const MAX_PATH_RESPONSES: usize = 16;
364 let response = PathResponse {
365 packet,
366 token,
367 remote,
368 };
369 let existing = self.pending.iter_mut().find(|x| x.remote == remote);
370 if let Some(existing) = existing {
371 if existing.packet <= packet {
373 *existing = response;
374 }
375 return;
376 }
377 if self.pending.len() < MAX_PATH_RESPONSES {
378 self.pending.push(response);
379 } else {
380 trace!("ignoring excessive PATH_CHALLENGE");
383 }
384 }
385
386 pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
387 let response = *self.pending.last()?;
388 if response.remote == remote {
389 return None;
392 }
393 self.pending.pop();
394 Some((response.token, response.remote))
395 }
396
397 pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
398 let response = *self.pending.last()?;
399 if response.remote != remote {
400 return None;
403 }
404 self.pending.pop();
405 Some(response.token)
406 }
407
408 pub(crate) fn is_empty(&self) -> bool {
409 self.pending.is_empty()
410 }
411}
412
/// A single queued response to a path challenge
#[derive(Copy, Clone)]
struct PathResponse {
    /// Number of the packet the challenge arrived in; decides which response wins when two
    /// challenges come from the same remote
    packet: u64,
    /// Token to send back in the response
    token: u64,
    /// Address the challenge came from, and thus where the response belongs
    remote: SocketAddr,
}
421
/// Running totals over the packets that were inserted and have not been removed again
pub(super) struct InFlight {
    /// Sum of the sizes of the tracked packets
    pub(super) bytes: u64,
    /// Number of tracked packets whose `ack_eliciting` flag is set
    pub(super) ack_eliciting: u64,
}
437
438impl InFlight {
439 fn new() -> Self {
440 Self {
441 bytes: 0,
442 ack_eliciting: 0,
443 }
444 }
445
446 fn insert(&mut self, packet: &SentPacket) {
447 self.bytes += u64::from(packet.size);
448 self.ack_eliciting += u64::from(packet.ack_eliciting);
449 }
450
451 fn remove(&mut self, packet: &SentPacket) {
453 self.bytes -= u64::from(packet.size);
454 self.ack_eliciting -= u64::from(packet.ack_eliciting);
455 }
456}