ndn_discovery/service_discovery/
mod.rs

1//! `ServiceDiscoveryProtocol` — `/ndn/local/sd/services/` and `/ndn/local/nd/peers`.
2//!
3//! This protocol implementation handles two closely related discovery functions:
4//!
5//! ## 1. Service record publication and browsing
6//!
7//! Producers call [`ServiceDiscoveryProtocol::publish`] to register a
8//! [`ServiceRecord`].  The protocol responds to incoming browse Interests
9//! (`/ndn/local/sd/services/` with `CanBePrefix`) with Data packets for each
10//! locally registered record.
11//!
12//! When an incoming service record Data arrives from a peer, the protocol
13//! optionally auto-populates the FIB using the announced prefix (governed by
14//! [`ServiceDiscoveryConfig::auto_populate_fib`] and related fields).
15//!
16//! ## 2. Demand-driven peer list (`/ndn/local/nd/peers`)
17//!
18//! Any node can express an Interest for `/ndn/local/nd/peers` to get a
19//! snapshot of the current neighbor table.  The protocol responds with a Data
20//! whose Content is a compact TLV list of neighbor names.
21//!
22//! ## Wire format — Peers response
23//!
24//! ```text
25//! PeerList ::= (PEER-ENTRY TLV)*
26//! PEER-ENTRY  ::= 0xE0 length Name
27//! ```
28
29mod browsing;
30mod fib_auto;
31mod records;
32
33use std::collections::{HashMap, HashSet};
34use std::sync::Mutex;
35use std::time::Instant;
36
37use bytes::Bytes;
38use ndn_packet::Name;
39use ndn_transport::FaceId;
40use tracing::{debug, info};
41
42use crate::config::ServiceDiscoveryConfig;
43use crate::context::DiscoveryContext;
44use crate::prefix_announce::ServiceRecord;
45use crate::protocol::{DiscoveryProtocol, InboundMeta, ProtocolId};
46use crate::scope::{peers_prefix, sd_services};
47
48pub use browsing::decode_peer_list;
49use fib_auto::AutoFibEntry;
50use records::{ProducerRateLimit, RecordEntry};
51
/// Identifier of this protocol: returned from `protocol_id()` and passed as
/// the owner argument when auto-populated FIB entries are removed.
const PROTOCOL: ProtocolId = ProtocolId("service-discovery");
53
54/// Service discovery and peer-list protocol.
55///
56/// Attach alongside [`UdpNeighborDiscovery`] or [`EtherNeighborDiscovery`] in
57/// a [`CompositeDiscovery`] to enable service record publication/browsing and
58/// demand-driven neighbor queries.
59///
60/// [`UdpNeighborDiscovery`]: crate::UdpNeighborDiscovery
61/// [`CompositeDiscovery`]: crate::CompositeDiscovery
62pub struct ServiceDiscoveryProtocol {
63    /// This node's NDN name (used when building responses).
64    #[expect(dead_code)]
65    node_name: Name,
66    /// Service discovery configuration.
67    pub(super) config: ServiceDiscoveryConfig,
68    /// Claimed name prefixes.
69    claimed: Vec<Name>,
70    /// Locally published service records.
71    pub(super) local_records: Mutex<Vec<RecordEntry>>,
72    /// Service records received from remote peers.
73    ///
74    /// Populated by [`handle_sd_data`].  Deduplicated on
75    /// `(announced_prefix, node_name)`: re-receiving a record updates it in-place.
76    pub(super) peer_records: Mutex<Vec<ServiceRecord>>,
77    /// Per-producer rate-limit state.
78    pub(super) rate_limits: Mutex<HashMap<String, ProducerRateLimit>>,
79    /// Auto-populated FIB entries pending TTL expiry.
80    pub(super) auto_fib: Mutex<Vec<AutoFibEntry>>,
81    /// Neighbors whose faces have already received an initial browse Interest.
82    ///
83    /// When `on_tick()` first sees a neighbor in `Established` state its name
84    /// is added here and a browse Interest is sent immediately (no interval
85    /// wait).  Periodic re-browse is then throttled by `last_browse`.
86    ///
87    /// Using the neighbor table (not raw face IDs) means management and app
88    /// faces — which are not NDN neighbors — are never browsed, avoiding the
89    /// "malformed management response" error when ndn-ctl connects.
90    pub(super) browsed_neighbors: Mutex<HashSet<Name>>,
91    /// Timestamp of the last periodic browse broadcast to all established
92    /// neighbors.  Used to throttle re-browse in `on_tick()`.
93    pub(super) last_browse: Mutex<Option<Instant>>,
94}
95
96impl ServiceDiscoveryProtocol {
97    /// Create a new `ServiceDiscoveryProtocol`.
98    ///
99    /// - `node_name`: this node's NDN name.
100    /// - `config`: service discovery parameters.
101    pub fn new(node_name: Name, config: ServiceDiscoveryConfig) -> Self {
102        // Claimed prefixes: sd/services for service records, nd/peers for the
103        // demand-driven neighbor list.  The nd/peers prefix is under nd_root,
104        // not hello_prefix, so it doesn't conflict with hello traffic.
105        let claimed = vec![sd_services().clone(), peers_prefix().clone()];
106        Self {
107            node_name,
108            config,
109            claimed,
110            local_records: Mutex::new(Vec::new()),
111            peer_records: Mutex::new(Vec::new()),
112            rate_limits: Mutex::new(HashMap::new()),
113            auto_fib: Mutex::new(Vec::new()),
114            browsed_neighbors: Mutex::new(HashSet::new()),
115            last_browse: Mutex::new(None),
116        }
117    }
118
119    /// Create with the default [`ServiceDiscoveryConfig`].
120    pub fn with_defaults(node_name: Name) -> Self {
121        Self::new(node_name, ServiceDiscoveryConfig::default())
122    }
123}
124
125// ── DiscoveryProtocol impl ────────────────────────────────────────────────────
126
127impl DiscoveryProtocol for ServiceDiscoveryProtocol {
128    fn protocol_id(&self) -> ProtocolId {
129        PROTOCOL
130    }
131
132    fn claimed_prefixes(&self) -> &[Name] {
133        &self.claimed
134    }
135
136    fn on_face_up(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {
137        // Browse is driven by on_tick() against the neighbor table, not here.
138        // on_face_up fires for ALL faces including management/app IPC faces;
139        // sending a browse Interest to those faces corrupts the management
140        // request/response serialisation at the client.
141    }
142
143    fn on_face_down(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
144        // Withdraw local service records that were owned by this face.
145        // This fires when an app's data face goes down, removing service
146        // records that would otherwise remain stale indefinitely.
147        {
148            let mut local = self.local_records.lock().unwrap();
149            let before = local.len();
150            local.retain(|e| e.owner_face != Some(face_id));
151            let removed = before - local.len();
152            if removed > 0 {
153                info!(face = ?face_id, count = removed, "ServiceDiscovery: withdrew local records for downed face");
154            }
155        }
156
157        // Find which neighbors were reachable via this face.
158        let affected: Vec<Name> = ctx
159            .neighbors()
160            .all()
161            .into_iter()
162            .filter(|e| e.faces.iter().any(|(fid, _, _)| *fid == face_id))
163            .map(|e| e.node_name.clone())
164            .collect();
165
166        // Evict peer records received from those nodes.
167        if !affected.is_empty() {
168            let mut peer_recs = self.peer_records.lock().unwrap();
169            peer_recs.retain(|r| !affected.contains(&r.node_name));
170            debug!(
171                nodes = ?affected.iter().map(|n| n.to_string()).collect::<Vec<_>>(),
172                "ServiceDiscovery: evicted peer records for face-down nodes",
173            );
174        }
175
176        // Remove auto-FIB entries that route via the downed face, and
177        // immediately remove the FIB entries from the engine.
178        let mut fib_removals: Vec<(Name, FaceId)> = Vec::new();
179        {
180            let mut auto_fib = self.auto_fib.lock().unwrap();
181            auto_fib.retain(|e| {
182                if e.face_id == face_id {
183                    fib_removals.push((e.prefix.clone(), e.face_id));
184                    false
185                } else {
186                    true
187                }
188            });
189        }
190        for (prefix, fid) in &fib_removals {
191            ctx.remove_fib_entry(prefix, *fid, PROTOCOL);
192        }
193        if !fib_removals.is_empty() {
194            debug!(count = fib_removals.len(), face = ?face_id, "ServiceDiscovery: removed auto-FIB entries for downed face");
195        }
196
197        // Reset browsed state for affected nodes so they receive a fresh
198        // initial browse when they reconnect (rather than waiting for the
199        // periodic interval).
200        if !affected.is_empty() {
201            let mut seen = self.browsed_neighbors.lock().unwrap();
202            for name in &affected {
203                seen.remove(name);
204            }
205        }
206    }
207
208    fn on_inbound(
209        &self,
210        raw: &Bytes,
211        incoming_face: FaceId,
212        _meta: &InboundMeta,
213        ctx: &dyn DiscoveryContext,
214    ) -> bool {
215        // Bytes arrive LP-unwrapped from the pipeline; dispatch directly.
216        match raw.first() {
217            Some(&0x05) => {
218                // Interest
219                if self.handle_sd_interest(raw, incoming_face, ctx) {
220                    return true;
221                }
222                self.handle_peers_interest(raw, incoming_face, ctx)
223            }
224            Some(&0x06) => {
225                // Data
226                self.handle_sd_data(raw, incoming_face, ctx)
227            }
228            _ => false,
229        }
230    }
231
232    fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
233        // Expire auto-populated FIB entries past their TTL.
234        self.expire_auto_fib(now, ctx);
235
236        // Expire local records that have a finite TTL (publish_with_ttl).
237        self.expire_local_records(now);
238
239        // Browse neighbor faces to exchange service records.
240        //
241        // Interval: half the shortest remaining auto-FIB TTL (guarantees
242        // refresh before expiry), floored at 10 s to avoid hammering on
243        // fast-tick profiles.  Newly-Established neighbors always get an
244        // immediate initial browse regardless of the interval.
245        let browse_interval = self.compute_browse_interval(now);
246        self.browse_neighbors(now, browse_interval, ctx);
247    }
248}
249
250// ── Tests ──────────────────────────────────────────────────────────────────────
251
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;
    use crate::context::DiscoveryContext;
    use crate::wire::write_name_tlv;
    use crate::{
        MacAddr, NeighborEntry, NeighborState, NeighborTable, NeighborTableView, NeighborUpdate,
    };
    use ndn_tlv::TlvWriter;

    /// Parse `uri` into a `Name`, panicking on malformed input.
    fn name(uri: &str) -> Name {
        uri.parse().unwrap()
    }

    /// Protocol instance with default configuration for a fixed test node.
    fn make_sd() -> ServiceDiscoveryProtocol {
        ServiceDiscoveryProtocol::with_defaults(name("/ndn/test/node"))
    }

    #[test]
    fn publish_and_withdraw() {
        let sd = make_sd();

        sd.publish(ServiceRecord::new(
            name("/ndn/sensor/temp"),
            name("/ndn/test/node"),
        ));
        assert_eq!(sd.local_records.lock().unwrap().len(), 1);

        sd.withdraw(&name("/ndn/sensor/temp"));
        assert!(sd.local_records.lock().unwrap().is_empty());
    }

    #[test]
    fn publish_replaces_existing() {
        let sd = make_sd();
        let first = ServiceRecord {
            announced_prefix: name("/ndn/sensor/temp"),
            node_name: name("/ndn/test/node"),
            freshness_ms: 30_000,
            capabilities: 0,
        };
        // Same key, different freshness: the second publish must overwrite.
        let second = ServiceRecord {
            freshness_ms: 60_000,
            ..first.clone()
        };

        sd.publish(first);
        sd.publish(second);

        let stored = sd.local_records.lock().unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].record.freshness_ms, 60_000);
    }

    #[test]
    fn claimed_prefixes_includes_sd_and_peers() {
        let sd = make_sd();
        let prefixes = sd.claimed_prefixes();
        assert!(prefixes.iter().any(|p| p.has_prefix(sd_services())));
        assert!(prefixes.iter().any(|p| p == peers_prefix()));
    }

    #[test]
    fn decode_peer_list_roundtrip() {
        let n1 = name("/ndn/test/peer1");
        let n2 = name("/ndn/test/peer2");

        // One 0xE0 entry per peer, each wrapping the peer's Name TLV.
        let mut w = TlvWriter::new();
        for peer in [&n1, &n2] {
            w.write_nested(0xE0, |w: &mut TlvWriter| {
                write_name_tlv(w, peer);
            });
        }
        let content = w.finish();

        let decoded = decode_peer_list(&content);
        assert_eq!(decoded.len(), 2);
    }

    #[test]
    fn auto_fib_ttl_expiry_on_tick() {
        /// Context stub that records every prefix handed to
        /// `remove_fib_entry` and otherwise does nothing.
        struct TrackCtx {
            now: Instant,
            removed: Mutex<Vec<Name>>,
        }
        impl DiscoveryContext for TrackCtx {
            fn alloc_face_id(&self) -> FaceId {
                FaceId(0)
            }
            fn add_face(&self, _: Arc<dyn ndn_transport::ErasedFace>) -> FaceId {
                FaceId(0)
            }
            fn remove_face(&self, _: FaceId) {}
            fn add_fib_entry(&self, _: &Name, _: FaceId, _: u32, _: ProtocolId) {}
            fn remove_fib_entry(&self, prefix: &Name, _: FaceId, _: ProtocolId) {
                self.removed.lock().unwrap().push(prefix.clone());
            }
            fn remove_fib_entries_by_owner(&self, _: ProtocolId) {}
            fn neighbors(&self) -> Arc<dyn NeighborTableView> {
                NeighborTable::new()
            }
            fn update_neighbor(&self, _: NeighborUpdate) {}
            fn send_on(&self, _: FaceId, _: Bytes) {}
            fn now(&self) -> Instant {
                self.now
            }
        }

        let sd = make_sd();
        let now = Instant::now();
        let ctx = TrackCtx {
            now,
            removed: Mutex::new(Vec::new()),
        };

        // Seed the tracking list with an entry whose TTL has already passed.
        sd.auto_fib.lock().unwrap().push(AutoFibEntry {
            prefix: name("/ndn/sensor/temp"),
            face_id: FaceId(7),
            expires_at: now - Duration::from_millis(1),
            node_name: name("/ndn/test/peer"),
        });

        sd.on_tick(now, &ctx);

        let removed = ctx.removed.lock().unwrap();
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0], name("/ndn/sensor/temp"));
        assert!(sd.auto_fib.lock().unwrap().is_empty());
    }

    #[test]
    fn relay_records_sends_to_other_peers() {
        /// Context stub backed by a real neighbor table that captures every
        /// `(face, packet)` pair passed to `send_on`.
        struct RelayCtx {
            neighbors: Arc<NeighborTable>,
            sent: Mutex<Vec<(FaceId, Bytes)>>,
        }
        impl DiscoveryContext for RelayCtx {
            fn alloc_face_id(&self) -> FaceId {
                FaceId(99)
            }
            fn add_face(&self, _: Arc<dyn ndn_transport::ErasedFace>) -> FaceId {
                FaceId(99)
            }
            fn remove_face(&self, _: FaceId) {}
            fn add_fib_entry(&self, _: &Name, _: FaceId, _: u32, _: ProtocolId) {}
            fn remove_fib_entry(&self, _: &Name, _: FaceId, _: ProtocolId) {}
            fn remove_fib_entries_by_owner(&self, _: ProtocolId) {}
            fn neighbors(&self) -> Arc<dyn NeighborTableView> {
                let table: Arc<dyn NeighborTableView> = self.neighbors.clone();
                table
            }
            fn update_neighbor(&self, u: NeighborUpdate) {
                self.neighbors.apply(u);
            }
            fn send_on(&self, face_id: FaceId, pkt: Bytes) {
                self.sent.lock().unwrap().push((face_id, pkt));
            }
            fn now(&self) -> Instant {
                Instant::now()
            }
        }

        let sd = ServiceDiscoveryProtocol::new(
            name("/ndn/test/node"),
            ServiceDiscoveryConfig {
                relay_records: true,
                auto_populate_fib: false, // keep test focused on relay only
                ..ServiceDiscoveryConfig::default()
            },
        );

        // Two reachable neighbors, each behind its own face.
        let established = |node: &str, face: FaceId| {
            let mut entry = NeighborEntry::new(name(node));
            entry.state = NeighborState::Established {
                last_seen: Instant::now(),
            };
            entry.faces = vec![(face, MacAddr([0u8; 6]), "eth0".into())];
            entry
        };
        let neighbors = NeighborTable::new();
        neighbors.apply(NeighborUpdate::Upsert(established("/ndn/peer/a", FaceId(10))));
        neighbors.apply(NeighborUpdate::Upsert(established("/ndn/peer/b", FaceId(20))));

        let ctx = RelayCtx {
            neighbors,
            sent: Mutex::new(Vec::new()),
        };

        // A valid service record Data packet from peer/a, arriving on face 10.
        let pkt = ServiceRecord {
            announced_prefix: name("/ndn/sensor/temp"),
            node_name: name("/ndn/peer/a"),
            freshness_ms: 10_000,
            capabilities: 0,
        }
        .build_data(1000);

        sd.on_inbound(&pkt, FaceId(10), &crate::InboundMeta::none(), &ctx);

        // The record must reach face 20 (peer/b) and never be echoed back to
        // face 10, where it came from.
        let sent = ctx.sent.lock().unwrap();
        let target_faces: Vec<FaceId> = sent.iter().map(|(fid, _)| *fid).collect();
        assert!(
            target_faces.contains(&FaceId(20)),
            "should relay to peer/b"
        );
        assert!(
            !target_faces.contains(&FaceId(10)),
            "must not relay back to source face"
        );
    }
}