ndn_discovery/service_discovery/
mod.rs1mod browsing;
30mod fib_auto;
31mod records;
32
33use std::collections::{HashMap, HashSet};
34use std::sync::Mutex;
35use std::time::Instant;
36
37use bytes::Bytes;
38use ndn_packet::Name;
39use ndn_transport::FaceId;
40use tracing::{debug, info};
41
42use crate::config::ServiceDiscoveryConfig;
43use crate::context::DiscoveryContext;
44use crate::prefix_announce::ServiceRecord;
45use crate::protocol::{DiscoveryProtocol, InboundMeta, ProtocolId};
46use crate::scope::{peers_prefix, sd_services};
47
48pub use browsing::decode_peer_list;
49use fib_auto::AutoFibEntry;
50use records::{ProducerRateLimit, RecordEntry};
51
52const PROTOCOL: ProtocolId = ProtocolId("service-discovery");
53
54pub struct ServiceDiscoveryProtocol {
63 #[expect(dead_code)]
65 node_name: Name,
66 pub(super) config: ServiceDiscoveryConfig,
68 claimed: Vec<Name>,
70 pub(super) local_records: Mutex<Vec<RecordEntry>>,
72 pub(super) peer_records: Mutex<Vec<ServiceRecord>>,
77 pub(super) rate_limits: Mutex<HashMap<String, ProducerRateLimit>>,
79 pub(super) auto_fib: Mutex<Vec<AutoFibEntry>>,
81 pub(super) browsed_neighbors: Mutex<HashSet<Name>>,
91 pub(super) last_browse: Mutex<Option<Instant>>,
94}
95
96impl ServiceDiscoveryProtocol {
97 pub fn new(node_name: Name, config: ServiceDiscoveryConfig) -> Self {
102 let claimed = vec![sd_services().clone(), peers_prefix().clone()];
106 Self {
107 node_name,
108 config,
109 claimed,
110 local_records: Mutex::new(Vec::new()),
111 peer_records: Mutex::new(Vec::new()),
112 rate_limits: Mutex::new(HashMap::new()),
113 auto_fib: Mutex::new(Vec::new()),
114 browsed_neighbors: Mutex::new(HashSet::new()),
115 last_browse: Mutex::new(None),
116 }
117 }
118
119 pub fn with_defaults(node_name: Name) -> Self {
121 Self::new(node_name, ServiceDiscoveryConfig::default())
122 }
123}
124
125impl DiscoveryProtocol for ServiceDiscoveryProtocol {
128 fn protocol_id(&self) -> ProtocolId {
129 PROTOCOL
130 }
131
132 fn claimed_prefixes(&self) -> &[Name] {
133 &self.claimed
134 }
135
136 fn on_face_up(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {
137 }
142
143 fn on_face_down(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
144 {
148 let mut local = self.local_records.lock().unwrap();
149 let before = local.len();
150 local.retain(|e| e.owner_face != Some(face_id));
151 let removed = before - local.len();
152 if removed > 0 {
153 info!(face = ?face_id, count = removed, "ServiceDiscovery: withdrew local records for downed face");
154 }
155 }
156
157 let affected: Vec<Name> = ctx
159 .neighbors()
160 .all()
161 .into_iter()
162 .filter(|e| e.faces.iter().any(|(fid, _, _)| *fid == face_id))
163 .map(|e| e.node_name.clone())
164 .collect();
165
166 if !affected.is_empty() {
168 let mut peer_recs = self.peer_records.lock().unwrap();
169 peer_recs.retain(|r| !affected.contains(&r.node_name));
170 debug!(
171 nodes = ?affected.iter().map(|n| n.to_string()).collect::<Vec<_>>(),
172 "ServiceDiscovery: evicted peer records for face-down nodes",
173 );
174 }
175
176 let mut fib_removals: Vec<(Name, FaceId)> = Vec::new();
179 {
180 let mut auto_fib = self.auto_fib.lock().unwrap();
181 auto_fib.retain(|e| {
182 if e.face_id == face_id {
183 fib_removals.push((e.prefix.clone(), e.face_id));
184 false
185 } else {
186 true
187 }
188 });
189 }
190 for (prefix, fid) in &fib_removals {
191 ctx.remove_fib_entry(prefix, *fid, PROTOCOL);
192 }
193 if !fib_removals.is_empty() {
194 debug!(count = fib_removals.len(), face = ?face_id, "ServiceDiscovery: removed auto-FIB entries for downed face");
195 }
196
197 if !affected.is_empty() {
201 let mut seen = self.browsed_neighbors.lock().unwrap();
202 for name in &affected {
203 seen.remove(name);
204 }
205 }
206 }
207
208 fn on_inbound(
209 &self,
210 raw: &Bytes,
211 incoming_face: FaceId,
212 _meta: &InboundMeta,
213 ctx: &dyn DiscoveryContext,
214 ) -> bool {
215 match raw.first() {
217 Some(&0x05) => {
218 if self.handle_sd_interest(raw, incoming_face, ctx) {
220 return true;
221 }
222 self.handle_peers_interest(raw, incoming_face, ctx)
223 }
224 Some(&0x06) => {
225 self.handle_sd_data(raw, incoming_face, ctx)
227 }
228 _ => false,
229 }
230 }
231
232 fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
233 self.expire_auto_fib(now, ctx);
235
236 self.expire_local_records(now);
238
239 let browse_interval = self.compute_browse_interval(now);
246 self.browse_neighbors(now, browse_interval, ctx);
247 }
248}
249
250#[cfg(test)]
253mod tests {
254 use std::str::FromStr;
255 use std::time::Duration;
256
257 use super::*;
258 use crate::wire::write_name_tlv;
259 use crate::{MacAddr, NeighborTable};
260 use ndn_tlv::TlvWriter;
261
262 fn name(s: &str) -> Name {
263 Name::from_str(s).unwrap()
264 }
265
266 fn make_sd() -> ServiceDiscoveryProtocol {
267 ServiceDiscoveryProtocol::with_defaults(name("/ndn/test/node"))
268 }
269
270 #[test]
271 fn publish_and_withdraw() {
272 let sd = make_sd();
273 let rec = ServiceRecord::new(name("/ndn/sensor/temp"), name("/ndn/test/node"));
274 sd.publish(rec);
275 {
276 let records = sd.local_records.lock().unwrap();
277 assert_eq!(records.len(), 1);
278 }
279 sd.withdraw(&name("/ndn/sensor/temp"));
280 {
281 let records = sd.local_records.lock().unwrap();
282 assert!(records.is_empty());
283 }
284 }
285
286 #[test]
287 fn publish_replaces_existing() {
288 let sd = make_sd();
289 let rec1 = ServiceRecord {
290 announced_prefix: name("/ndn/sensor/temp"),
291 node_name: name("/ndn/test/node"),
292 freshness_ms: 30_000,
293 capabilities: 0,
294 };
295 let mut rec2 = rec1.clone();
296 rec2.freshness_ms = 60_000;
297 sd.publish(rec1);
298 sd.publish(rec2);
299 let records = sd.local_records.lock().unwrap();
300 assert_eq!(records.len(), 1);
301 assert_eq!(records[0].record.freshness_ms, 60_000);
302 }
303
304 #[test]
305 fn claimed_prefixes_includes_sd_and_peers() {
306 let sd = make_sd();
307 let claimed = sd.claimed_prefixes();
308 assert!(claimed.iter().any(|p| p.has_prefix(sd_services())));
309 assert!(claimed.iter().any(|p| p == peers_prefix()));
310 }
311
312 #[test]
313 fn decode_peer_list_roundtrip() {
314 let mut w = TlvWriter::new();
315 let n1 = name("/ndn/test/peer1");
316 let n2 = name("/ndn/test/peer2");
317 w.write_nested(0xE0, |w: &mut TlvWriter| {
318 write_name_tlv(w, &n1);
319 });
320 w.write_nested(0xE0, |w: &mut TlvWriter| {
321 write_name_tlv(w, &n2);
322 });
323 let content = w.finish();
324 let decoded = decode_peer_list(&content);
325 assert_eq!(decoded.len(), 2);
326 }
327
328 #[test]
329 fn auto_fib_ttl_expiry_on_tick() {
330 use crate::context::DiscoveryContext;
331 use crate::{NeighborTableView, NeighborUpdate};
332 use std::sync::{Arc, Mutex as StdMutex};
333
334 struct TrackCtx {
335 now: Instant,
336 removed: StdMutex<Vec<Name>>,
337 }
338 impl DiscoveryContext for TrackCtx {
339 fn alloc_face_id(&self) -> FaceId {
340 FaceId(0)
341 }
342 fn add_face(&self, _: Arc<dyn ndn_transport::ErasedFace>) -> FaceId {
343 FaceId(0)
344 }
345 fn remove_face(&self, _: FaceId) {}
346 fn add_fib_entry(&self, _: &Name, _: FaceId, _: u32, _: ProtocolId) {}
347 fn remove_fib_entry(&self, prefix: &Name, _: FaceId, _: ProtocolId) {
348 self.removed.lock().unwrap().push(prefix.clone());
349 }
350 fn remove_fib_entries_by_owner(&self, _: ProtocolId) {}
351 fn neighbors(&self) -> Arc<dyn NeighborTableView> {
352 NeighborTable::new()
353 }
354 fn update_neighbor(&self, _: NeighborUpdate) {}
355 fn send_on(&self, _: FaceId, _: Bytes) {}
356 fn now(&self) -> Instant {
357 self.now
358 }
359 }
360
361 let sd = make_sd();
362 let now = Instant::now();
363 let ctx = TrackCtx {
364 now,
365 removed: StdMutex::new(Vec::new()),
366 };
367
368 {
370 let mut af = sd.auto_fib.lock().unwrap();
371 af.push(AutoFibEntry {
372 prefix: name("/ndn/sensor/temp"),
373 face_id: FaceId(7),
374 expires_at: now - Duration::from_millis(1),
375 node_name: name("/ndn/test/peer"),
376 });
377 }
378
379 sd.on_tick(now, &ctx);
380 let removed = ctx.removed.lock().unwrap();
381 assert_eq!(removed.len(), 1);
382 assert_eq!(removed[0], name("/ndn/sensor/temp"));
383 assert!(sd.auto_fib.lock().unwrap().is_empty());
384 }
385
386 #[test]
387 fn relay_records_sends_to_other_peers() {
388 use crate::context::DiscoveryContext;
389 use crate::{
390 NeighborEntry, NeighborState, NeighborTable, NeighborTableView, NeighborUpdate,
391 };
392 use std::sync::{Arc, Mutex as StdMutex};
393
394 struct RelayCtx {
395 neighbors: Arc<NeighborTable>,
396 sent: StdMutex<Vec<(FaceId, Bytes)>>,
397 }
398 impl DiscoveryContext for RelayCtx {
399 fn alloc_face_id(&self) -> FaceId {
400 FaceId(99)
401 }
402 fn add_face(&self, _: Arc<dyn ndn_transport::ErasedFace>) -> FaceId {
403 FaceId(99)
404 }
405 fn remove_face(&self, _: FaceId) {}
406 fn add_fib_entry(&self, _: &Name, _: FaceId, _: u32, _: ProtocolId) {}
407 fn remove_fib_entry(&self, _: &Name, _: FaceId, _: ProtocolId) {}
408 fn remove_fib_entries_by_owner(&self, _: ProtocolId) {}
409 fn neighbors(&self) -> Arc<dyn NeighborTableView> {
410 Arc::clone(&self.neighbors) as Arc<dyn NeighborTableView>
411 }
412 fn update_neighbor(&self, u: NeighborUpdate) {
413 self.neighbors.apply(u);
414 }
415 fn send_on(&self, face_id: FaceId, pkt: Bytes) {
416 self.sent.lock().unwrap().push((face_id, pkt));
417 }
418 fn now(&self) -> Instant {
419 Instant::now()
420 }
421 }
422
423 let cfg = ServiceDiscoveryConfig {
424 relay_records: true,
425 auto_populate_fib: false, ..ServiceDiscoveryConfig::default()
427 };
428 let sd = ServiceDiscoveryProtocol::new(name("/ndn/test/node"), cfg);
429
430 let neighbors = NeighborTable::new();
431 let mut e1 = NeighborEntry::new(name("/ndn/peer/a"));
433 e1.state = NeighborState::Established {
434 last_seen: Instant::now(),
435 };
436 e1.faces = vec![(FaceId(10), MacAddr([0u8; 6]), "eth0".into())];
437 let mut e2 = NeighborEntry::new(name("/ndn/peer/b"));
438 e2.state = NeighborState::Established {
439 last_seen: Instant::now(),
440 };
441 e2.faces = vec![(FaceId(20), MacAddr([0u8; 6]), "eth0".into())];
442 neighbors.apply(NeighborUpdate::Upsert(e1));
443 neighbors.apply(NeighborUpdate::Upsert(e2));
444
445 let ctx = RelayCtx {
446 neighbors,
447 sent: StdMutex::new(Vec::new()),
448 };
449
450 let rec = ServiceRecord {
452 announced_prefix: name("/ndn/sensor/temp"),
453 node_name: name("/ndn/peer/a"),
454 freshness_ms: 10_000,
455 capabilities: 0,
456 };
457 let pkt = rec.build_data(1000);
458
459 sd.on_inbound(&pkt, FaceId(10), &crate::InboundMeta::none(), &ctx);
460
461 let sent = ctx.sent.lock().unwrap();
462 assert!(
464 sent.iter().any(|(fid, _)| *fid == FaceId(20)),
465 "should relay to peer/b"
466 );
467 assert!(
468 !sent.iter().any(|(fid, _)| *fid == FaceId(10)),
469 "must not relay back to source face"
470 );
471 }
472}