ndn_discovery/strategy/
swim.rs

1//! SWIM-style fixed-interval probe scheduler.
2//!
3//! Unlike [`BackoffScheduler`], `SwimScheduler` does NOT apply exponential
4//! back-off — it broadcasts at a constant period T, matching the SWIM paper's
5//! requirement that each node probes exactly one random peer per protocol
6//! period.
7//!
8//! **Indirect probing** (K-fanout via `/ndn/local/nd/probe/via/`) is handled
9//! by the protocol layer, not here.  This scheduler only decides *when* to
10//! send the next broadcast hello.
11
12use std::time::{Duration, Instant};
13
14use crate::config::DiscoveryConfig;
15use crate::strategy::{NeighborProbeStrategy, ProbeRequest, TriggerEvent};
16
17// ── SwimScheduler ─────────────────────────────────────────────────────────────
18
/// Fixed-interval SWIM probe scheduler.
///
/// Broadcasts a hello at every `interval`.  Unlike [`BackoffScheduler`] it
/// never increases its interval after a successful exchange.  Topology changes
/// (`trigger()`) cause an immediate probe on the next tick, after which the
/// regular period resumes.
///
/// The type derives `Debug` and `Clone` so it can be logged and embedded in
/// other derivable types; all of its fields are plain value types.
///
/// [`BackoffScheduler`]: super::BackoffScheduler
#[derive(Debug, Clone)]
pub struct SwimScheduler {
    /// Fixed protocol period between two scheduled broadcasts.
    interval: Duration,
    /// Earliest instant at which the next periodic broadcast is due.
    next_probe_at: Instant,
    /// When set, the next tick broadcasts regardless of `next_probe_at`.
    pending_immediate: bool,
}
32
33impl SwimScheduler {
34    /// Create a scheduler firing every `interval`.
35    ///
36    /// Sends an immediate probe on the first tick to bootstrap the neighbor
37    /// table.
38    pub fn new(interval: Duration) -> Self {
39        let now = Instant::now();
40        Self {
41            interval,
42            next_probe_at: now + interval,
43            pending_immediate: true,
44        }
45    }
46
47    /// Build from a [`DiscoveryConfig`], using `hello_interval_base` as the
48    /// fixed SWIM protocol period T.
49    pub fn from_discovery_config(cfg: &DiscoveryConfig) -> Self {
50        Self::new(cfg.hello_interval_base)
51    }
52}
53
54impl NeighborProbeStrategy for SwimScheduler {
55    fn on_tick(&mut self, now: Instant) -> Vec<ProbeRequest> {
56        if self.pending_immediate || now >= self.next_probe_at {
57            self.pending_immediate = false;
58            self.next_probe_at = now + self.interval;
59            vec![ProbeRequest::Broadcast]
60        } else {
61            vec![]
62        }
63    }
64
65    fn on_probe_success(&mut self, _rtt: Duration) {
66        // SWIM uses a fixed interval; a successful exchange does not alter the
67        // schedule.
68    }
69
70    fn on_probe_timeout(&mut self) {
71        // Indirect probing on failure is handled by the protocol layer.
72        // The scheduler keeps its fixed rate regardless of probe outcomes.
73    }
74
75    fn trigger(&mut self, event: TriggerEvent) {
76        match event {
77            TriggerEvent::FaceUp
78            | TriggerEvent::ForwardingFailure
79            | TriggerEvent::NeighborStale => {
80                self.pending_immediate = true;
81            }
82            TriggerEvent::PassiveDetection => {
83                // Passive MAC overhearing is not a SWIM topology event; ignore.
84            }
85        }
86    }
87}
88
89// ── Tests ──────────────────────────────────────────────────────────────────────
90
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a scheduler with the given period and consumes its bootstrap
    /// probe, returning the scheduler together with the instant of that tick.
    fn primed(period: Duration) -> (SwimScheduler, Instant) {
        let mut sched = SwimScheduler::new(period);
        let t0 = Instant::now();
        let _ = sched.on_tick(t0);
        (sched, t0)
    }

    /// The very first tick broadcasts without waiting for a full period.
    #[test]
    fn fires_immediately_on_first_tick() {
        let mut sched = SwimScheduler::new(Duration::from_secs(5));
        let first = sched.on_tick(Instant::now());
        assert_eq!(first, vec![ProbeRequest::Broadcast]);
    }

    /// A tick shortly after the bootstrap probe yields nothing.
    #[test]
    fn no_second_fire_before_interval() {
        let (mut sched, t0) = primed(Duration::from_secs(5));
        let early = sched.on_tick(t0 + Duration::from_millis(100));
        assert!(early.is_empty());
    }

    /// Once the period has elapsed the scheduler broadcasts again.
    #[test]
    fn fires_after_interval() {
        let (mut sched, t0) = primed(Duration::from_secs(5));
        let late = sched.on_tick(t0 + Duration::from_secs(6));
        assert_eq!(late, vec![ProbeRequest::Broadcast]);
    }

    /// A probe timeout neither advances nor delays the next broadcast.
    #[test]
    fn timeout_does_not_change_interval() {
        let period = Duration::from_secs(5);
        let (mut sched, t0) = primed(period);
        sched.on_probe_timeout();

        // Still quiet well before the period is over.
        let early = sched.on_tick(t0 + Duration::from_millis(100));
        assert!(early.is_empty());

        // And due again just after the regular period.
        let on_time = sched.on_tick(t0 + period + Duration::from_millis(100));
        assert_eq!(on_time, vec![ProbeRequest::Broadcast]);
    }

    /// A topology trigger forces a broadcast on the next tick.
    #[test]
    fn trigger_schedules_immediate_probe() {
        let (mut sched, t0) = primed(Duration::from_secs(60));
        sched.trigger(TriggerEvent::NeighborStale);
        let forced = sched.on_tick(t0 + Duration::from_millis(10));
        assert_eq!(forced, vec![ProbeRequest::Broadcast]);
    }

    /// `PassiveDetection` must not force an immediate probe.
    #[test]
    fn passive_detection_ignored() {
        let (mut sched, t0) = primed(Duration::from_secs(60));
        sched.trigger(TriggerEvent::PassiveDetection);
        let quiet = sched.on_tick(t0 + Duration::from_millis(1));
        assert!(quiet.is_empty());
    }
}