ndn_discovery/strategy/
reactive.rs

1//! Event-driven (reactive) probe scheduler.
2//!
3//! No periodic timer in the steady state.  A probe is sent only when a
4//! [`TriggerEvent`] is received:
5//!
6//! - Face comes up — bootstrap the neighbor table immediately.
7//! - Forwarding failure — verify the affected face is still alive.
8//! - Neighbor goes stale — re-probe before declaring it absent.
9//!
10//! [`PassiveDetection`](TriggerEvent::PassiveDetection) is ignored; use
11//! [`PassiveScheduler`](super::PassiveScheduler) for that case.
12//!
13//! ## Rate limiting
14//!
15//! To prevent a storm of hellos after a flap, a minimum interval is enforced.
16//! No probe is emitted within [`DiscoveryConfig::hello_interval_base`] of the
17//! previous one.  The implementation collapses multiple queued triggers into
18//! one probe when they arrive within the minimum interval.
19
20use std::time::{Duration, Instant};
21
22use crate::config::DiscoveryConfig;
23use crate::strategy::{NeighborProbeStrategy, ProbeRequest, TriggerEvent};
24
25// ─── ReactiveScheduler ───────────────────────────────────────────────────────
26
27/// Probe scheduler that sends only on topology events, with rate limiting.
28pub struct ReactiveScheduler {
29    /// Minimum gap between consecutive probes.
30    min_interval: Duration,
31    /// When the last probe was sent (`None` = never sent).
32    last_sent: Option<Instant>,
33    /// A probe has been requested but not yet emitted (rate-limited).
34    pending: bool,
35}
36
37impl ReactiveScheduler {
38    /// Build from the relevant fields of a [`DiscoveryConfig`].
39    pub fn from_discovery_config(cfg: &DiscoveryConfig) -> Self {
40        Self {
41            min_interval: cfg.hello_interval_base,
42            last_sent: None,
43            pending: true, // fire on the first tick to bootstrap
44        }
45    }
46}
47
48impl NeighborProbeStrategy for ReactiveScheduler {
49    fn on_tick(&mut self, now: Instant) -> Vec<ProbeRequest> {
50        if !self.pending {
51            return Vec::new();
52        }
53
54        // Enforce minimum interval.
55        if let Some(last) = self.last_sent
56            && now.duration_since(last) < self.min_interval
57        {
58            return Vec::new(); // still rate-limited; keep pending
59        }
60
61        self.pending = false;
62        self.last_sent = Some(now);
63        vec![ProbeRequest::Broadcast]
64    }
65
66    fn on_probe_success(&mut self, _rtt: Duration) {
67        // Nothing to reset; the next probe fires only on a trigger.
68    }
69
70    fn on_probe_timeout(&mut self) {
71        // Timed out — re-trigger to verify.
72        self.pending = true;
73    }
74
75    fn trigger(&mut self, event: TriggerEvent) {
76        match event {
77            TriggerEvent::PassiveDetection => {
78                // Not applicable to this scheduler; ignore.
79            }
80            TriggerEvent::FaceUp
81            | TriggerEvent::ForwardingFailure
82            | TriggerEvent::NeighborStale => {
83                self.pending = true;
84            }
85        }
86    }
87}
88
89// ─── Tests ────────────────────────────────────────────────────────────────────
90
91#[cfg(test)]
92mod tests {
93    use std::time::Duration;
94
95    use super::*;
96    use crate::config::{DiscoveryConfig, DiscoveryProfile};
97
98    fn mobile_sched() -> ReactiveScheduler {
99        ReactiveScheduler::from_discovery_config(&DiscoveryConfig::for_profile(
100            &DiscoveryProfile::Mobile,
101        ))
102    }
103
104    #[test]
105    fn fires_on_first_tick() {
106        let mut s = mobile_sched();
107        let reqs = s.on_tick(Instant::now());
108        assert_eq!(reqs, vec![ProbeRequest::Broadcast]);
109    }
110
111    #[test]
112    fn does_not_fire_without_trigger() {
113        let mut s = mobile_sched();
114        let now = Instant::now();
115        s.on_tick(now); // initial bootstrap probe
116
117        // No trigger — subsequent ticks are silent.
118        let reqs = s.on_tick(now + Duration::from_secs(1));
119        assert!(reqs.is_empty());
120    }
121
122    #[test]
123    fn fires_after_trigger() {
124        let mut s = mobile_sched();
125        let now = Instant::now();
126        s.on_tick(now); // initial
127
128        s.trigger(TriggerEvent::ForwardingFailure);
129        let reqs = s.on_tick(now + Duration::from_secs(1));
130        assert_eq!(reqs, vec![ProbeRequest::Broadcast]);
131    }
132
133    #[test]
134    fn rate_limits_rapid_triggers() {
135        let mut s = mobile_sched();
136        let now = Instant::now();
137        s.on_tick(now); // initial probe; sets last_sent
138
139        // Trigger immediately — still within min_interval.
140        s.trigger(TriggerEvent::NeighborStale);
141        let reqs = s.on_tick(now); // same instant — still rate-limited
142        assert!(reqs.is_empty(), "should be rate-limited");
143
144        // After min_interval has elapsed, the pending probe fires.
145        let later = now + s.min_interval + Duration::from_millis(1);
146        let reqs = s.on_tick(later);
147        assert_eq!(reqs, vec![ProbeRequest::Broadcast]);
148    }
149
150    #[test]
151    fn passive_detection_is_ignored() {
152        let mut s = mobile_sched();
153        let now = Instant::now();
154        s.on_tick(now); // initial
155
156        s.trigger(TriggerEvent::PassiveDetection);
157        let reqs = s.on_tick(now + Duration::from_secs(10));
158        assert!(reqs.is_empty());
159    }
160}