ndn_strategy/
measurements.rs1#[cfg(not(target_arch = "wasm32"))]
2use dashmap::DashMap;
3use ndn_packet::Name;
4use ndn_transport::FaceId;
5use std::collections::HashMap;
6use std::sync::Arc;
7
/// Exponentially weighted moving-average round-trip-time estimator.
///
/// Keeps a smoothed RTT and an RTT-variation term, both in nanoseconds.
/// The gains (1/8 and 1/4), the first-sample seeding and the
/// `srtt + 4 * rttvar` timeout mirror the TCP retransmission-timer
/// estimator described in RFC 6298.
///
/// The default value is the "no samples yet" state: all fields are zero.
#[derive(Clone, Debug, Default)]
pub struct EwmaRtt {
    /// Smoothed round-trip time, in nanoseconds.
    pub srtt_ns: f64,
    /// Smoothed mean deviation of the round-trip time, in nanoseconds.
    pub rttvar_ns: f64,
    /// Number of samples folded in so far (saturates at `u32::MAX`).
    pub samples: u32,
}

impl EwmaRtt {
    /// Folds one RTT measurement (in nanoseconds) into the estimator.
    ///
    /// The first accepted sample seeds `srtt_ns` with the sample and
    /// `rttvar_ns` with half of it; later samples are blended in with
    /// gains of 1/8 (`srtt_ns`) and 1/4 (`rttvar_ns`).
    ///
    /// Samples that are NaN, infinite or negative are ignored: a single
    /// such value would otherwise poison every later estimate.
    pub fn update(&mut self, sample_ns: f64) {
        const ALPHA: f64 = 0.125;
        const BETA: f64 = 0.25;

        if !sample_ns.is_finite() || sample_ns < 0.0 {
            return;
        }

        if self.samples == 0 {
            self.srtt_ns = sample_ns;
            self.rttvar_ns = sample_ns / 2.0;
        } else {
            // The deviation is measured against the *old* smoothed RTT, so
            // rttvar must be updated before srtt.
            let deviation = (sample_ns - self.srtt_ns).abs();
            self.rttvar_ns = (1.0 - BETA) * self.rttvar_ns + BETA * deviation;
            self.srtt_ns = (1.0 - ALPHA) * self.srtt_ns + ALPHA * sample_ns;
        }

        // Saturate instead of overflowing: wrapping back to zero would make
        // the next sample re-seed the estimator and discard its history.
        self.samples = self.samples.saturating_add(1);
    }

    /// Returns the retransmission timeout in nanoseconds:
    /// `srtt_ns + 4 * rttvar_ns`.
    pub fn rto_ns(&self) -> f64 {
        self.srtt_ns + 4.0 * self.rttvar_ns
    }
}
50
51#[derive(Clone, Debug, Default)]
53pub struct MeasurementsEntry {
54 pub rtt_per_face: HashMap<FaceId, EwmaRtt>,
56 pub satisfaction_rate: f32,
58 pub last_updated: u64,
60}
61
62pub struct MeasurementsTable {
71 #[cfg(not(target_arch = "wasm32"))]
72 entries: DashMap<Arc<Name>, MeasurementsEntry>,
73 #[cfg(target_arch = "wasm32")]
74 entries: std::sync::Mutex<HashMap<Arc<Name>, MeasurementsEntry>>,
75}
76
77impl MeasurementsTable {
78 pub fn new() -> Self {
80 Self {
81 #[cfg(not(target_arch = "wasm32"))]
82 entries: DashMap::new(),
83 #[cfg(target_arch = "wasm32")]
84 entries: std::sync::Mutex::new(HashMap::new()),
85 }
86 }
87
88 pub fn get(&self, name: &Arc<Name>) -> Option<MeasurementsEntry> {
90 #[cfg(not(target_arch = "wasm32"))]
91 return self.entries.get(name).map(|r| r.clone());
92 #[cfg(target_arch = "wasm32")]
93 return self.entries.lock().unwrap().get(name).cloned();
94 }
95
96 pub fn update_rtt(&self, name: Arc<Name>, face: FaceId, rtt_ns: f64) {
98 #[cfg(not(target_arch = "wasm32"))]
99 {
100 let mut entry = self.entries.entry(name).or_default();
101 entry.rtt_per_face.entry(face).or_default().update(rtt_ns);
102 entry.last_updated = now_ns();
103 }
104 #[cfg(target_arch = "wasm32")]
105 {
106 let mut entries = self.entries.lock().unwrap();
107 let entry = entries.entry(name).or_default();
108 entry.rtt_per_face.entry(face).or_default().update(rtt_ns);
109 entry.last_updated = now_ns();
110 }
111 }
112
113 pub fn dump(&self) -> Vec<(Arc<Name>, MeasurementsEntry)> {
117 #[cfg(not(target_arch = "wasm32"))]
118 return self
119 .entries
120 .iter()
121 .map(|r| (Arc::clone(r.key()), r.value().clone()))
122 .collect();
123 #[cfg(target_arch = "wasm32")]
124 return self
125 .entries
126 .lock()
127 .unwrap()
128 .iter()
129 .map(|(k, v)| (Arc::clone(k), v.clone()))
130 .collect();
131 }
132
133 pub fn update_satisfaction(&self, name: Arc<Name>, satisfied: bool) {
135 const ALPHA: f32 = 0.1;
136 #[cfg(not(target_arch = "wasm32"))]
137 {
138 let mut entry = self.entries.entry(name).or_default();
139 let sample = if satisfied { 1.0f32 } else { 0.0 };
140 entry.satisfaction_rate = (1.0 - ALPHA) * entry.satisfaction_rate + ALPHA * sample;
141 entry.last_updated = now_ns();
142 }
143 #[cfg(target_arch = "wasm32")]
144 {
145 let mut entries = self.entries.lock().unwrap();
146 let entry = entries.entry(name).or_default();
147 let sample = if satisfied { 1.0f32 } else { 0.0 };
148 entry.satisfaction_rate = (1.0 - ALPHA) * entry.satisfaction_rate + ALPHA * sample;
149 entry.last_updated = now_ns();
150 }
151 }
152}
153
154impl Default for MeasurementsTable {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` instead of silently truncating if the value does
/// not fit in 64 bits.
///
/// NOTE(review): this reads `SystemTime`, which is not monotonic, and std
/// documents `SystemTime::now` as unsupported (panicking) on
/// `wasm32-unknown-unknown` — confirm which wasm target this crate builds for.
fn now_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Whole seconds and the sub-second remainder are combined in `u64`
        // with saturating arithmetic, so no narrowing cast is needed.
        Ok(since_epoch) => since_epoch
            .as_secs()
            .saturating_mul(1_000_000_000)
            .saturating_add(u64::from(since_epoch.subsec_nanos())),
        Err(_) => 0,
    }
}
167
#[cfg(test)]
mod tests {
    use super::*;
    use ndn_packet::Name;
    use ndn_transport::FaceId;
    use std::sync::Arc;

    /// Builds an estimator that has absorbed `samples` in order.
    fn estimator_after(samples: &[f64]) -> EwmaRtt {
        let mut estimator = EwmaRtt::default();
        for &sample in samples {
            estimator.update(sample);
        }
        estimator
    }

    /// Shared handle to the root name used as the key in the table tests.
    fn root_name() -> Arc<Name> {
        Arc::new(Name::root())
    }

    /// Reports the same outcome for `name` to `table` `times` times.
    fn report(table: &MeasurementsTable, name: &Arc<Name>, satisfied: bool, times: usize) {
        (0..times).for_each(|_| table.update_satisfaction(Arc::clone(name), satisfied));
    }

    #[test]
    fn ewma_first_sample_initialises_srtt() {
        let estimator = estimator_after(&[1_000_000.0]);
        assert_eq!(estimator.srtt_ns, 1_000_000.0);
        assert_eq!(estimator.rttvar_ns, 500_000.0);
        assert_eq!(estimator.samples, 1);
    }

    #[test]
    fn ewma_second_sample_converges() {
        let estimator = estimator_after(&[1_000_000.0, 1_000_000.0]);
        assert_eq!(estimator.samples, 2);
        assert!((estimator.srtt_ns - 1_000_000.0).abs() < 1.0);
    }

    #[test]
    fn ewma_rto_is_srtt_plus_four_rttvar() {
        let estimator = estimator_after(&[1_000.0]);
        let expected = estimator.srtt_ns + 4.0 * estimator.rttvar_ns;
        assert!((estimator.rto_ns() - expected).abs() < 1e-6);
    }

    #[test]
    fn measurements_table_update_rtt_creates_entry() {
        let table = MeasurementsTable::new();
        let name = root_name();
        table.update_rtt(Arc::clone(&name), FaceId(1), 500_000.0);

        let entry = table.get(&name).expect("entry created");
        assert!(entry.rtt_per_face.contains_key(&FaceId(1)));
        assert!(entry.last_updated > 0);
    }

    #[test]
    fn measurements_table_update_satisfaction_converges() {
        let table = MeasurementsTable::new();
        let name = root_name();
        report(&table, &name, true, 100);

        let entry = table.get(&name).unwrap();
        assert!(entry.satisfaction_rate > 0.9);
    }

    #[test]
    fn measurements_table_unsatisfied_drives_rate_to_zero() {
        let table = MeasurementsTable::new();
        let name = root_name();
        report(&table, &name, true, 50);
        report(&table, &name, false, 100);

        let entry = table.get(&name).unwrap();
        assert!(entry.satisfaction_rate < 0.1);
    }

    #[test]
    fn measurements_table_default_is_empty() {
        let table = MeasurementsTable::default();
        assert!(table.get(&root_name()).is_none());
    }
}