ndn_strategy/
measurements.rs

1#[cfg(not(target_arch = "wasm32"))]
2use dashmap::DashMap;
3use ndn_packet::Name;
4use ndn_transport::FaceId;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8/// EWMA RTT measurement for a (prefix, face) pair.
9#[derive(Clone, Debug)]
10pub struct EwmaRtt {
11    /// Smoothed RTT in nanoseconds.
12    pub srtt_ns: f64,
13    /// RTT variance.
14    pub rttvar_ns: f64,
15    /// Number of samples.
16    pub samples: u32,
17}
18
19impl EwmaRtt {
20    /// Incorporate an RTT sample (nanoseconds) using EWMA smoothing.
21    pub fn update(&mut self, sample_ns: f64) {
22        const ALPHA: f64 = 0.125;
23        const BETA: f64 = 0.25;
24        if self.samples == 0 {
25            self.srtt_ns = sample_ns;
26            self.rttvar_ns = sample_ns / 2.0;
27        } else {
28            let diff = (sample_ns - self.srtt_ns).abs();
29            self.rttvar_ns = (1.0 - BETA) * self.rttvar_ns + BETA * diff;
30            self.srtt_ns = (1.0 - ALPHA) * self.srtt_ns + ALPHA * sample_ns;
31        }
32        self.samples += 1;
33    }
34
35    /// RTO estimate: srtt + 4 * rttvar.
36    pub fn rto_ns(&self) -> f64 {
37        self.srtt_ns + 4.0 * self.rttvar_ns
38    }
39}
40
41impl Default for EwmaRtt {
42    fn default() -> Self {
43        Self {
44            srtt_ns: 0.0,
45            rttvar_ns: 0.0,
46            samples: 0,
47        }
48    }
49}
50
51/// Per-prefix measurements entry.
52#[derive(Clone, Debug, Default)]
53pub struct MeasurementsEntry {
54    /// Per-face RTT measurements.
55    pub rtt_per_face: HashMap<FaceId, EwmaRtt>,
56    /// EWMA satisfaction rate over the last N Interests (0.0–1.0).
57    pub satisfaction_rate: f32,
58    /// Timestamp of last update (ns since Unix epoch).
59    pub last_updated: u64,
60}
61
62/// Concurrent measurements table — one entry per name prefix, keyed by the
63/// longest-matching prefix used during the forwarding decision.
64///
65/// Updated on every Data arrival by the `MeasurementsUpdateStage`.
66/// Read by strategies via `StrategyContext`.
67///
68/// On native targets uses `DashMap` for sharded concurrent access.
69/// On `wasm32` uses a `Mutex<HashMap>` (single-threaded WASM has no contention).
70pub struct MeasurementsTable {
71    #[cfg(not(target_arch = "wasm32"))]
72    entries: DashMap<Arc<Name>, MeasurementsEntry>,
73    #[cfg(target_arch = "wasm32")]
74    entries: std::sync::Mutex<HashMap<Arc<Name>, MeasurementsEntry>>,
75}
76
77impl MeasurementsTable {
78    /// Create an empty measurements table.
79    pub fn new() -> Self {
80        Self {
81            #[cfg(not(target_arch = "wasm32"))]
82            entries: DashMap::new(),
83            #[cfg(target_arch = "wasm32")]
84            entries: std::sync::Mutex::new(HashMap::new()),
85        }
86    }
87
88    /// Look up the measurements entry for a name prefix, returning a clone.
89    pub fn get(&self, name: &Arc<Name>) -> Option<MeasurementsEntry> {
90        #[cfg(not(target_arch = "wasm32"))]
91        return self.entries.get(name).map(|r| r.clone());
92        #[cfg(target_arch = "wasm32")]
93        return self.entries.lock().unwrap().get(name).cloned();
94    }
95
96    /// Record an RTT sample for a (prefix, face) pair, creating the entry if needed.
97    pub fn update_rtt(&self, name: Arc<Name>, face: FaceId, rtt_ns: f64) {
98        #[cfg(not(target_arch = "wasm32"))]
99        {
100            let mut entry = self.entries.entry(name).or_default();
101            entry.rtt_per_face.entry(face).or_default().update(rtt_ns);
102            entry.last_updated = now_ns();
103        }
104        #[cfg(target_arch = "wasm32")]
105        {
106            let mut entries = self.entries.lock().unwrap();
107            let entry = entries.entry(name).or_default();
108            entry.rtt_per_face.entry(face).or_default().update(rtt_ns);
109            entry.last_updated = now_ns();
110        }
111    }
112
113    /// Snapshot all entries, returning a `Vec` of `(prefix, entry)` pairs.
114    ///
115    /// Intended for management dataset queries (`measurements/list`).
116    pub fn dump(&self) -> Vec<(Arc<Name>, MeasurementsEntry)> {
117        #[cfg(not(target_arch = "wasm32"))]
118        return self
119            .entries
120            .iter()
121            .map(|r| (Arc::clone(r.key()), r.value().clone()))
122            .collect();
123        #[cfg(target_arch = "wasm32")]
124        return self
125            .entries
126            .lock()
127            .unwrap()
128            .iter()
129            .map(|(k, v)| (Arc::clone(k), v.clone()))
130            .collect();
131    }
132
133    /// Record an Interest satisfaction outcome, updating the EWMA satisfaction rate.
134    pub fn update_satisfaction(&self, name: Arc<Name>, satisfied: bool) {
135        const ALPHA: f32 = 0.1;
136        #[cfg(not(target_arch = "wasm32"))]
137        {
138            let mut entry = self.entries.entry(name).or_default();
139            let sample = if satisfied { 1.0f32 } else { 0.0 };
140            entry.satisfaction_rate = (1.0 - ALPHA) * entry.satisfaction_rate + ALPHA * sample;
141            entry.last_updated = now_ns();
142        }
143        #[cfg(target_arch = "wasm32")]
144        {
145            let mut entries = self.entries.lock().unwrap();
146            let entry = entries.entry(name).or_default();
147            let sample = if satisfied { 1.0f32 } else { 0.0 };
148            entry.satisfaction_rate = (1.0 - ALPHA) * entry.satisfaction_rate + ALPHA * sample;
149            entry.last_updated = now_ns();
150        }
151    }
152}
153
154impl Default for MeasurementsTable {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
160fn now_ns() -> u64 {
161    use std::time::{SystemTime, UNIX_EPOCH};
162    SystemTime::now()
163        .duration_since(UNIX_EPOCH)
164        .map(|d| d.as_nanos() as u64)
165        .unwrap_or(0)
166}
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171    use ndn_packet::Name;
172    use ndn_transport::FaceId;
173    use std::sync::Arc;
174
175    #[test]
176    fn ewma_first_sample_initialises_srtt() {
177        let mut rtt = EwmaRtt::default();
178        rtt.update(1_000_000.0); // 1 ms
179        assert_eq!(rtt.srtt_ns, 1_000_000.0);
180        assert_eq!(rtt.rttvar_ns, 500_000.0); // sample / 2
181        assert_eq!(rtt.samples, 1);
182    }
183
184    #[test]
185    fn ewma_second_sample_converges() {
186        let mut rtt = EwmaRtt::default();
187        rtt.update(1_000_000.0);
188        rtt.update(1_000_000.0); // same RTT → SRTT unchanged
189        assert_eq!(rtt.samples, 2);
190        assert!((rtt.srtt_ns - 1_000_000.0).abs() < 1.0);
191    }
192
193    #[test]
194    fn ewma_rto_is_srtt_plus_four_rttvar() {
195        let mut rtt = EwmaRtt::default();
196        rtt.update(1_000.0);
197        let expected = rtt.srtt_ns + 4.0 * rtt.rttvar_ns;
198        assert!((rtt.rto_ns() - expected).abs() < 1e-6);
199    }
200
201    #[test]
202    fn measurements_table_update_rtt_creates_entry() {
203        let table = MeasurementsTable::new();
204        let name = Arc::new(Name::root());
205        table.update_rtt(Arc::clone(&name), FaceId(1), 500_000.0);
206        let entry = table.get(&name).expect("entry created");
207        assert!(entry.rtt_per_face.contains_key(&FaceId(1)));
208        assert!(entry.last_updated > 0);
209    }
210
211    #[test]
212    fn measurements_table_update_satisfaction_converges() {
213        let table = MeasurementsTable::new();
214        let name = Arc::new(Name::root());
215        // Repeated satisfied updates should drive rate toward 1.0
216        for _ in 0..100 {
217            table.update_satisfaction(Arc::clone(&name), true);
218        }
219        let entry = table.get(&name).unwrap();
220        assert!(entry.satisfaction_rate > 0.9);
221    }
222
223    #[test]
224    fn measurements_table_unsatisfied_drives_rate_to_zero() {
225        let table = MeasurementsTable::new();
226        let name = Arc::new(Name::root());
227        // First push rate up...
228        for _ in 0..50 {
229            table.update_satisfaction(Arc::clone(&name), true);
230        }
231        // ...then push rate down
232        for _ in 0..100 {
233            table.update_satisfaction(Arc::clone(&name), false);
234        }
235        let entry = table.get(&name).unwrap();
236        assert!(entry.satisfaction_rate < 0.1);
237    }
238
239    #[test]
240    fn measurements_table_default_is_empty() {
241        let table = MeasurementsTable::default();
242        let name = Arc::new(Name::root());
243        assert!(table.get(&name).is_none());
244    }
245}