ndn_transport/congestion.rs
//! Consumer-side congestion control for NDN.
//!
//! Provides window-based algorithms that react to Data arrivals, congestion
//! marks (NDNLPv2 CongestionMark), and timeouts. Consumers use these to
//! regulate how many Interests are in flight.
//!
//! # Design
//!
//! `CongestionController` is an enum (not a trait) — this avoids dynamic
//! dispatch and matches the `RtoStrategy`/`ReliabilityConfig` pattern used
//! elsewhere. Each variant carries its own state; callers are expected to
//! interact only through `window()` and the event methods.
//!
//! # Example
//!
//! ```rust
//! use ndn_transport::CongestionController;
//!
//! let mut cc = CongestionController::default(); // AIMD
//! assert_eq!(cc.window(), 2.0);
//!
//! // Data arrived successfully — grow window.
//! cc.on_data();
//! assert!(cc.window() > 2.0);
//!
//! // Congestion mark received — cut window.
//! cc.on_congestion_mark();
//! assert!(cc.window() < 3.0);
//! ```
29
/// Window-based congestion controller for an NDN consumer.
///
/// The consumer reports events through `on_data`, `on_congestion_mark` and
/// `on_timeout`, and reads the number of Interests it may keep in flight
/// from `window()`. All algorithm state is stored inline in the selected
/// variant.
///
/// # Variants
///
/// | Algorithm | Best for | Behaviour |
/// |-----------|----------|-----------|
/// | `Aimd` | General-purpose, matches NFD consumers | Linear increase, multiplicative decrease |
/// | `Cubic` | High-bandwidth, long-RTT links | Cubic function ramp-up after loss |
/// | `Fixed` | Benchmarks, known-capacity links | Constant window, no adaptation |
#[derive(Debug, Clone)]
pub enum CongestionController {
    /// Additive-Increase Multiplicative-Decrease.
    ///
    /// The algorithm used by `ndncatchunks`: each ack in congestion
    /// avoidance adds `additive_increase / window` (≈ +1 per RTT), and a
    /// congestion mark or timeout multiplies the window by
    /// `multiplicative_decrease`.
    Aimd {
        /// Current congestion window.
        window: f64,
        /// Lower bound applied when the window is decreased.
        min_window: f64,
        /// Upper bound applied when the window grows.
        max_window: f64,
        /// Numerator of the per-ack increase in congestion avoidance.
        additive_increase: f64,
        /// Factor the window is multiplied by on congestion/timeout.
        multiplicative_decrease: f64,
        /// Slow-start threshold: below it every ack adds 1.0 to the window
        /// (exponential growth); at or above it growth is additive.
        ssthresh: f64,
    },
    /// CUBIC (RFC 8312).
    ///
    /// Window growth follows a concave/convex cubic curve driven by the
    /// time elapsed since the last loss event, ramping up faster than AIMD
    /// on high-bandwidth links.
    Cubic {
        /// Current congestion window.
        window: f64,
        /// Lower bound applied when the window is decreased.
        min_window: f64,
        /// Upper bound applied when the window grows.
        max_window: f64,
        /// Window size recorded at the last loss event.
        w_max: f64,
        /// Number of acks seen since the last loss event (stands in for time).
        acks_since_loss: u64,
        /// CUBIC scaling constant (default: 0.4).
        c: f64,
        /// Multiplicative decrease factor (default: 0.7).
        beta: f64,
        /// Slow-start threshold; below it every ack adds 1.0 to the window.
        ssthresh: f64,
    },
    /// Constant window that never adapts.
    Fixed {
        /// The fixed window size.
        window: f64,
    },
}
81
// ─── Defaults ────────────────────────────────────────────────────────────────

/// Window a freshly constructed (or reset) adaptive controller starts with.
const DEFAULT_INITIAL_WINDOW: f64 = 2.0;
/// Default floor the window is clamped to after a decrease.
const DEFAULT_MIN_WINDOW: f64 = 2.0;
/// Default ceiling the window is clamped to after growth.
const DEFAULT_MAX_WINDOW: f64 = 65_536.0;
/// Default slow-start threshold: effectively unbounded slow start.
const DEFAULT_SSTHRESH: f64 = f64::MAX;

// AIMD defaults (same values as ndncatchunks).
/// Additive increase applied per RTT in congestion avoidance.
const AIMD_ADDITIVE_INCREASE: f64 = 1.0;
/// Factor the AIMD window is multiplied by on congestion.
const AIMD_MULTIPLICATIVE_DECREASE: f64 = 0.5;

// CUBIC defaults (RFC 8312).
/// CUBIC scaling constant `C`.
const CUBIC_C: f64 = 0.4;
/// CUBIC multiplicative decrease factor `beta`.
const CUBIC_BETA: f64 = 0.7;
96
97impl Default for CongestionController {
98 /// Default: AIMD with standard parameters.
99 fn default() -> Self {
100 Self::aimd()
101 }
102}
103
104impl CongestionController {
105 /// AIMD with standard parameters (matches `ndncatchunks`).
106 pub fn aimd() -> Self {
107 Self::Aimd {
108 window: DEFAULT_INITIAL_WINDOW,
109 min_window: DEFAULT_MIN_WINDOW,
110 max_window: DEFAULT_MAX_WINDOW,
111 additive_increase: AIMD_ADDITIVE_INCREASE,
112 multiplicative_decrease: AIMD_MULTIPLICATIVE_DECREASE,
113 ssthresh: DEFAULT_SSTHRESH,
114 }
115 }
116
117 /// CUBIC with standard parameters (RFC 8312).
118 pub fn cubic() -> Self {
119 Self::Cubic {
120 window: DEFAULT_INITIAL_WINDOW,
121 min_window: DEFAULT_MIN_WINDOW,
122 max_window: DEFAULT_MAX_WINDOW,
123 w_max: DEFAULT_INITIAL_WINDOW,
124 acks_since_loss: 0,
125 c: CUBIC_C,
126 beta: CUBIC_BETA,
127 ssthresh: DEFAULT_SSTHRESH,
128 }
129 }
130
131 /// Fixed window (no adaptation).
132 pub fn fixed(window: f64) -> Self {
133 Self::Fixed { window }
134 }
135
136 // ─── Builder-style parameter setters ────────────────────────────────
137
138 /// Set the initial and current window size.
139 ///
140 /// For Cubic, this also updates `w_max` to match so the cubic
141 /// formula's "recovery target" reflects the user's intent. Without
142 /// this, a caller that does `cubic().with_window(N).with_ssthresh(N)`
143 /// for large N would leave `w_max` at its tiny default value (2.0),
144 /// and the first `on_data()` call would take the cubic branch
145 /// (since `window >= ssthresh`) and collapse the window back toward
146 /// `w_max = 2`. See the `cubic_does_not_collapse_at_large_initial_window`
147 /// regression test.
148 pub fn with_window(mut self, w: f64) -> Self {
149 match &mut self {
150 Self::Aimd { window, .. } | Self::Fixed { window } => *window = w,
151 Self::Cubic { window, w_max, .. } => {
152 *window = w;
153 // Treat the new initial window as the current "best"
154 // so the cubic formula's post-loss trajectory points
155 // at this window, not the default 2.0.
156 if *w_max < w {
157 *w_max = w;
158 }
159 }
160 }
161 self
162 }
163
164 /// Set the minimum window (floor after decrease). Ignored by Fixed.
165 pub fn with_min_window(mut self, w: f64) -> Self {
166 match &mut self {
167 Self::Aimd { min_window, .. } | Self::Cubic { min_window, .. } => *min_window = w,
168 Self::Fixed { .. } => {}
169 }
170 self
171 }
172
173 /// Set the maximum window (ceiling). Ignored by Fixed.
174 pub fn with_max_window(mut self, w: f64) -> Self {
175 match &mut self {
176 Self::Aimd { max_window, .. } | Self::Cubic { max_window, .. } => *max_window = w,
177 Self::Fixed { .. } => {}
178 }
179 self
180 }
181
182 /// Set AIMD additive increase per RTT (default: 1.0). Only affects AIMD.
183 pub fn with_additive_increase(mut self, ai: f64) -> Self {
184 if let Self::Aimd {
185 additive_increase, ..
186 } = &mut self
187 {
188 *additive_increase = ai;
189 }
190 self
191 }
192
193 /// Set AIMD/CUBIC multiplicative decrease factor (default: 0.5 for AIMD, 0.7 for CUBIC).
194 pub fn with_decrease_factor(mut self, md: f64) -> Self {
195 match &mut self {
196 Self::Aimd {
197 multiplicative_decrease,
198 ..
199 } => *multiplicative_decrease = md,
200 Self::Cubic { beta, .. } => *beta = md,
201 Self::Fixed { .. } => {}
202 }
203 self
204 }
205
206 /// Set CUBIC scaling constant C (default: 0.4). Only affects CUBIC.
207 pub fn with_cubic_c(mut self, c_val: f64) -> Self {
208 if let Self::Cubic { c, .. } = &mut self {
209 *c = c_val;
210 }
211 self
212 }
213
214 /// Set the slow-start threshold.
215 ///
216 /// By default ssthresh is `f64::MAX` (unbounded slow start). Setting
217 /// this to the initial window size prevents the exponential ramp from
218 /// overshooting the link capacity on the first flow.
219 pub fn with_ssthresh(mut self, ss: f64) -> Self {
220 match &mut self {
221 Self::Aimd { ssthresh, .. } | Self::Cubic { ssthresh, .. } => *ssthresh = ss,
222 Self::Fixed { .. } => {}
223 }
224 self
225 }
226
227 /// Current window size (number of Interests allowed in flight).
228 ///
229 /// Callers should use `window().floor() as usize` for the actual limit.
230 pub fn window(&self) -> f64 {
231 match self {
232 Self::Aimd { window, .. } | Self::Cubic { window, .. } | Self::Fixed { window } => {
233 *window
234 }
235 }
236 }
237
238 /// A Data packet was received successfully (no congestion mark).
239 pub fn on_data(&mut self) {
240 match self {
241 Self::Aimd {
242 window,
243 additive_increase,
244 ssthresh,
245 max_window,
246 ..
247 } => {
248 if *window < *ssthresh {
249 // Slow start: exponential growth.
250 *window += 1.0;
251 } else {
252 // Congestion avoidance: additive increase.
253 *window += *additive_increase / *window;
254 }
255 *window = window.min(*max_window);
256 }
257 Self::Cubic {
258 window,
259 w_max,
260 acks_since_loss,
261 c,
262 beta,
263 ssthresh,
264 max_window,
265 ..
266 } => {
267 *acks_since_loss += 1;
268 if *window < *ssthresh {
269 *window += 1.0;
270 } else {
271 // CUBIC function: W(t) = C*(t - K)^3 + W_max
272 // where K = (W_max * (1-beta) / C)^(1/3)
273 // We approximate t by acks_since_loss / window (RTTs elapsed).
274 let t = *acks_since_loss as f64 / *window;
275 let k = ((*w_max * (1.0 - *beta)) / *c).cbrt();
276 let w_cubic = *c * (t - k).powi(3) + *w_max;
277 // TCP-friendly region: at least as aggressive as AIMD.
278 let w_tcp = *w_max * *beta
279 + (3.0 * (1.0 - *beta) / (1.0 + *beta))
280 * (*acks_since_loss as f64 / *window);
281 // Congestion avoidance must be *monotonic* — a
282 // successful data delivery can never justify
283 // shrinking the window. Without the `.max(*window)`
284 // clause, the cubic formula can return a value
285 // below the current window when `t < K` (i.e. we
286 // haven't "recovered" to `w_max` yet by the model's
287 // clock). That happens naturally right after a
288 // loss event but the model can also produce it at
289 // initialisation if `w_max` hasn't caught up to
290 // the user's initial window yet. The RFC 8312
291 // phrasing is "cwnd SHOULD be set to max(cwnd,
292 // W_cubic, W_est)"; we match that.
293 *window = w_cubic.max(w_tcp).max(*window);
294 }
295 *window = window.min(*max_window);
296 }
297 Self::Fixed { .. } => {}
298 }
299 }
300
301 /// A CongestionMark was received on a Data packet.
302 ///
303 /// Reduces the window but does NOT trigger retransmission — the Data
304 /// was delivered successfully, only the sending rate should decrease.
305 pub fn on_congestion_mark(&mut self) {
306 self.decrease("mark");
307 }
308
309 /// An Interest timed out (no Data received within lifetime).
310 ///
311 /// More aggressive reduction than congestion mark since timeout
312 /// indicates actual packet loss, not just queue buildup.
313 pub fn on_timeout(&mut self) {
314 self.decrease("timeout");
315 }
316
317 fn decrease(&mut self, _reason: &str) {
318 match self {
319 Self::Aimd {
320 window,
321 multiplicative_decrease,
322 min_window,
323 ssthresh,
324 ..
325 } => {
326 *ssthresh = (*window * *multiplicative_decrease).max(*min_window);
327 *window = *ssthresh;
328 }
329 Self::Cubic {
330 window,
331 w_max,
332 acks_since_loss,
333 beta,
334 min_window,
335 ssthresh,
336 ..
337 } => {
338 *w_max = *window;
339 *ssthresh = (*window * *beta).max(*min_window);
340 *window = *ssthresh;
341 *acks_since_loss = 0;
342 }
343 Self::Fixed { .. } => {}
344 }
345 }
346
347 /// Reset to initial state (e.g. on route change or new flow).
348 pub fn reset(&mut self) {
349 match self {
350 Self::Aimd {
351 window, ssthresh, ..
352 } => {
353 *window = DEFAULT_INITIAL_WINDOW;
354 *ssthresh = DEFAULT_SSTHRESH;
355 }
356 Self::Cubic {
357 window,
358 w_max,
359 acks_since_loss,
360 ssthresh,
361 ..
362 } => {
363 *window = DEFAULT_INITIAL_WINDOW;
364 *w_max = DEFAULT_INITIAL_WINDOW;
365 *acks_since_loss = 0;
366 *ssthresh = DEFAULT_SSTHRESH;
367 }
368 Self::Fixed { .. } => {}
369 }
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Delivers `n` unmarked Data packets to the controller.
    fn deliver(cc: &mut CongestionController, n: usize) {
        for _ in 0..n {
            cc.on_data();
        }
    }

    /// Reports `n` Interest timeouts to the controller.
    fn time_out(cc: &mut CongestionController, n: usize) {
        for _ in 0..n {
            cc.on_timeout();
        }
    }

    #[test]
    fn aimd_slow_start() {
        let mut cc = CongestionController::aimd();
        assert_eq!(cc.window(), 2.0);
        // Every ack in slow start adds exactly one to the window.
        for expected in [3.0, 4.0] {
            cc.on_data();
            assert_eq!(cc.window(), expected);
        }
    }

    #[test]
    fn aimd_congestion_avoidance() {
        let mut cc = CongestionController::aimd();
        // Leave slow start: the mark sets
        // ssthresh = max(2 * 0.5, min_window) = 2 and window = 2.
        cc.on_congestion_mark();
        assert_eq!(cc.window(), DEFAULT_MIN_WINDOW);

        // In congestion avoidance each ack adds 1/window.
        let before = cc.window();
        cc.on_data();
        let error = cc.window() - (before + 1.0 / before);
        assert!(error.abs() < 1e-9);
    }

    #[test]
    fn aimd_multiplicative_decrease() {
        let mut cc = CongestionController::aimd();
        // Build up a window in slow start first.
        deliver(&mut cc, 10);
        let before = cc.window();
        cc.on_congestion_mark();
        let error = cc.window() - before * 0.5;
        assert!(error.abs() < 1e-9);
    }

    #[test]
    fn aimd_timeout_reduces_window() {
        let mut cc = CongestionController::aimd();
        deliver(&mut cc, 10);
        let before = cc.window();
        cc.on_timeout();
        assert!(cc.window() < before);
    }

    #[test]
    fn aimd_respects_min_window() {
        let mut cc = CongestionController::aimd();
        // Back-to-back losses must stop at the floor.
        time_out(&mut cc, 20);
        assert!(cc.window() >= DEFAULT_MIN_WINDOW);
    }

    #[test]
    fn cubic_slow_start() {
        let mut cc = CongestionController::cubic();
        assert_eq!(cc.window(), 2.0);
        cc.on_data();
        assert_eq!(cc.window(), 3.0);
    }

    #[test]
    fn cubic_recovers_after_loss() {
        let mut cc = CongestionController::cubic();
        // Reach a reasonably large window.
        deliver(&mut cc, 50);
        let peak = cc.window();

        // One loss event cuts the window by roughly beta.
        cc.on_congestion_mark();
        let after_loss = cc.window();
        assert!(after_loss < peak);
        assert!((after_loss - peak * CUBIC_BETA).abs() < 1.0);

        // Given enough acks, CUBIC climbs back towards the old peak.
        deliver(&mut cc, 500);
        assert!(cc.window() >= peak * 0.9);
    }

    #[test]
    fn cubic_respects_min_window() {
        let mut cc = CongestionController::cubic();
        time_out(&mut cc, 20);
        assert!(cc.window() >= DEFAULT_MIN_WINDOW);
    }

    /// Regression test for the cubic collapse seen in the
    /// `testbed/tests/chunked/matrix.sh` sweep.
    ///
    /// With `CongestionController::cubic().with_window(64).with_ssthresh(64)`
    /// the first `on_data()` used to drop the window from 64 to ~1.4:
    /// `with_window` left `w_max` at the default 2.0, so the cubic
    /// formula's "recovery target" was ~2, and that value was assigned to
    /// the window unconditionally. At pipe=64 this turned a 60 ms fetch
    /// into a 4.5 second fetch because `window.floor()` was 1 for most of
    /// the following slow-start recovery.
    ///
    /// Two changes fixed it: `with_window` raises `w_max` for Cubic, and
    /// the cubic branch of `on_data` takes `max(cwnd, W_cubic, W_est)` so
    /// a successful delivery can never shrink the window.
    #[test]
    fn cubic_does_not_collapse_at_large_initial_window() {
        let mut cc = CongestionController::cubic()
            .with_window(64.0)
            .with_ssthresh(64.0);
        assert_eq!(cc.window(), 64.0);

        // The very first ack must not shrink the window.
        cc.on_data();
        assert!(
            cc.window() >= 64.0,
            "first on_data shrank window to {} (cubic collapse bug)",
            cc.window()
        );

        // Nor may any of the following ones.
        for i in 0..1000 {
            cc.on_data();
            assert!(
                cc.window() >= 64.0,
                "on_data iteration {i} shrank window to {}",
                cc.window()
            );
        }
        // The model eventually grows past w_max; at minimum the window
        // is still the initial 64.
        assert!(cc.window() >= 64.0);
    }

    /// Same check at the other end of the matrix's parameter space — the
    /// window size that already passed before the fix.
    #[test]
    fn cubic_does_not_collapse_at_initial_window_256() {
        let mut cc = CongestionController::cubic()
            .with_window(256.0)
            .with_ssthresh(256.0);
        cc.on_data();
        assert!(
            cc.window() >= 256.0,
            "cubic collapsed at init_window=256: now {}",
            cc.window()
        );
    }

    /// `with_window` raises `w_max` for Cubic so the post-loss trajectory
    /// targets the requested window rather than the default 2.0.
    #[test]
    fn cubic_with_window_updates_w_max() {
        let cc = CongestionController::cubic().with_window(100.0);
        if let CongestionController::Cubic { w_max, window, .. } = cc {
            assert_eq!(window, 100.0);
            assert_eq!(w_max, 100.0);
        } else {
            panic!("expected Cubic");
        }
    }

    /// A later `with_window` call with a *smaller* value must leave
    /// `w_max` alone; the `if *w_max < w` guard keeps the larger value.
    #[test]
    fn cubic_with_window_never_shrinks_w_max() {
        // Raise w_max to 500 through a first `with_window` call, then
        // request a smaller window.
        let cc = CongestionController::cubic()
            .with_window(500.0) // sets w_max = 500
            .with_window(10.0); // must not shrink w_max below 500
        if let CongestionController::Cubic { w_max, window, .. } = cc {
            assert_eq!(window, 10.0);
            assert_eq!(
                w_max, 500.0,
                "w_max must not shrink when caller passes smaller window"
            );
        } else {
            panic!("expected Cubic");
        }
    }

    #[test]
    fn fixed_never_changes() {
        let mut cc = CongestionController::fixed(64.0);
        assert_eq!(cc.window(), 64.0);
        // No event of any kind may move a fixed window.
        cc.on_data();
        assert_eq!(cc.window(), 64.0);
        cc.on_congestion_mark();
        assert_eq!(cc.window(), 64.0);
        cc.on_timeout();
        assert_eq!(cc.window(), 64.0);
    }

    #[test]
    fn reset_returns_to_initial() {
        let mut cc = CongestionController::aimd();
        deliver(&mut cc, 20);
        assert!(cc.window() > DEFAULT_INITIAL_WINDOW);
        cc.reset();
        assert_eq!(cc.window(), DEFAULT_INITIAL_WINDOW);
    }

    #[test]
    fn default_is_aimd() {
        let cc = CongestionController::default();
        assert!(matches!(cc, CongestionController::Aimd { .. }));
    }
}
601}