1use std::collections::HashMap;
41use std::net::SocketAddr;
42use std::sync::{Arc, Mutex};
43use std::time::Duration;
44
45use bytes::Bytes;
46use ndn_faces::net::UdpFace;
47use ndn_packet::Name;
48use ndn_packet::encode::DataBuilder;
49use ndn_security::{Ed25519Signer, Ed25519Verifier, Signer, VerifyOutcome};
50use ndn_transport::FaceId;
51use tracing::{debug, error, warn};
52
53use super::medium::{HelloCore, HelloState, LinkMedium};
54use super::protocol::HelloProtocol;
55use crate::config::{DiscoveryConfig, DiscoveryProfile};
56use crate::wire::parse_raw_interest;
57use crate::{
58 DiscoveryContext, HelloPayload, InboundMeta, LinkAddr, MacAddr, NeighborEntry, NeighborUpdate,
59 ProtocolId,
60};
61
62const PROTOCOL: ProtocolId = ProtocolId("udp-nd");
63
64pub struct UdpMedium {
69 multicast_face_ids: Vec<FaceId>,
71 signer: Arc<dyn Signer>,
73 unicast_port: Option<u16>,
75 peer_faces: Mutex<HashMap<SocketAddr, FaceId>>,
77}
78
79pub type UdpNeighborDiscovery = HelloProtocol<UdpMedium>;
81
82impl UdpNeighborDiscovery {
83 pub fn new(multicast_face_id: FaceId, node_name: Name) -> Self {
85 Self::new_multi(
86 vec![multicast_face_id],
87 node_name,
88 DiscoveryConfig::for_profile(&DiscoveryProfile::Lan),
89 )
90 }
91
92 pub fn new_with_config(
93 multicast_face_id: FaceId,
94 node_name: Name,
95 config: DiscoveryConfig,
96 ) -> Self {
97 Self::new_multi(vec![multicast_face_id], node_name, config)
98 }
99
100 pub fn new_multi(face_ids: Vec<FaceId>, node_name: Name, config: DiscoveryConfig) -> Self {
107 let signer = UdpMedium::make_transient_signer(&node_name);
108 Self::new_multi_with_signer(face_ids, node_name, config, signer)
109 }
110
111 pub fn new_multi_with_signer(
113 face_ids: Vec<FaceId>,
114 node_name: Name,
115 config: DiscoveryConfig,
116 signer: Arc<dyn Signer>,
117 ) -> Self {
118 let medium = UdpMedium {
119 multicast_face_ids: face_ids,
120 signer,
121 unicast_port: None,
122 peer_faces: Mutex::new(HashMap::new()),
123 };
124 HelloProtocol::create(medium, node_name, config)
125 }
126
127 pub fn with_unicast_port(mut self, port: u16) -> Self {
129 self.medium.unicast_port = Some(port);
130 self
131 }
132
133 pub fn from_profile(
134 multicast_face_id: FaceId,
135 node_name: Name,
136 profile: &DiscoveryProfile,
137 ) -> Self {
138 Self::new_with_config(
139 multicast_face_id,
140 node_name,
141 DiscoveryConfig::for_profile(profile),
142 )
143 }
144}
145
146impl UdpMedium {
147 fn make_transient_signer(node_name: &Name) -> Arc<dyn Signer> {
149 let name_str = node_name.to_string();
150 let digest = ring::digest::digest(&ring::digest::SHA256, name_str.as_bytes());
151 let seed: &[u8; 32] = digest.as_ref().try_into().expect("SHA-256 is 32 bytes");
152 let key_name = format!("{node_name}/KEY/discovery-transient")
153 .parse::<Name>()
154 .unwrap_or_else(|_| node_name.clone());
155 Arc::new(Ed25519Signer::from_seed(seed, key_name))
156 }
157
158 fn create_udp_face(&self, ctx: &dyn DiscoveryContext, peer_addr: SocketAddr) -> Option<FaceId> {
159 let bind_addr: SocketAddr = if peer_addr.is_ipv4() {
160 "0.0.0.0:0".parse().unwrap()
161 } else {
162 "[::]:0".parse().unwrap()
163 };
164 let std_sock = match std::net::UdpSocket::bind(bind_addr) {
165 Ok(s) => s,
166 Err(e) => {
167 warn!("UdpND: bind failed for {peer_addr}: {e}");
168 return None;
169 }
170 };
171 if let Err(e) = std_sock.set_nonblocking(true) {
172 warn!("UdpND: set_nonblocking: {e}");
173 return None;
174 }
175 let async_sock = match tokio::net::UdpSocket::from_std(std_sock) {
176 Ok(s) => s,
177 Err(e) => {
178 warn!("UdpND: from_std: {e}");
179 return None;
180 }
181 };
182 let face_id = ctx.alloc_face_id();
183 let face = UdpFace::from_socket(face_id, async_sock, peer_addr);
184 let registered = ctx.add_face(std::sync::Arc::new(face));
185 self.peer_faces
186 .lock()
187 .unwrap()
188 .insert(peer_addr, registered);
189 debug!("UdpND: created unicast face {registered:?} -> {peer_addr}");
190 Some(registered)
191 }
192
193 fn ensure_peer(
194 &self,
195 ctx: &dyn DiscoveryContext,
196 _core: &HelloCore,
197 peer_name: &Name,
198 peer_addr: SocketAddr,
199 ) -> Option<FaceId> {
200 let existing = { self.peer_faces.lock().unwrap().get(&peer_addr).copied() };
201 let face_id = if let Some(fid) = existing {
202 fid
203 } else {
204 self.create_udp_face(ctx, peer_addr)?
205 };
206 if ctx.neighbors().get(peer_name).is_none() {
207 ctx.update_neighbor(NeighborUpdate::Upsert(NeighborEntry::new(
208 peer_name.clone(),
209 )));
210 }
211 ctx.update_neighbor(NeighborUpdate::AddFace {
212 name: peer_name.clone(),
213 face_id,
214 mac: MacAddr::new([0; 6]),
215 iface: peer_addr.to_string(),
216 });
217 ctx.add_fib_entry(peer_name, face_id, 0, PROTOCOL);
218 Some(face_id)
219 }
220}
221
222impl LinkMedium for UdpMedium {
223 fn protocol_id(&self) -> ProtocolId {
224 PROTOCOL
225 }
226
227 fn build_hello_data(&self, core: &HelloCore, interest_name: &Name) -> Bytes {
228 let (prefix_announcement, hello_interval_base) = {
232 let cfg = core.config.read().unwrap();
233 (cfg.prefix_announcement.clone(), cfg.hello_interval_base)
234 };
235 let mut payload = crate::HelloPayload::new(core.node_name.clone());
236 if prefix_announcement == crate::config::PrefixAnnouncementMode::InHello {
237 payload.served_prefixes = core.served_prefixes.lock().unwrap().clone();
238 }
239 {
240 let st = core.state.lock().unwrap();
241 if !st.recent_diffs.is_empty() {
242 payload.neighbor_diffs.push(crate::NeighborDiff {
243 entries: st.recent_diffs.iter().cloned().collect(),
244 });
245 }
246 }
247 payload.public_key = self.signer.public_key();
248 payload.unicast_port = self.unicast_port;
249 let content = payload.encode();
250
251 let freshness_ms = hello_interval_base.as_millis().min(u32::MAX as u128) as u64 * 2;
252
253 let signer = &self.signer;
254 DataBuilder::new(interest_name.clone(), &content)
255 .freshness(Duration::from_millis(freshness_ms))
256 .sign_sync(signer.sig_type(), signer.cert_name(), |region| {
257 signer.sign_sync(region).unwrap_or_default()
258 })
259 }
260
261 fn handle_hello_interest(
262 &self,
263 raw: &Bytes,
264 _incoming_face: FaceId,
265 meta: &InboundMeta,
266 core: &HelloCore,
267 ctx: &dyn DiscoveryContext,
268 ) -> bool {
269 use crate::hello::medium::HELLO_PREFIX_DEPTH;
270 let parsed = match parse_raw_interest(raw) {
271 Some(p) => p,
272 None => return false,
273 };
274 let name = &parsed.name;
275 if !name.has_prefix(&core.hello_prefix) {
276 return false;
277 }
278 if name.components().len() != HELLO_PREFIX_DEPTH + 1 {
279 return false;
280 }
281 let sender_addr = match &meta.source {
282 Some(LinkAddr::Udp(addr)) => *addr,
283 _ => {
284 debug!("UdpND: hello Interest has no source addr");
285 return true;
286 }
287 };
288 let reply = self.build_hello_data(core, name);
289 for &fid in &self.multicast_face_ids {
290 ctx.send_on(fid, reply.clone());
291 }
292 debug!("UdpND: hello Interest from {sender_addr}, sent reply");
293 true
294 }
295
296 fn verify_and_ensure_peer(
297 &self,
298 raw: &Bytes,
299 payload: &HelloPayload,
300 meta: &InboundMeta,
301 core: &HelloCore,
302 ctx: &dyn DiscoveryContext,
303 ) -> Option<(Name, Option<FaceId>)> {
304 if let Some(ref peer_pk) = payload.public_key {
306 if let Ok(data_pkt) = ndn_packet::Data::decode(raw.clone()) {
307 let region = data_pkt.signed_region();
308 let sig_val = data_pkt.sig_value();
309 let verifier = Ed25519Verifier;
310 let outcome = verifier.verify_sync(region, sig_val, peer_pk);
311 if outcome != VerifyOutcome::Valid {
312 warn!(
313 name = %payload.node_name,
314 "UdpND: hello Data signature invalid, discarding"
315 );
316 return None;
317 }
318 } else {
319 warn!("UdpND: hello Data has public_key but failed full decode; discarding");
320 return None;
321 }
322 }
323
324 let responder_name = payload.node_name.clone();
326 if responder_name == core.node_name {
327 let our_pk = self.signer.public_key();
328 match (our_pk, payload.public_key.as_ref()) {
329 (Some(ref ours), Some(ref theirs)) if ours == theirs => {
330 debug!(
331 name = %responder_name,
332 "UdpND: hello echo (own packet), discarding"
333 );
334 }
335 _ => {
336 error!(
337 name = %responder_name,
338 "UdpND: DUPLICATE NODE NAME detected — another node is using our name!"
339 );
340 }
341 }
342 return None;
343 }
344
345 let responder_addr = match &meta.source {
346 Some(LinkAddr::Udp(addr)) => *addr,
347 _ => {
348 debug!("UdpND: hello Data no source addr");
349 return None;
350 }
351 };
352 let unicast_addr = match payload.unicast_port {
353 Some(port) => std::net::SocketAddr::new(responder_addr.ip(), port),
354 None => responder_addr,
355 };
356 let peer_face_id = self.ensure_peer(ctx, core, &responder_name, unicast_addr);
357
358 debug!(
359 peer = %responder_name, addr = %responder_addr,
360 "UdpND: hello response accepted"
361 );
362
363 Some((responder_name, peer_face_id))
364 }
365
366 fn send_multicast(&self, ctx: &dyn DiscoveryContext, pkt: Bytes) {
367 for &fid in &self.multicast_face_ids {
368 ctx.send_on(fid, pkt.clone());
369 }
370 }
371
372 fn is_multicast_face(&self, face_id: FaceId) -> bool {
373 self.multicast_face_ids.contains(&face_id)
374 }
375
376 fn on_face_down(&self, face_id: FaceId, _state: &mut HelloState, _ctx: &dyn DiscoveryContext) {
377 let mut pf = self.peer_faces.lock().unwrap();
378 let removed = pf.iter().filter(|(_, fid)| **fid == face_id).count();
379 pf.retain(|_, fid| *fid != face_id);
380 if removed > 0 {
381 debug!(
382 face = ?face_id, peers_removed = removed,
383 "UdpND: face down, removed peer bindings"
384 );
385 } else {
386 debug!(face = ?face_id, "UdpND: face down");
387 }
388 }
389
390 fn on_peer_removed(&self, entry: &NeighborEntry, _state: &mut HelloState) {
391 let mut pf = self.peer_faces.lock().unwrap();
392 pf.retain(|_, fid| !entry.faces.iter().any(|(f, _, _)| f == fid));
393 }
394}
395
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::time::Duration;

    use super::*;
    use crate::wire::parse_raw_data;

    /// Discovery instance on face 1 named `/ndn/test/node`, default (LAN) config.
    fn make_nd() -> UdpNeighborDiscovery {
        let node_name = Name::from_str("/ndn/test/node").unwrap();
        UdpNeighborDiscovery::new(FaceId(1), node_name)
    }

    /// Builds the hello Data for `interest_name` and decodes its payload.
    fn decoded_hello_payload(nd: &UdpNeighborDiscovery, interest_name: &Name) -> HelloPayload {
        let pkt = nd.medium.build_hello_data(&nd.core, interest_name);
        let parsed = parse_raw_data(&pkt).unwrap();
        HelloPayload::decode(&parsed.content.unwrap()).unwrap()
    }

    /// The hello Interest has one component past the prefix, holding the
    /// big-endian nonce, and carries no application parameters.
    #[test]
    fn hello_interest_format() {
        let depth = crate::hello::medium::HELLO_PREFIX_DEPTH;
        let nd = make_nd();
        let nonce: u32 = 0xCAFE_BABE;

        let pkt = nd.build_hello_interest(nonce);
        let parsed = parse_raw_interest(&pkt).unwrap();
        let comps = parsed.name.components();
        assert_eq!(comps.len(), depth + 1);

        let nonce_bytes: [u8; 4] = comps[depth].value[..4].try_into().unwrap();
        assert_eq!(u32::from_be_bytes(nonce_bytes), nonce);
        assert!(parsed.app_params.is_none());
    }

    /// The hello Data's MetaInfo contains a FreshnessPeriod greater than zero.
    #[test]
    fn hello_data_freshness_period_is_nonzero() {
        use ndn_packet::tlv_type;
        use ndn_tlv::TlvReader;

        let nd = make_nd();
        let interest_name = Name::from_str("/ndn/local/nd/hello/CAFEBABE").unwrap();
        let pkt = nd.medium.build_hello_data(&nd.core, &interest_name);

        let mut outer = TlvReader::new(pkt);
        let (_, data_val) = outer.read_tlv().unwrap();
        let mut fields = TlvReader::new(data_val);

        let mut found_fp = false;
        while !fields.is_empty() {
            let (field_type, field_value) = fields.read_tlv().unwrap();
            if field_type != tlv_type::META_INFO {
                continue;
            }
            let mut meta_fields = TlvReader::new(field_value);
            while !meta_fields.is_empty() {
                let (meta_type, meta_value) = meta_fields.read_tlv().unwrap();
                if meta_type != tlv_type::FRESHNESS_PERIOD {
                    continue;
                }
                // Big-endian non-negative integer of arbitrary length.
                let val = meta_value
                    .iter()
                    .fold(0u64, |acc, b| (acc << 8) | (*b as u64));
                assert!(val > 0, "FreshnessPeriod should be > 0, got {val}");
                found_fp = true;
            }
        }
        assert!(found_fp, "FreshnessPeriod TLV not found in MetaInfo");
    }

    /// The hello Data echoes the Interest name and names this node in its payload.
    #[test]
    fn hello_data_carries_hello_payload() {
        let nd = make_nd();
        let interest_name = Name::from_str("/ndn/local/nd/hello/CAFEBABE").unwrap();

        let pkt = nd.medium.build_hello_data(&nd.core, &interest_name);
        let parsed = parse_raw_data(&pkt).unwrap();
        assert_eq!(parsed.name, interest_name);

        let payload = HelloPayload::decode(&parsed.content.unwrap()).unwrap();
        assert_eq!(payload.node_name, nd.core.node_name);
    }

    /// Served prefixes set on the protocol show up in the hello payload.
    #[test]
    fn in_hello_served_prefixes_encoded() {
        let nd = make_nd();
        nd.set_served_prefixes(vec![Name::from_str("/ndn/edu/test").unwrap()]);

        let interest_name = Name::from_str("/ndn/local/nd/hello/1").unwrap();
        let payload = decoded_hello_payload(&nd, &interest_name);
        assert_eq!(payload.served_prefixes.len(), 1);
    }

    /// Pending neighbor diffs are packed into a single diff block of the payload.
    #[test]
    fn neighbor_diffs_piggybacked() {
        let nd = make_nd();
        let added_peer = Name::from_str("/ndn/peer/alpha").unwrap();
        nd.core
            .state
            .lock()
            .unwrap()
            .recent_diffs
            .push_back(crate::DiffEntry::Add(added_peer));

        let interest_name = Name::from_str("/ndn/local/nd/hello/1").unwrap();
        let payload = decoded_hello_payload(&nd, &interest_name);
        assert_eq!(payload.neighbor_diffs.len(), 1);
    }

    /// With a non-zero SWIM fanout both probe prefixes are claimed.
    #[test]
    fn swim_probes_added_to_claimed_when_enabled() {
        let mut cfg = DiscoveryConfig::for_profile(&DiscoveryProfile::Campus);
        cfg.swim_indirect_fanout = 3;
        let node_name = Name::from_str("/ndn/test/node").unwrap();
        let nd = UdpNeighborDiscovery::new_with_config(FaceId(1), node_name, cfg);

        let claimed = &nd.core.claimed;
        let has_probe_direct = claimed.iter().any(|p| p == crate::scope::probe_direct());
        let has_probe_via = claimed.iter().any(|p| p == crate::scope::probe_via());

        assert!(
            has_probe_direct,
            "probe/direct should be claimed when SWIM enabled"
        );
        assert!(
            has_probe_via,
            "probe/via should be claimed when SWIM enabled"
        );
    }

    /// LP framing round-trips: unwrapping an encoded LP packet yields the input.
    #[test]
    fn lp_unwrap_strips_framing() {
        let raw = Bytes::from_static(b"\x05\x03ndn");
        let framed = ndn_packet::lp::encode_lp_packet(&raw);
        assert_eq!(crate::wire::unwrap_lp(&framed).unwrap(), raw);
    }

    /// The medium reports `PROTOCOL` and the hello prefix is among the claimed prefixes.
    #[test]
    fn protocol_id_and_prefix() {
        let nd = make_nd();
        assert_eq!(nd.medium.protocol_id(), PROTOCOL);

        let hello_prefix = Name::from_str(crate::hello::medium::HELLO_PREFIX_STR).unwrap();
        assert!(nd.core.claimed.iter().any(|p| p == &hello_prefix));
    }

    /// The default configuration ticks every 500 ms.
    #[test]
    fn tick_interval_from_config() {
        let nd = make_nd();
        let tick = nd.core.config.read().unwrap().tick_interval;
        assert_eq!(tick, Duration::from_millis(500));
    }

    /// A face-down event removes the peer binding that points at that face.
    #[test]
    fn on_face_down_removes_peer_entry() {
        /// Context whose every operation is a no-op.
        struct NoopCtx;
        impl crate::DiscoveryContext for NoopCtx {
            fn alloc_face_id(&self) -> FaceId {
                FaceId(0)
            }
            fn add_face(&self, _: std::sync::Arc<dyn ndn_transport::ErasedFace>) -> FaceId {
                FaceId(0)
            }
            fn remove_face(&self, _: FaceId) {}
            fn add_fib_entry(&self, _: &Name, _: FaceId, _: u32, _: ProtocolId) {}
            fn remove_fib_entry(&self, _: &Name, _: FaceId, _: ProtocolId) {}
            fn remove_fib_entries_by_owner(&self, _: ProtocolId) {}
            fn neighbors(&self) -> std::sync::Arc<dyn crate::NeighborTableView> {
                crate::NeighborTable::new()
            }
            fn update_neighbor(&self, _: crate::NeighborUpdate) {}
            fn send_on(&self, _: FaceId, _: Bytes) {}
            fn now(&self) -> std::time::Instant {
                std::time::Instant::now()
            }
        }

        let nd = make_nd();
        let peer_addr: SocketAddr = "10.0.0.1:6363".parse().unwrap();
        nd.medium
            .peer_faces
            .lock()
            .unwrap()
            .insert(peer_addr, FaceId(5));

        crate::DiscoveryProtocol::on_face_down(&nd, FaceId(5), &NoopCtx);

        assert!(nd.medium.peer_faces.lock().unwrap().is_empty());
    }

    /// The Mobile profile uses a hello interval below one second.
    #[test]
    fn from_profile_sets_config() {
        let node_name = Name::from_str("/ndn/test/node").unwrap();
        let nd = UdpNeighborDiscovery::from_profile(FaceId(1), node_name, &DiscoveryProfile::Mobile);

        let hello_interval = nd.core.config.read().unwrap().hello_interval_base;
        assert!(hello_interval < Duration::from_secs(1));
    }

    /// Applying an `Add` diff for an unknown peer creates it in `Probing` state.
    ///
    /// The diff handling is replayed inline against a tracking context rather
    /// than driven through the protocol.
    #[test]
    fn swim_diff_add_creates_probing_neighbor() {
        use crate::{NeighborEntry, NeighborState, NeighborTable, NeighborUpdate};
        use std::sync::Arc;
        use std::time::Instant;

        /// Context that forwards neighbor updates to a real table.
        struct TrackCtx {
            neighbors: Arc<NeighborTable>,
        }
        impl crate::DiscoveryContext for TrackCtx {
            fn alloc_face_id(&self) -> FaceId {
                FaceId(0)
            }
            fn add_face(&self, _: Arc<dyn ndn_transport::ErasedFace>) -> FaceId {
                FaceId(0)
            }
            fn remove_face(&self, _: FaceId) {}
            fn add_fib_entry(&self, _: &Name, _: FaceId, _: u32, _: ProtocolId) {}
            fn remove_fib_entry(&self, _: &Name, _: FaceId, _: ProtocolId) {}
            fn remove_fib_entries_by_owner(&self, _: ProtocolId) {}
            fn neighbors(&self) -> Arc<dyn crate::NeighborTableView> {
                Arc::clone(&self.neighbors) as Arc<dyn crate::NeighborTableView>
            }
            fn update_neighbor(&self, u: NeighborUpdate) {
                self.neighbors.apply(u);
            }
            fn send_on(&self, _: FaceId, _: Bytes) {}
            fn now(&self) -> std::time::Instant {
                std::time::Instant::now()
            }
        }

        let _nd = make_nd();
        let ctx = TrackCtx {
            neighbors: NeighborTable::new(),
        };

        let peer_name = Name::from_str("/ndn/peer/unknown").unwrap();
        let mut payload = crate::HelloPayload::new(Name::from_str("/ndn/test/sender").unwrap());
        payload.neighbor_diffs.push(crate::NeighborDiff {
            entries: vec![crate::DiffEntry::Add(peer_name.clone())],
        });

        let _interest_name = Name::from_str("/ndn/local/nd/hello/1").unwrap();

        let all_entries = payload
            .neighbor_diffs
            .iter()
            .flat_map(|diff| diff.entries.iter());
        for entry in all_entries {
            if let crate::DiffEntry::Add(name) = entry {
                if ctx.neighbors.get(name).is_none() {
                    let probing = NeighborEntry {
                        node_name: name.clone(),
                        state: NeighborState::Probing {
                            attempts: 0,
                            last_probe: Instant::now(),
                        },
                        faces: Vec::new(),
                        rtt_us: None,
                        pending_nonce: None,
                    };
                    ctx.neighbors.apply(NeighborUpdate::Upsert(probing));
                }
            }
        }

        let entry = ctx
            .neighbors
            .get(&peer_name)
            .expect("neighbor should be created");
        assert!(
            matches!(entry.state, NeighborState::Probing { .. }),
            "expected Probing state, got {:?}",
            entry.state
        );
    }
}
687}