ndn_discovery/service_discovery/
browsing.rs1use std::collections::HashSet;
4use std::time::{Duration, Instant};
5
6use bytes::Bytes;
7use ndn_packet::{Name, encode::encode_interest, tlv_type};
8use ndn_tlv::TlvWriter;
9use ndn_transport::FaceId;
10use tracing::{debug, trace, warn};
11
12use crate::config::ServiceValidationPolicy;
13use crate::context::DiscoveryContext;
14use crate::prefix_announce::ServiceRecord;
15use crate::scope::{peers_prefix, sd_services};
16use crate::wire::{parse_raw_data, parse_raw_interest, write_name_tlv, write_nni};
17
18use super::ServiceDiscoveryProtocol;
19
/// TLV type number of one peer entry inside the content of a peers-query
/// reply. Each entry written in this file wraps a single Name TLV, and
/// `decode_peer_list` looks for this type when reading a reply back.
20const T_PEER_ENTRY: u64 = 0xE0;
22
23impl ServiceDiscoveryProtocol {
24 pub(super) fn handle_sd_interest(
27 &self,
28 raw: &Bytes,
29 incoming_face: FaceId,
30 ctx: &dyn DiscoveryContext,
31 ) -> bool {
32 let parsed = match parse_raw_interest(raw) {
33 Some(p) => p,
34 None => return false,
35 };
36
37 let name = &parsed.name;
38 if !name.has_prefix(sd_services()) {
39 return false;
40 }
41
42 let records = self.local_records.lock().unwrap();
44 let mut responded = false;
45 for entry in records.iter() {
46 let pkt = entry.record.build_data(entry.published_at_ms);
47 ctx.send_on(incoming_face, pkt);
48 responded = true;
49 }
50 if responded {
51 debug!(
52 "ServiceDiscovery: answered browse Interest with {} records",
53 records.len()
54 );
55 }
56 true
57 }
58
59 pub(super) fn handle_sd_data(
60 &self,
61 raw: &Bytes,
62 incoming_face: FaceId,
63 ctx: &dyn DiscoveryContext,
64 ) -> bool {
65 let parsed = match parse_raw_data(raw) {
66 Some(d) => d,
67 None => return false,
68 };
69
70 if !parsed.name.has_prefix(sd_services()) {
71 return false;
72 }
73
74 let content = match parsed.content {
75 Some(c) => c,
76 None => return true, };
78
79 let record = match ServiceRecord::decode(&content) {
80 Some(r) => r,
81 None => {
82 debug!("ServiceDiscovery: could not decode ServiceRecord");
83 return true;
84 }
85 };
86
87 match self.config.validation {
89 ServiceValidationPolicy::Skip => {}
90 ServiceValidationPolicy::WarnOnly => {
91 debug!(
93 "ServiceDiscovery: received unvalidated record for {:?}",
94 record.announced_prefix
95 );
96 }
97 ServiceValidationPolicy::Required => {
98 warn!("ServiceDiscovery: dropping unvalidated record (Required policy)");
100 return true;
101 }
102 }
103
104 if !self.is_in_scope(&record.announced_prefix) {
106 debug!(
107 "ServiceDiscovery: record {:?} outside configured scope",
108 record.announced_prefix
109 );
110 return true;
111 }
112
113 if !self.config.auto_populate_prefix_filter.is_empty() {
115 let allowed = self
116 .config
117 .auto_populate_prefix_filter
118 .iter()
119 .any(|f| record.announced_prefix.has_prefix(f));
120 if !allowed {
121 return true;
122 }
123 }
124
125 if !self.check_rate_limit(&record.node_name, ctx.now()) {
127 debug!(
128 "ServiceDiscovery: rate-limiting producer {:?}",
129 record.node_name
130 );
131 return true;
132 }
133
134 {
136 let mut peer_recs = self.peer_records.lock().unwrap();
137 if let Some(idx) = peer_recs.iter().position(|r| {
138 r.announced_prefix == record.announced_prefix && r.node_name == record.node_name
139 }) {
140 peer_recs[idx] = record.clone();
141 } else {
142 peer_recs.push(record.clone());
143 }
144 }
145
146 if self.config.auto_populate_fib {
154 self.auto_populate_fib(&record, incoming_face, ctx);
155 }
156
157 if self.config.relay_records {
160 let relay_faces: Vec<FaceId> = ctx
161 .neighbors()
162 .all()
163 .into_iter()
164 .filter(|e| e.is_reachable())
165 .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid).collect::<Vec<_>>())
166 .filter(|fid| *fid != incoming_face)
167 .collect();
168 let relay_count = relay_faces.len();
169 for face_id in relay_faces {
170 ctx.send_on(face_id, raw.clone());
171 }
172 if relay_count > 0 {
173 debug!(
174 "ServiceDiscovery: relayed record {:?} to {relay_count} peers",
175 record.announced_prefix
176 );
177 }
178 }
179
180 true
181 }
182
183 pub(super) fn handle_peers_interest(
184 &self,
185 raw: &Bytes,
186 incoming_face: FaceId,
187 ctx: &dyn DiscoveryContext,
188 ) -> bool {
189 let parsed = match parse_raw_interest(raw) {
190 Some(p) => p,
191 None => return false,
192 };
193
194 if !parsed.name.has_prefix(peers_prefix()) {
195 return false;
196 }
197
198 let peers_depth = peers_prefix().components().len();
199 let extra_comps = parsed.name.components().len().saturating_sub(peers_depth);
200
201 let peer_list = if extra_comps > 0 {
202 let comps = parsed.name.components();
206 let node_name_comps = &comps[peers_depth..];
207 let mut uri = String::new();
208 for comp in node_name_comps {
209 uri.push('/');
210 for byte in comp.value.iter() {
211 if byte.is_ascii_alphanumeric() || b"-.~_".contains(byte) {
212 uri.push(*byte as char);
213 } else {
214 uri.push_str(&format!("%{byte:02X}"));
215 }
216 }
217 }
218 if uri.is_empty() {
219 uri.push('/');
220 }
221 let target = match std::str::FromStr::from_str(&uri) {
222 Ok(n) => n,
223 Err(_) => return true,
224 };
225 let entry = ctx.neighbors().get(&target);
226 let mut w = TlvWriter::new();
227 if let Some(e) = entry
228 && e.is_reachable()
229 {
230 w.write_nested(T_PEER_ENTRY, |w: &mut TlvWriter| {
231 write_name_tlv(w, &e.node_name);
232 });
233 }
234 let content = w.finish();
235 debug!(
236 "ServiceDiscovery: answered single-peer query for {:?}",
237 target
238 );
239 content
240 } else {
241 let neighbors = ctx.neighbors().all();
243 let mut w = TlvWriter::new();
244 for entry in &neighbors {
245 if entry.is_reachable() {
246 w.write_nested(T_PEER_ENTRY, |w: &mut TlvWriter| {
247 write_name_tlv(w, &entry.node_name);
248 });
249 }
250 }
251 debug!(
252 "ServiceDiscovery: answered peers query with {} neighbors",
253 neighbors.len()
254 );
255 w.finish()
256 };
257
258 let data_name = &parsed.name;
260 let mut dw = TlvWriter::new();
261 dw.write_nested(tlv_type::DATA, |w: &mut TlvWriter| {
262 write_name_tlv(w, data_name);
263 w.write_nested(tlv_type::META_INFO, |w: &mut TlvWriter| {
264 write_nni(w, tlv_type::FRESHNESS_PERIOD, 1000);
266 });
267 w.write_tlv(tlv_type::CONTENT, &peer_list);
268 w.write_nested(tlv_type::SIGNATURE_INFO, |w: &mut TlvWriter| {
269 w.write_tlv(tlv_type::SIGNATURE_TYPE, &[0u8]);
270 });
271 w.write_tlv(tlv_type::SIGNATURE_VALUE, &[0u8; 32]);
272 });
273
274 ctx.send_on(incoming_face, dw.finish());
275 true
276 }
277
278 pub(super) fn send_browse_interest(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
285 let interest = encode_interest(sd_services(), None);
286 ctx.send_on(face_id, interest);
287 trace!(face = ?face_id, "ServiceDiscovery: sent browse Interest");
288 }
289
290 pub(super) fn browse_neighbors(
300 &self,
301 now: Instant,
302 browse_interval: Duration,
303 ctx: &dyn DiscoveryContext,
304 ) {
305 let neighbors = ctx.neighbors().all();
306 let mut seen = self.browsed_neighbors.lock().unwrap();
307 let periodic_due = self
308 .last_browse
309 .lock()
310 .unwrap()
311 .is_none_or(|t| now.duration_since(t) >= browse_interval);
312
313 let mut new_count = 0usize;
314 let mut refresh_count = 0usize;
315
316 for entry in &neighbors {
317 if !entry.is_reachable() {
318 continue;
319 }
320 let is_new = seen.insert(entry.node_name.clone());
321 if is_new {
322 for (face_id, _, _) in &entry.faces {
324 self.send_browse_interest(*face_id, ctx);
325 }
326 new_count += 1;
327 } else if periodic_due {
328 for (face_id, _, _) in &entry.faces {
330 self.send_browse_interest(*face_id, ctx);
331 }
332 refresh_count += 1;
333 }
334 }
335
336 if periodic_due {
337 *self.last_browse.lock().unwrap() = Some(now);
338 }
339 if new_count > 0 {
340 debug!(
341 peers = new_count,
342 "ServiceDiscovery: initial browse sent to new neighbors"
343 );
344 }
345 if refresh_count > 0 {
346 debug!(
347 peers = refresh_count,
348 "ServiceDiscovery: periodic browse refresh sent"
349 );
350 }
351
352 let active: HashSet<Name> = neighbors
355 .iter()
356 .filter(|e| e.is_reachable())
357 .map(|e| e.node_name.clone())
358 .collect();
359 seen.retain(|n| active.contains(n));
360 }
361}
362
363pub fn decode_peer_list(content: &[u8]) -> Vec<Name> {
369 let mut peers = Vec::new();
370 let mut pos = 0;
371 while pos < content.len() {
372 let Some((typ, len, hl)) = read_tlv_header(content, pos) else {
373 break;
374 };
375 let val = &content[pos + hl..pos + hl + len];
376 if typ == T_PEER_ENTRY as u32
377 && let Some(name) = decode_name_tlv(val)
378 {
379 peers.push(name);
380 }
381 pos += hl + len;
382 }
383 peers
384}
385
386fn read_tlv_header(b: &[u8], pos: usize) -> Option<(u32, usize, usize)> {
387 if pos >= b.len() {
388 return None;
389 }
390 let (typ, t_len) = read_varnumber(b, pos)?;
391 let (len, l_len) = read_varnumber(b, pos + t_len)?;
392 Some((typ as u32, len as usize, t_len + l_len))
393}
394
/// Reads one variable-size TLV number starting at `b[pos]`.
///
/// The first byte selects the width: values below `0xFD` are the number
/// itself (1 byte total); `0xFD`, `0xFE` and `0xFF` are followed by a
/// big-endian 2-, 4- or 8-byte number respectively (3, 5 or 9 bytes
/// total).
///
/// Returns `(value, bytes_consumed)`, or `None` when `pos` is out of
/// range or the buffer ends before the number is complete. Truncated
/// 4- and 8-byte forms used to panic on an out-of-range slice; they now
/// return `None` like the 2-byte form.
fn read_varnumber(b: &[u8], pos: usize) -> Option<(u64, usize)> {
    let (&first, tail) = b.get(pos..)?.split_first()?;

    // Big-endian read of the next `width` bytes, or `None` if the buffer
    // is too short.
    let read_be = |width: usize| -> Option<u64> {
        tail.get(..width).map(|bytes| {
            bytes
                .iter()
                .fold(0u64, |acc, &x| (acc << 8) | u64::from(x))
        })
    };

    match first {
        0xFD => read_be(2).map(|v| (v, 3)),
        0xFE => read_be(4).map(|v| (v, 5)),
        0xFF => read_be(8).map(|v| (v, 9)),
        _ => Some((u64::from(first), 1)),
    }
}
414
415fn decode_name_tlv(b: &[u8]) -> Option<Name> {
416 if b.is_empty() || b[0] != 0x07 {
418 return None;
419 }
420 use ndn_packet::NameComponent;
421 let (_, len, hl) = read_tlv_header(b, 0)?;
422 let comps_bytes = &b[hl..hl + len];
423 let mut comps = Vec::new();
424 let mut pos = 0;
425 while pos < comps_bytes.len() {
426 let (typ, clen, chl) = read_tlv_header(comps_bytes, pos)?;
427 let val = comps_bytes[pos + chl..pos + chl + clen].to_vec();
428 comps.push(NameComponent {
429 typ: typ as u64,
430 value: val.into(),
431 });
432 pos += chl + clen;
433 }
434 if comps.is_empty() {
435 return Some(Name::root());
436 }
437 let mut uri = String::new();
439 for comp in &comps {
440 uri.push('/');
441 for byte in comp.value.iter() {
442 if byte.is_ascii_alphanumeric() || b"-.~_".contains(byte) {
443 uri.push(*byte as char);
444 } else {
445 uri.push_str(&format!("%{byte:02X}"));
446 }
447 }
448 }
449 if uri.is_empty() {
450 uri.push('/');
451 }
452 use std::str::FromStr;
453 Name::from_str(&uri).ok()
454}