ndn_discovery/service_discovery/
browsing.rs

1//! Interest handling for service browsing, response encoding, and peer list.
2
3use std::collections::HashSet;
4use std::time::{Duration, Instant};
5
6use bytes::Bytes;
7use ndn_packet::{Name, encode::encode_interest, tlv_type};
8use ndn_tlv::TlvWriter;
9use ndn_transport::FaceId;
10use tracing::{debug, trace, warn};
11
12use crate::config::ServiceValidationPolicy;
13use crate::context::DiscoveryContext;
14use crate::prefix_announce::ServiceRecord;
15use crate::scope::{peers_prefix, sd_services};
16use crate::wire::{parse_raw_data, parse_raw_interest, write_name_tlv, write_nni};
17
18use super::ServiceDiscoveryProtocol;
19
20/// TLV type for a peer entry in the `/ndn/local/nd/peers` response.
21const T_PEER_ENTRY: u64 = 0xE0;
22
23impl ServiceDiscoveryProtocol {
24    // ── Inbound handlers ──────────────────────────────────────────────────────
25
26    pub(super) fn handle_sd_interest(
27        &self,
28        raw: &Bytes,
29        incoming_face: FaceId,
30        ctx: &dyn DiscoveryContext,
31    ) -> bool {
32        let parsed = match parse_raw_interest(raw) {
33            Some(p) => p,
34            None => return false,
35        };
36
37        let name = &parsed.name;
38        if !name.has_prefix(sd_services()) {
39            return false;
40        }
41
42        // Browse Interest: respond with all locally published records.
43        let records = self.local_records.lock().unwrap();
44        let mut responded = false;
45        for entry in records.iter() {
46            let pkt = entry.record.build_data(entry.published_at_ms);
47            ctx.send_on(incoming_face, pkt);
48            responded = true;
49        }
50        if responded {
51            debug!(
52                "ServiceDiscovery: answered browse Interest with {} records",
53                records.len()
54            );
55        }
56        true
57    }
58
59    pub(super) fn handle_sd_data(
60        &self,
61        raw: &Bytes,
62        incoming_face: FaceId,
63        ctx: &dyn DiscoveryContext,
64    ) -> bool {
65        let parsed = match parse_raw_data(raw) {
66            Some(d) => d,
67            None => return false,
68        };
69
70        if !parsed.name.has_prefix(sd_services()) {
71            return false;
72        }
73
74        let content = match parsed.content {
75            Some(c) => c,
76            None => return true, // no content, consume but ignore
77        };
78
79        let record = match ServiceRecord::decode(&content) {
80            Some(r) => r,
81            None => {
82                debug!("ServiceDiscovery: could not decode ServiceRecord");
83                return true;
84            }
85        };
86
87        // Validation policy check.
88        match self.config.validation {
89            ServiceValidationPolicy::Skip => {}
90            ServiceValidationPolicy::WarnOnly => {
91                // In a real implementation, check the signature.  For now, log.
92                debug!(
93                    "ServiceDiscovery: received unvalidated record for {:?}",
94                    record.announced_prefix
95                );
96            }
97            ServiceValidationPolicy::Required => {
98                // Drop unsigned records (signature check not yet wired).
99                warn!("ServiceDiscovery: dropping unvalidated record (Required policy)");
100                return true;
101            }
102        }
103
104        // Scope filter.
105        if !self.is_in_scope(&record.announced_prefix) {
106            debug!(
107                "ServiceDiscovery: record {:?} outside configured scope",
108                record.announced_prefix
109            );
110            return true;
111        }
112
113        // Prefix filter.
114        if !self.config.auto_populate_prefix_filter.is_empty() {
115            let allowed = self
116                .config
117                .auto_populate_prefix_filter
118                .iter()
119                .any(|f| record.announced_prefix.has_prefix(f));
120            if !allowed {
121                return true;
122            }
123        }
124
125        // Rate limit check.
126        if !self.check_rate_limit(&record.node_name, ctx.now()) {
127            debug!(
128                "ServiceDiscovery: rate-limiting producer {:?}",
129                record.node_name
130            );
131            return true;
132        }
133
134        // Cache the peer record for browse queries.
135        {
136            let mut peer_recs = self.peer_records.lock().unwrap();
137            if let Some(idx) = peer_recs.iter().position(|r| {
138                r.announced_prefix == record.announced_prefix && r.node_name == record.node_name
139            }) {
140                peer_recs[idx] = record.clone();
141            } else {
142                peer_recs.push(record.clone());
143            }
144        }
145
146        // Auto-populate FIB with TTL tracking.
147        //
148        // Use the producer's unicast face rather than incoming_face.  Browse
149        // responses arrive on the multicast face (because the remote sends
150        // them back via its own multicast face), and routing data traffic over
151        // the multicast face broadcasts every Interest to all peers.  The
152        // correct nexthop is the unicast face we have for the producer.
153        if self.config.auto_populate_fib {
154            self.auto_populate_fib(&record, incoming_face, ctx);
155        }
156
157        // Relay service record to established neighbors when enabled.
158        // Exclude the face the record arrived on to prevent loops.
159        if self.config.relay_records {
160            let relay_faces: Vec<FaceId> = ctx
161                .neighbors()
162                .all()
163                .into_iter()
164                .filter(|e| e.is_reachable())
165                .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid).collect::<Vec<_>>())
166                .filter(|fid| *fid != incoming_face)
167                .collect();
168            let relay_count = relay_faces.len();
169            for face_id in relay_faces {
170                ctx.send_on(face_id, raw.clone());
171            }
172            if relay_count > 0 {
173                debug!(
174                    "ServiceDiscovery: relayed record {:?} to {relay_count} peers",
175                    record.announced_prefix
176                );
177            }
178        }
179
180        true
181    }
182
183    pub(super) fn handle_peers_interest(
184        &self,
185        raw: &Bytes,
186        incoming_face: FaceId,
187        ctx: &dyn DiscoveryContext,
188    ) -> bool {
189        let parsed = match parse_raw_interest(raw) {
190            Some(p) => p,
191            None => return false,
192        };
193
194        if !parsed.name.has_prefix(peers_prefix()) {
195            return false;
196        }
197
198        let peers_depth = peers_prefix().components().len();
199        let extra_comps = parsed.name.components().len().saturating_sub(peers_depth);
200
201        let peer_list = if extra_comps > 0 {
202            // `/ndn/local/nd/peers/<node-name>` — single-peer query.
203            // The node name follows the peers prefix; reconstruct it from the
204            // extra components (everything after the prefix depth).
205            let comps = parsed.name.components();
206            let node_name_comps = &comps[peers_depth..];
207            let mut uri = String::new();
208            for comp in node_name_comps {
209                uri.push('/');
210                for byte in comp.value.iter() {
211                    if byte.is_ascii_alphanumeric() || b"-.~_".contains(byte) {
212                        uri.push(*byte as char);
213                    } else {
214                        uri.push_str(&format!("%{byte:02X}"));
215                    }
216                }
217            }
218            if uri.is_empty() {
219                uri.push('/');
220            }
221            let target = match std::str::FromStr::from_str(&uri) {
222                Ok(n) => n,
223                Err(_) => return true,
224            };
225            let entry = ctx.neighbors().get(&target);
226            let mut w = TlvWriter::new();
227            if let Some(e) = entry
228                && e.is_reachable()
229            {
230                w.write_nested(T_PEER_ENTRY, |w: &mut TlvWriter| {
231                    write_name_tlv(w, &e.node_name);
232                });
233            }
234            let content = w.finish();
235            debug!(
236                "ServiceDiscovery: answered single-peer query for {:?}",
237                target
238            );
239            content
240        } else {
241            // `/ndn/local/nd/peers` — full peer list.
242            let neighbors = ctx.neighbors().all();
243            let mut w = TlvWriter::new();
244            for entry in &neighbors {
245                if entry.is_reachable() {
246                    w.write_nested(T_PEER_ENTRY, |w: &mut TlvWriter| {
247                        write_name_tlv(w, &entry.node_name);
248                    });
249                }
250            }
251            debug!(
252                "ServiceDiscovery: answered peers query with {} neighbors",
253                neighbors.len()
254            );
255            w.finish()
256        };
257
258        // Build a Data response at the exact Interest name.
259        let data_name = &parsed.name;
260        let mut dw = TlvWriter::new();
261        dw.write_nested(tlv_type::DATA, |w: &mut TlvWriter| {
262            write_name_tlv(w, data_name);
263            w.write_nested(tlv_type::META_INFO, |w: &mut TlvWriter| {
264                // FreshnessPeriod = 1 s (peer list changes frequently)
265                write_nni(w, tlv_type::FRESHNESS_PERIOD, 1000);
266            });
267            w.write_tlv(tlv_type::CONTENT, &peer_list);
268            w.write_nested(tlv_type::SIGNATURE_INFO, |w: &mut TlvWriter| {
269                w.write_tlv(tlv_type::SIGNATURE_TYPE, &[0u8]);
270            });
271            w.write_tlv(tlv_type::SIGNATURE_VALUE, &[0u8; 32]);
272        });
273
274        ctx.send_on(incoming_face, dw.finish());
275        true
276    }
277
278    // ── Browse helpers ────────────────────────────────────────────────────────
279
280    /// Send a browse Interest on `face_id` to solicit service records from
281    /// the peer on that face.  When the peer's SD protocol receives this
282    /// Interest it will respond with its local records as Data packets;
283    /// those are handled by [`handle_sd_data`] which auto-populates the FIB.
284    pub(super) fn send_browse_interest(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
285        let interest = encode_interest(sd_services(), None);
286        ctx.send_on(face_id, interest);
287        trace!(face = ?face_id, "ServiceDiscovery: sent browse Interest");
288    }
289
290    /// Browse established neighbors, distinguishing two cases:
291    ///
292    /// - **Newly established** (not yet in `browsed_neighbors`): browse
293    ///   immediately regardless of the periodic interval.
294    /// - **Already browsed**: re-browse only if `browse_interval` has elapsed
295    ///   since `last_browse`.
296    ///
297    /// Using the neighbor table (not raw face IDs) ensures that management
298    /// and app faces are never sent unsolicited browse Interests.
299    pub(super) fn browse_neighbors(
300        &self,
301        now: Instant,
302        browse_interval: Duration,
303        ctx: &dyn DiscoveryContext,
304    ) {
305        let neighbors = ctx.neighbors().all();
306        let mut seen = self.browsed_neighbors.lock().unwrap();
307        let periodic_due = self
308            .last_browse
309            .lock()
310            .unwrap()
311            .is_none_or(|t| now.duration_since(t) >= browse_interval);
312
313        let mut new_count = 0usize;
314        let mut refresh_count = 0usize;
315
316        for entry in &neighbors {
317            if !entry.is_reachable() {
318                continue;
319            }
320            let is_new = seen.insert(entry.node_name.clone());
321            if is_new {
322                // First time we see this neighbor as Established — browse now.
323                for (face_id, _, _) in &entry.faces {
324                    self.send_browse_interest(*face_id, ctx);
325                }
326                new_count += 1;
327            } else if periodic_due {
328                // Periodic refresh for already-known neighbors.
329                for (face_id, _, _) in &entry.faces {
330                    self.send_browse_interest(*face_id, ctx);
331                }
332                refresh_count += 1;
333            }
334        }
335
336        if periodic_due {
337            *self.last_browse.lock().unwrap() = Some(now);
338        }
339        if new_count > 0 {
340            debug!(
341                peers = new_count,
342                "ServiceDiscovery: initial browse sent to new neighbors"
343            );
344        }
345        if refresh_count > 0 {
346            debug!(
347                peers = refresh_count,
348                "ServiceDiscovery: periodic browse refresh sent"
349            );
350        }
351
352        // Prune departed neighbors from the seen set so they get a fresh
353        // initial browse if they reconnect later.
354        let active: HashSet<Name> = neighbors
355            .iter()
356            .filter(|e| e.is_reachable())
357            .map(|e| e.node_name.clone())
358            .collect();
359        seen.retain(|n| active.contains(n));
360    }
361}
362
363// ── Wire helpers ──────────────────────────────────────────────────────────────
364
365/// Decode a `PeerList` Data Content into a `Vec<Name>`.
366///
367/// Used by consumers of the `/ndn/local/nd/peers` response.
368pub fn decode_peer_list(content: &[u8]) -> Vec<Name> {
369    let mut peers = Vec::new();
370    let mut pos = 0;
371    while pos < content.len() {
372        let Some((typ, len, hl)) = read_tlv_header(content, pos) else {
373            break;
374        };
375        let val = &content[pos + hl..pos + hl + len];
376        if typ == T_PEER_ENTRY as u32
377            && let Some(name) = decode_name_tlv(val)
378        {
379            peers.push(name);
380        }
381        pos += hl + len;
382    }
383    peers
384}
385
386fn read_tlv_header(b: &[u8], pos: usize) -> Option<(u32, usize, usize)> {
387    if pos >= b.len() {
388        return None;
389    }
390    let (typ, t_len) = read_varnumber(b, pos)?;
391    let (len, l_len) = read_varnumber(b, pos + t_len)?;
392    Some((typ as u32, len as usize, t_len + l_len))
393}
394
395fn read_varnumber(b: &[u8], pos: usize) -> Option<(u64, usize)> {
396    let first = *b.get(pos)?;
397    match first {
398        0xFD => {
399            let hi = *b.get(pos + 1)? as u64;
400            let lo = *b.get(pos + 2)? as u64;
401            Some(((hi << 8) | lo, 3))
402        }
403        0xFE => {
404            let v = u32::from_be_bytes(b[pos + 1..pos + 5].try_into().ok()?);
405            Some((v as u64, 5))
406        }
407        0xFF => {
408            let v = u64::from_be_bytes(b[pos + 1..pos + 9].try_into().ok()?);
409            Some((v, 9))
410        }
411        _ => Some((first as u64, 1)),
412    }
413}
414
415fn decode_name_tlv(b: &[u8]) -> Option<Name> {
416    // b is the value of a T_PEER_ENTRY, which is a Name TLV (type 0x07).
417    if b.is_empty() || b[0] != 0x07 {
418        return None;
419    }
420    use ndn_packet::NameComponent;
421    let (_, len, hl) = read_tlv_header(b, 0)?;
422    let comps_bytes = &b[hl..hl + len];
423    let mut comps = Vec::new();
424    let mut pos = 0;
425    while pos < comps_bytes.len() {
426        let (typ, clen, chl) = read_tlv_header(comps_bytes, pos)?;
427        let val = comps_bytes[pos + chl..pos + chl + clen].to_vec();
428        comps.push(NameComponent {
429            typ: typ as u64,
430            value: val.into(),
431        });
432        pos += chl + clen;
433    }
434    if comps.is_empty() {
435        return Some(Name::root());
436    }
437    // Build canonical URI to re-parse.
438    let mut uri = String::new();
439    for comp in &comps {
440        uri.push('/');
441        for byte in comp.value.iter() {
442            if byte.is_ascii_alphanumeric() || b"-.~_".contains(byte) {
443                uri.push(*byte as char);
444            } else {
445                uri.push_str(&format!("%{byte:02X}"));
446            }
447        }
448    }
449    if uri.is_empty() {
450        uri.push('/');
451    }
452    use std::str::FromStr;
453    Name::from_str(&uri).ok()
454}