ndn_transport/
congestion.rs

1//! Consumer-side congestion control for NDN.
2//!
3//! Provides window-based algorithms that react to Data arrivals, congestion
4//! marks (NDNLPv2 CongestionMark), and timeouts.  Consumers use these to
5//! regulate how many Interests are in flight.
6//!
7//! # Design
8//!
9//! `CongestionController` is an enum (not a trait) — avoids dynamic dispatch
10//! and matches the `RtoStrategy`/`ReliabilityConfig` pattern used elsewhere.
11//! All state is internal; callers only see `window()` and the event methods.
12//!
13//! # Example
14//!
15//! ```rust
16//! use ndn_transport::CongestionController;
17//!
18//! let mut cc = CongestionController::default(); // AIMD
19//! assert_eq!(cc.window(), 2.0);
20//!
21//! // Data arrived successfully — grow window.
22//! cc.on_data();
23//! assert!(cc.window() > 2.0);
24//!
25//! // Congestion mark received — cut window.
26//! cc.on_congestion_mark();
27//! assert!(cc.window() < 3.0);
28//! ```
29
30/// Consumer-side congestion control algorithm.
31///
32/// Each variant carries its own internal state.  The caller drives events
33/// (`on_data`, `on_congestion_mark`, `on_timeout`) and reads the current
34/// window via `window()`.
35///
36/// # Variants
37///
38/// | Algorithm | Best for | Behaviour |
39/// |-----------|----------|-----------|
40/// | `Aimd`    | General-purpose, matches NFD consumers | Linear increase, multiplicative decrease |
41/// | `Cubic`   | High-bandwidth, long-RTT links | Cubic function ramp-up after loss |
42/// | `Fixed`   | Benchmarks, known-capacity links | Constant window, no adaptation |
43#[derive(Debug, Clone)]
44pub enum CongestionController {
45    /// Additive-Increase Multiplicative-Decrease.
46    ///
47    /// Standard algorithm used by `ndncatchunks`.  Window grows by
48    /// `additive_increase / window` per ack (≈ +1 per RTT) and is
49    /// multiplied by `multiplicative_decrease` on congestion/timeout.
50    Aimd {
51        window: f64,
52        min_window: f64,
53        max_window: f64,
54        additive_increase: f64,
55        multiplicative_decrease: f64,
56        /// Slow-start threshold. While `window < ssthresh`, window grows
57        /// by 1.0 per ack (exponential); above it, grows additively.
58        ssthresh: f64,
59    },
60    /// CUBIC (RFC 8312).
61    ///
62    /// Concave/convex window growth based on time since last loss event.
63    /// More aggressive ramp-up than AIMD on high-bandwidth links.
64    Cubic {
65        window: f64,
66        min_window: f64,
67        max_window: f64,
68        /// Window size at last loss event.
69        w_max: f64,
70        /// Ack count since last loss event (proxy for time).
71        acks_since_loss: u64,
72        /// CUBIC scaling constant (default: 0.4).
73        c: f64,
74        /// Multiplicative decrease factor (default: 0.7).
75        beta: f64,
76        ssthresh: f64,
77    },
78    /// Fixed window — no adaptation.
79    Fixed { window: f64 },
80}
81
82// ─── Defaults ────────────────────────────────────────────────────────────────
83
84const DEFAULT_INITIAL_WINDOW: f64 = 2.0;
85const DEFAULT_MIN_WINDOW: f64 = 2.0;
86const DEFAULT_MAX_WINDOW: f64 = 65536.0;
87const DEFAULT_SSTHRESH: f64 = f64::MAX;
88
89// AIMD defaults (matches ndncatchunks).
90const AIMD_ADDITIVE_INCREASE: f64 = 1.0;
91const AIMD_MULTIPLICATIVE_DECREASE: f64 = 0.5;
92
93// CUBIC defaults (RFC 8312).
94const CUBIC_C: f64 = 0.4;
95const CUBIC_BETA: f64 = 0.7;
96
97impl Default for CongestionController {
98    /// Default: AIMD with standard parameters.
99    fn default() -> Self {
100        Self::aimd()
101    }
102}
103
104impl CongestionController {
105    /// AIMD with standard parameters (matches `ndncatchunks`).
106    pub fn aimd() -> Self {
107        Self::Aimd {
108            window: DEFAULT_INITIAL_WINDOW,
109            min_window: DEFAULT_MIN_WINDOW,
110            max_window: DEFAULT_MAX_WINDOW,
111            additive_increase: AIMD_ADDITIVE_INCREASE,
112            multiplicative_decrease: AIMD_MULTIPLICATIVE_DECREASE,
113            ssthresh: DEFAULT_SSTHRESH,
114        }
115    }
116
117    /// CUBIC with standard parameters (RFC 8312).
118    pub fn cubic() -> Self {
119        Self::Cubic {
120            window: DEFAULT_INITIAL_WINDOW,
121            min_window: DEFAULT_MIN_WINDOW,
122            max_window: DEFAULT_MAX_WINDOW,
123            w_max: DEFAULT_INITIAL_WINDOW,
124            acks_since_loss: 0,
125            c: CUBIC_C,
126            beta: CUBIC_BETA,
127            ssthresh: DEFAULT_SSTHRESH,
128        }
129    }
130
131    /// Fixed window (no adaptation).
132    pub fn fixed(window: f64) -> Self {
133        Self::Fixed { window }
134    }
135
136    // ─── Builder-style parameter setters ────────────────────────────────
137
138    /// Set the initial and current window size.
139    ///
140    /// For Cubic, this also updates `w_max` to match so the cubic
141    /// formula's "recovery target" reflects the user's intent. Without
142    /// this, a caller that does `cubic().with_window(N).with_ssthresh(N)`
143    /// for large N would leave `w_max` at its tiny default value (2.0),
144    /// and the first `on_data()` call would take the cubic branch
145    /// (since `window >= ssthresh`) and collapse the window back toward
146    /// `w_max = 2`. See the `cubic_does_not_collapse_at_large_initial_window`
147    /// regression test.
148    pub fn with_window(mut self, w: f64) -> Self {
149        match &mut self {
150            Self::Aimd { window, .. } | Self::Fixed { window } => *window = w,
151            Self::Cubic { window, w_max, .. } => {
152                *window = w;
153                // Treat the new initial window as the current "best"
154                // so the cubic formula's post-loss trajectory points
155                // at this window, not the default 2.0.
156                if *w_max < w {
157                    *w_max = w;
158                }
159            }
160        }
161        self
162    }
163
164    /// Set the minimum window (floor after decrease). Ignored by Fixed.
165    pub fn with_min_window(mut self, w: f64) -> Self {
166        match &mut self {
167            Self::Aimd { min_window, .. } | Self::Cubic { min_window, .. } => *min_window = w,
168            Self::Fixed { .. } => {}
169        }
170        self
171    }
172
173    /// Set the maximum window (ceiling). Ignored by Fixed.
174    pub fn with_max_window(mut self, w: f64) -> Self {
175        match &mut self {
176            Self::Aimd { max_window, .. } | Self::Cubic { max_window, .. } => *max_window = w,
177            Self::Fixed { .. } => {}
178        }
179        self
180    }
181
182    /// Set AIMD additive increase per RTT (default: 1.0). Only affects AIMD.
183    pub fn with_additive_increase(mut self, ai: f64) -> Self {
184        if let Self::Aimd {
185            additive_increase, ..
186        } = &mut self
187        {
188            *additive_increase = ai;
189        }
190        self
191    }
192
193    /// Set AIMD/CUBIC multiplicative decrease factor (default: 0.5 for AIMD, 0.7 for CUBIC).
194    pub fn with_decrease_factor(mut self, md: f64) -> Self {
195        match &mut self {
196            Self::Aimd {
197                multiplicative_decrease,
198                ..
199            } => *multiplicative_decrease = md,
200            Self::Cubic { beta, .. } => *beta = md,
201            Self::Fixed { .. } => {}
202        }
203        self
204    }
205
206    /// Set CUBIC scaling constant C (default: 0.4). Only affects CUBIC.
207    pub fn with_cubic_c(mut self, c_val: f64) -> Self {
208        if let Self::Cubic { c, .. } = &mut self {
209            *c = c_val;
210        }
211        self
212    }
213
214    /// Set the slow-start threshold.
215    ///
216    /// By default ssthresh is `f64::MAX` (unbounded slow start).  Setting
217    /// this to the initial window size prevents the exponential ramp from
218    /// overshooting the link capacity on the first flow.
219    pub fn with_ssthresh(mut self, ss: f64) -> Self {
220        match &mut self {
221            Self::Aimd { ssthresh, .. } | Self::Cubic { ssthresh, .. } => *ssthresh = ss,
222            Self::Fixed { .. } => {}
223        }
224        self
225    }
226
227    /// Current window size (number of Interests allowed in flight).
228    ///
229    /// Callers should use `window().floor() as usize` for the actual limit.
230    pub fn window(&self) -> f64 {
231        match self {
232            Self::Aimd { window, .. } | Self::Cubic { window, .. } | Self::Fixed { window } => {
233                *window
234            }
235        }
236    }
237
238    /// A Data packet was received successfully (no congestion mark).
239    pub fn on_data(&mut self) {
240        match self {
241            Self::Aimd {
242                window,
243                additive_increase,
244                ssthresh,
245                max_window,
246                ..
247            } => {
248                if *window < *ssthresh {
249                    // Slow start: exponential growth.
250                    *window += 1.0;
251                } else {
252                    // Congestion avoidance: additive increase.
253                    *window += *additive_increase / *window;
254                }
255                *window = window.min(*max_window);
256            }
257            Self::Cubic {
258                window,
259                w_max,
260                acks_since_loss,
261                c,
262                beta,
263                ssthresh,
264                max_window,
265                ..
266            } => {
267                *acks_since_loss += 1;
268                if *window < *ssthresh {
269                    *window += 1.0;
270                } else {
271                    // CUBIC function: W(t) = C*(t - K)^3 + W_max
272                    // where K = (W_max * (1-beta) / C)^(1/3)
273                    // We approximate t by acks_since_loss / window (RTTs elapsed).
274                    let t = *acks_since_loss as f64 / *window;
275                    let k = ((*w_max * (1.0 - *beta)) / *c).cbrt();
276                    let w_cubic = *c * (t - k).powi(3) + *w_max;
277                    // TCP-friendly region: at least as aggressive as AIMD.
278                    let w_tcp = *w_max * *beta
279                        + (3.0 * (1.0 - *beta) / (1.0 + *beta))
280                            * (*acks_since_loss as f64 / *window);
281                    // Congestion avoidance must be *monotonic* — a
282                    // successful data delivery can never justify
283                    // shrinking the window. Without the `.max(*window)`
284                    // clause, the cubic formula can return a value
285                    // below the current window when `t < K` (i.e. we
286                    // haven't "recovered" to `w_max` yet by the model's
287                    // clock). That happens naturally right after a
288                    // loss event but the model can also produce it at
289                    // initialisation if `w_max` hasn't caught up to
290                    // the user's initial window yet. The RFC 8312
291                    // phrasing is "cwnd SHOULD be set to max(cwnd,
292                    // W_cubic, W_est)"; we match that.
293                    *window = w_cubic.max(w_tcp).max(*window);
294                }
295                *window = window.min(*max_window);
296            }
297            Self::Fixed { .. } => {}
298        }
299    }
300
301    /// A CongestionMark was received on a Data packet.
302    ///
303    /// Reduces the window but does NOT trigger retransmission — the Data
304    /// was delivered successfully, only the sending rate should decrease.
305    pub fn on_congestion_mark(&mut self) {
306        self.decrease("mark");
307    }
308
309    /// An Interest timed out (no Data received within lifetime).
310    ///
311    /// More aggressive reduction than congestion mark since timeout
312    /// indicates actual packet loss, not just queue buildup.
313    pub fn on_timeout(&mut self) {
314        self.decrease("timeout");
315    }
316
317    fn decrease(&mut self, _reason: &str) {
318        match self {
319            Self::Aimd {
320                window,
321                multiplicative_decrease,
322                min_window,
323                ssthresh,
324                ..
325            } => {
326                *ssthresh = (*window * *multiplicative_decrease).max(*min_window);
327                *window = *ssthresh;
328            }
329            Self::Cubic {
330                window,
331                w_max,
332                acks_since_loss,
333                beta,
334                min_window,
335                ssthresh,
336                ..
337            } => {
338                *w_max = *window;
339                *ssthresh = (*window * *beta).max(*min_window);
340                *window = *ssthresh;
341                *acks_since_loss = 0;
342            }
343            Self::Fixed { .. } => {}
344        }
345    }
346
347    /// Reset to initial state (e.g. on route change or new flow).
348    pub fn reset(&mut self) {
349        match self {
350            Self::Aimd {
351                window, ssthresh, ..
352            } => {
353                *window = DEFAULT_INITIAL_WINDOW;
354                *ssthresh = DEFAULT_SSTHRESH;
355            }
356            Self::Cubic {
357                window,
358                w_max,
359                acks_since_loss,
360                ssthresh,
361                ..
362            } => {
363                *window = DEFAULT_INITIAL_WINDOW;
364                *w_max = DEFAULT_INITIAL_WINDOW;
365                *acks_since_loss = 0;
366                *ssthresh = DEFAULT_SSTHRESH;
367            }
368            Self::Fixed { .. } => {}
369        }
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376
377    #[test]
378    fn aimd_slow_start() {
379        let mut cc = CongestionController::aimd();
380        assert_eq!(cc.window(), 2.0);
381        cc.on_data();
382        assert_eq!(cc.window(), 3.0); // +1 in slow start
383        cc.on_data();
384        assert_eq!(cc.window(), 4.0);
385    }
386
387    #[test]
388    fn aimd_congestion_avoidance() {
389        let mut cc = CongestionController::aimd();
390        // Force out of slow start.
391        cc.on_congestion_mark(); // ssthresh = 2*0.5 = 2, window = 2
392        assert_eq!(cc.window(), DEFAULT_MIN_WINDOW);
393
394        // Now in congestion avoidance: window grows by 1/window per ack.
395        let w_before = cc.window();
396        cc.on_data();
397        let expected = w_before + 1.0 / w_before;
398        assert!((cc.window() - expected).abs() < 1e-9);
399    }
400
401    #[test]
402    fn aimd_multiplicative_decrease() {
403        let mut cc = CongestionController::aimd();
404        // Grow window in slow start.
405        for _ in 0..10 {
406            cc.on_data();
407        }
408        let w_before = cc.window();
409        cc.on_congestion_mark();
410        assert!((cc.window() - w_before * 0.5).abs() < 1e-9);
411    }
412
413    #[test]
414    fn aimd_timeout_reduces_window() {
415        let mut cc = CongestionController::aimd();
416        for _ in 0..10 {
417            cc.on_data();
418        }
419        let w_before = cc.window();
420        cc.on_timeout();
421        assert!(cc.window() < w_before);
422    }
423
424    #[test]
425    fn aimd_respects_min_window() {
426        let mut cc = CongestionController::aimd();
427        // Repeated losses should not go below min_window.
428        for _ in 0..20 {
429            cc.on_timeout();
430        }
431        assert!(cc.window() >= DEFAULT_MIN_WINDOW);
432    }
433
434    #[test]
435    fn cubic_slow_start() {
436        let mut cc = CongestionController::cubic();
437        assert_eq!(cc.window(), 2.0);
438        cc.on_data();
439        assert_eq!(cc.window(), 3.0);
440    }
441
442    #[test]
443    fn cubic_recovers_after_loss() {
444        let mut cc = CongestionController::cubic();
445        // Grow to a decent window.
446        for _ in 0..50 {
447            cc.on_data();
448        }
449        let w_peak = cc.window();
450
451        // Loss event.
452        cc.on_congestion_mark();
453        let w_after_loss = cc.window();
454        assert!(w_after_loss < w_peak);
455        assert!((w_after_loss - w_peak * CUBIC_BETA).abs() < 1.0);
456
457        // Recovery: CUBIC should eventually return to w_peak.
458        for _ in 0..500 {
459            cc.on_data();
460        }
461        assert!(cc.window() >= w_peak * 0.9);
462    }
463
464    #[test]
465    fn cubic_respects_min_window() {
466        let mut cc = CongestionController::cubic();
467        for _ in 0..20 {
468            cc.on_timeout();
469        }
470        assert!(cc.window() >= DEFAULT_MIN_WINDOW);
471    }
472
473    /// Regression test for the cubic collapse bug observed in the
474    /// `testbed/tests/chunked/matrix.sh` sweep.
475    ///
476    /// A consumer that called
477    /// `CongestionController::cubic().with_window(64).with_ssthresh(64)`
478    /// would see the first `on_data()` call drop the window from 64
479    /// to ~1.4 — because `with_window` didn't update `w_max` (it
480    /// stayed at the default 2.0), and the cubic formula's "recovery
481    /// target" was therefore ~2, which the code assigned to `*window`
482    /// unconditionally. At pipe=64 this turned a 60 ms fetch into a
483    /// 4.5 second fetch because `window.floor()` was 1 for most of
484    /// the subsequent slow-start recovery.
485    ///
486    /// The fix has two parts: `with_window` now updates `w_max` for
487    /// Cubic, and `on_data`'s cubic branch clamps the result to
488    /// `max(cwnd, W_cubic, W_est)` so successful data delivery can
489    /// never shrink the window.
490    #[test]
491    fn cubic_does_not_collapse_at_large_initial_window() {
492        let mut cc = CongestionController::cubic()
493            .with_window(64.0)
494            .with_ssthresh(64.0);
495        assert_eq!(cc.window(), 64.0);
496
497        // First on_data must not shrink.
498        cc.on_data();
499        assert!(
500            cc.window() >= 64.0,
501            "first on_data shrank window to {} (cubic collapse bug)",
502            cc.window()
503        );
504
505        // Many subsequent on_data calls must not shrink either.
506        for i in 0..1000 {
507            cc.on_data();
508            assert!(
509                cc.window() >= 64.0,
510                "on_data iteration {i} shrank window to {}",
511                cc.window()
512            );
513        }
514        // Cubic's model eventually allows growth past w_max; the
515        // window should be at least the initial 64.
516        assert!(cc.window() >= 64.0);
517    }
518
519    /// Same shape of test but at the other end of the matrix's
520    /// parameter space — ensures the fix works at the window size
521    /// that was already passing before.
522    #[test]
523    fn cubic_does_not_collapse_at_initial_window_256() {
524        let mut cc = CongestionController::cubic()
525            .with_window(256.0)
526            .with_ssthresh(256.0);
527        cc.on_data();
528        assert!(
529            cc.window() >= 256.0,
530            "cubic collapsed at init_window=256: now {}",
531            cc.window()
532        );
533    }
534
535    /// `with_window` updates `w_max` for Cubic so the cubic formula's
536    /// post-loss trajectory points at the user-requested window, not
537    /// the default 2.0.
538    #[test]
539    fn cubic_with_window_updates_w_max() {
540        let cc = CongestionController::cubic().with_window(100.0);
541        match cc {
542            CongestionController::Cubic { w_max, window, .. } => {
543                assert_eq!(window, 100.0);
544                assert_eq!(w_max, 100.0);
545            }
546            _ => panic!("expected Cubic"),
547        }
548    }
549
550    /// If the caller sets a *smaller* window than the default w_max,
551    /// `with_window` must not lower w_max — that would erase loss
552    /// history. The `if *w_max < w` guard preserves the existing
553    /// value when the caller's w is smaller.
554    #[test]
555    fn cubic_with_window_never_shrinks_w_max() {
556        // Start with a larger w_max via one loss cycle, then call
557        // with_window with a smaller value. w_max should be unchanged.
558        let cc = CongestionController::cubic()
559            .with_window(500.0) // sets w_max = 500
560            .with_window(10.0); // must not shrink w_max below 500
561        match cc {
562            CongestionController::Cubic { w_max, window, .. } => {
563                assert_eq!(window, 10.0);
564                assert_eq!(
565                    w_max, 500.0,
566                    "w_max must not shrink when caller passes smaller window"
567                );
568            }
569            _ => panic!("expected Cubic"),
570        }
571    }
572
573    #[test]
574    fn fixed_never_changes() {
575        let mut cc = CongestionController::fixed(64.0);
576        assert_eq!(cc.window(), 64.0);
577        cc.on_data();
578        assert_eq!(cc.window(), 64.0);
579        cc.on_congestion_mark();
580        assert_eq!(cc.window(), 64.0);
581        cc.on_timeout();
582        assert_eq!(cc.window(), 64.0);
583    }
584
585    #[test]
586    fn reset_returns_to_initial() {
587        let mut cc = CongestionController::aimd();
588        for _ in 0..20 {
589            cc.on_data();
590        }
591        assert!(cc.window() > DEFAULT_INITIAL_WINDOW);
592        cc.reset();
593        assert_eq!(cc.window(), DEFAULT_INITIAL_WINDOW);
594    }
595
596    #[test]
597    fn default_is_aimd() {
598        let cc = CongestionController::default();
599        assert!(matches!(cc, CongestionController::Aimd { .. }));
600    }
601}