1use std::sync::atomic::Ordering;
11use std::time::{Duration, Instant};
12
13use bytes::Bytes;
14use ndn_packet::{Name, tlv_type};
15use ndn_tlv::TlvWriter;
16use ndn_transport::FaceId;
17use tracing::{debug, info, trace};
18
19use super::medium::{HELLO_PREFIX_DEPTH, HelloCore, LinkMedium, MAX_DIFF_ENTRIES};
20use super::probe::{
21 build_direct_probe, build_indirect_probe, build_probe_ack, is_probe_ack, parse_direct_probe,
22 parse_indirect_probe,
23};
24use crate::config::PrefixAnnouncementMode;
25use crate::scope::{probe_direct, probe_via};
26use crate::strategy::{ProbeRequest, TriggerEvent};
27use crate::wire::{parse_raw_data, parse_raw_interest, write_nni};
28use crate::{
29 DiffEntry, DiscoveryContext, DiscoveryProtocol, HelloPayload, InboundMeta, NeighborDiff,
30 NeighborEntry, NeighborState, NeighborUpdate, ProtocolId,
31};
32
/// Hello-based neighbor discovery protocol, generic over the link medium `T`.
///
/// Protocol bookkeeping lives in [`HelloCore`]; link-specific work (sending
/// multicast hellos, answering hello Interests, verifying responders, face
/// and peer teardown) is delegated to `medium`.
pub struct HelloProtocol<T: LinkMedium> {
    /// Shared protocol state: configuration, node name, hello prefix, nonce
    /// counter, probe tables, recent neighbor diffs and the probing strategy.
    pub core: HelloCore,
    /// Link-specific behavior plugged into the protocol.
    pub medium: T,
}
47
48impl<T: LinkMedium> HelloProtocol<T> {
49 pub fn create(medium: T, node_name: Name, config: crate::config::DiscoveryConfig) -> Self {
51 let core = HelloCore::new(node_name, config);
52 Self { core, medium }
53 }
54
/// Returns a shared reference to the protocol's [`HelloCore`].
pub fn core(&self) -> &HelloCore {
    &self.core
}
59
/// Returns a shared reference to the link medium this protocol runs over.
pub fn medium(&self) -> &T {
    &self.medium
}
64
65 pub fn set_served_prefixes(&self, prefixes: Vec<Name>) {
67 *self.core.served_prefixes.lock().unwrap() = prefixes;
68 }
69
70 pub fn build_hello_interest(&self, nonce: u32) -> Bytes {
73 let hello_interval_base = self.core.config.read().unwrap().hello_interval_base;
74 let mut w = TlvWriter::new();
75 w.write_nested(tlv_type::INTEREST, |w: &mut TlvWriter| {
76 w.write_nested(tlv_type::NAME, |w: &mut TlvWriter| {
77 for comp in self.core.hello_prefix.components() {
78 w.write_tlv(comp.typ, &comp.value);
79 }
80 w.write_tlv(tlv_type::NAME_COMPONENT, &nonce.to_be_bytes());
81 });
82 w.write_tlv(tlv_type::NONCE, &nonce.to_be_bytes());
83 let lifetime_ms = hello_interval_base.as_millis().min(u32::MAX as u128) as u64 * 2;
84 write_nni(w, tlv_type::INTEREST_LIFETIME, lifetime_ms);
85 });
86 w.finish()
87 }
88
89 pub fn build_hello_payload(&self) -> HelloPayload {
94 let mut payload = HelloPayload::new(self.core.node_name.clone());
95 if self.core.config.read().unwrap().prefix_announcement == PrefixAnnouncementMode::InHello {
96 payload.served_prefixes = self.core.served_prefixes.lock().unwrap().clone();
97 }
98 {
99 let st = self.core.state.lock().unwrap();
100 if !st.recent_diffs.is_empty() {
101 payload.neighbor_diffs.push(NeighborDiff {
102 entries: st.recent_diffs.iter().cloned().collect(),
103 });
104 }
105 }
106 payload
107 }
108
109 fn handle_hello_data(
112 &self,
113 raw: &Bytes,
114 _incoming_face: FaceId,
115 meta: &InboundMeta,
116 ctx: &dyn DiscoveryContext,
117 ) -> bool {
118 let parsed = match parse_raw_data(raw) {
119 Some(d) => d,
120 None => return false,
121 };
122 let name = &parsed.name;
123 if !name.has_prefix(&self.core.hello_prefix) {
124 return false;
125 }
126 if name.components().len() != HELLO_PREFIX_DEPTH + 1 {
127 return false;
128 }
129
130 let nonce_comp = &name.components()[HELLO_PREFIX_DEPTH];
131 if nonce_comp.value.len() != 4 {
132 return false;
133 }
134 let nonce = u32::from_be_bytes(nonce_comp.value[..4].try_into().unwrap());
135 let send_time = {
136 let mut st = self.core.state.lock().unwrap();
137 st.pending_probes.remove(&nonce)
138 };
139
140 let content = match parsed.content {
141 Some(c) => c,
142 None => {
143 debug!("{}: hello Data no content", self.medium.protocol_id());
144 return true;
145 }
146 };
147 let payload = match HelloPayload::decode(&content) {
148 Some(p) => p,
149 None => {
150 debug!("{}: HelloPayload decode failed", self.medium.protocol_id());
151 return true;
152 }
153 };
154
155 let (responder_name, peer_face_id) = match self
157 .medium
158 .verify_and_ensure_peer(raw, &payload, meta, &self.core, ctx)
159 {
160 Some(result) => result,
161 None => return true,
162 };
163
164 ctx.update_neighbor(NeighborUpdate::SetState {
166 name: responder_name.clone(),
167 state: NeighborState::Established {
168 last_seen: Instant::now(),
169 },
170 });
171
172 if let Some(sent) = send_time {
174 let rtt = sent.elapsed();
175 let rtt_us = rtt.as_micros().min(u32::MAX as u128) as u32;
176 ctx.update_neighbor(NeighborUpdate::UpdateRtt {
177 name: responder_name.clone(),
178 rtt_us,
179 });
180 self.core.strategy.lock().unwrap().on_probe_success(rtt);
181 }
182
183 if self.core.config.read().unwrap().prefix_announcement == PrefixAnnouncementMode::InHello
185 && let Some(face_id) = peer_face_id
186 {
187 for prefix in &payload.served_prefixes {
188 ctx.add_fib_entry(prefix, face_id, 10, self.medium.protocol_id());
189 debug!(
190 "{}: auto-FIB {prefix:?} via {face_id:?}",
191 self.medium.protocol_id()
192 );
193 }
194 }
195
196 self.apply_neighbor_diffs(&payload, ctx);
198
199 {
201 let mut st = self.core.state.lock().unwrap();
202 st.recent_diffs.push_back(DiffEntry::Add(responder_name));
203 while st.recent_diffs.len() > MAX_DIFF_ENTRIES {
204 st.recent_diffs.pop_front();
205 }
206 }
207
208 true
209 }
210
211 fn handle_direct_probe_interest(
212 &self,
213 raw: &Bytes,
214 incoming_face: FaceId,
215 ctx: &dyn DiscoveryContext,
216 ) -> bool {
217 let probe = match parse_direct_probe(raw) {
218 Some(p) => p,
219 None => return false,
220 };
221 if probe.target == self.core.node_name
222 && let Some(parsed) = parse_raw_interest(raw)
223 {
224 let ack = build_probe_ack(&parsed.name);
225 ctx.send_on(incoming_face, ack);
226 debug!(
227 "{}: probe ACK sent (direct, nonce={:#010x})",
228 self.medium.protocol_id(),
229 probe.nonce
230 );
231 }
232 true
233 }
234
235 fn handle_via_probe_interest(
236 &self,
237 raw: &Bytes,
238 incoming_face: FaceId,
239 ctx: &dyn DiscoveryContext,
240 ) -> bool {
241 let probe = match parse_indirect_probe(raw) {
242 Some(p) => p,
243 None => return false,
244 };
245 if probe.intermediary != self.core.node_name {
246 return false;
247 }
248 if let Some(entry) = ctx.neighbors().get(&probe.target)
249 && let Some((face_id, _, _)) = entry.faces.first()
250 {
251 let relay_nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
252 let direct_pkt = build_direct_probe(&probe.target, relay_nonce);
253 ctx.send_on(*face_id, direct_pkt);
254 if let Some(parsed) = parse_raw_interest(raw) {
255 let mut st = self.core.state.lock().unwrap();
256 st.relay_probes
257 .insert(relay_nonce, (incoming_face, parsed.name.clone()));
258 }
259 debug!(
260 "{}: relaying via-probe to {:?}",
261 self.medium.protocol_id(),
262 probe.target
263 );
264 return true;
265 }
266 debug!(
267 "{}: via-probe target {:?} unknown, dropping",
268 self.medium.protocol_id(),
269 probe.target
270 );
271 true
272 }
273
274 fn handle_probe_ack(
275 &self,
276 raw: &Bytes,
277 _incoming_face: FaceId,
278 ctx: &dyn DiscoveryContext,
279 ) -> Option<bool> {
280 let parsed = parse_raw_data(raw)?;
281 let name = &parsed.name;
282 let comps = name.components();
283 let last = comps.last()?;
284 if last.value.len() != 4 {
285 return Some(false);
286 }
287 let nonce = u32::from_be_bytes(last.value[..4].try_into().ok()?);
288
289 let relay = {
290 let mut st = self.core.state.lock().unwrap();
291 st.relay_probes.remove(&nonce)
292 };
293 if let Some((origin_face, original_name)) = relay {
294 let ack = build_probe_ack(&original_name);
295 ctx.send_on(origin_face, ack);
296 debug!(
297 "{}: relayed probe ACK for nonce={nonce:#010x}",
298 self.medium.protocol_id()
299 );
300 }
301
302 let swim = {
303 let mut st = self.core.state.lock().unwrap();
304 st.swim_probes.remove(&nonce)
305 };
306 if let Some((sent, _target)) = swim {
307 let rtt = sent.elapsed();
308 self.core.strategy.lock().unwrap().on_probe_success(rtt);
309 debug!(
310 "{}: SWIM direct probe ACK nonce={nonce:#010x} rtt={rtt:?}",
311 self.medium.protocol_id()
312 );
313 }
314 Some(true)
315 }
316
317 fn apply_neighbor_diffs(&self, payload: &HelloPayload, ctx: &dyn DiscoveryContext) {
318 let mut should_broadcast = false;
319
320 for diff in &payload.neighbor_diffs {
321 for entry in &diff.entries {
322 match entry {
323 DiffEntry::Add(name) => {
324 if ctx.neighbors().get(name).is_none() {
325 ctx.update_neighbor(NeighborUpdate::Upsert(NeighborEntry {
326 node_name: name.clone(),
327 state: NeighborState::Probing {
328 attempts: 0,
329 last_probe: Instant::now(),
330 },
331 faces: Vec::new(),
332 rtt_us: None,
333 pending_nonce: None,
334 }));
335 should_broadcast = true;
336 debug!(
337 "{}: SWIM diff — new peer {name:?} in Probing",
338 self.medium.protocol_id()
339 );
340 }
341 }
342 DiffEntry::Remove(name) => {
343 if ctx.neighbors().get(name).is_some() {
344 ctx.update_neighbor(NeighborUpdate::SetState {
345 name: name.clone(),
346 state: NeighborState::Stale {
347 miss_count: 1,
348 last_seen: Instant::now(),
349 },
350 });
351 }
352 }
353 }
354 }
355 }
356
357 if should_broadcast {
358 self.core
359 .strategy
360 .lock()
361 .unwrap()
362 .trigger(TriggerEvent::ForwardingFailure);
363 }
364 }
365}
366
367impl<T: LinkMedium> DiscoveryProtocol for HelloProtocol<T> {
/// Returns the protocol identifier reported by the link medium.
fn protocol_id(&self) -> ProtocolId {
    self.medium.protocol_id()
}
373
/// Returns the name prefixes stored in the core's `claimed` field.
fn claimed_prefixes(&self) -> &[Name] {
    &self.core.claimed
}
377
378 fn tick_interval(&self) -> Duration {
379 self.core.config.read().unwrap().tick_interval
380 }
381
382 fn on_face_up(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
383 if self.medium.is_multicast_face(face_id) {
384 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
385 {
386 let mut st = self.core.state.lock().unwrap();
387 st.pending_probes.insert(nonce, Instant::now());
388 }
389 let hello = self.build_hello_interest(nonce);
390 self.medium.send_multicast(ctx, hello);
391 self.core
392 .strategy
393 .lock()
394 .unwrap()
395 .trigger(TriggerEvent::FaceUp);
396 debug!(
397 "{}: sent initial hello on face {face_id:?}",
398 self.medium.protocol_id()
399 );
400 }
401 }
402
/// Reacts to a face going down by delegating to the link medium.
///
/// The core state lock is held for the duration of the medium's callback,
/// which receives mutable access to that state.
fn on_face_down(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
    let mut st = self.core.state.lock().unwrap();
    self.medium.on_face_down(face_id, &mut st, ctx);
}
407
408 fn on_inbound(
409 &self,
410 raw: &Bytes,
411 incoming_face: FaceId,
412 meta: &InboundMeta,
413 ctx: &dyn DiscoveryContext,
414 ) -> bool {
415 let swim_fanout = self.core.config.read().unwrap().swim_indirect_fanout;
416 match raw.first() {
417 Some(&0x05) => {
418 if swim_fanout > 0
419 && let Some(parsed) = parse_raw_interest(raw)
420 {
421 if parsed.name.has_prefix(probe_via()) {
422 return self.handle_via_probe_interest(raw, incoming_face, ctx);
423 }
424 if parsed.name.has_prefix(probe_direct()) {
425 return self.handle_direct_probe_interest(raw, incoming_face, ctx);
426 }
427 }
428 self.medium
429 .handle_hello_interest(raw, incoming_face, meta, &self.core, ctx)
430 }
431 Some(&0x06) => {
432 if swim_fanout > 0 && is_probe_ack(raw) {
433 return self
434 .handle_probe_ack(raw, incoming_face, ctx)
435 .unwrap_or(false);
436 }
437 self.handle_hello_data(raw, incoming_face, meta, ctx)
438 }
439 _ => false,
440 }
441 }
442
443 fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
444 let protocol = self.medium.protocol_id();
445
446 let (liveness_timeout, miss_limit, gossip_k, swim_k, probe_timeout) = {
448 let cfg = self.core.config.read().unwrap();
449 (
450 cfg.liveness_timeout,
451 cfg.liveness_miss_count,
452 cfg.gossip_fanout as usize,
453 cfg.swim_indirect_fanout as usize,
454 cfg.probe_timeout,
455 )
456 };
457
458 let probes = { self.core.strategy.lock().unwrap().on_tick(now) };
460 for probe in probes {
461 match probe {
462 ProbeRequest::Broadcast => {
463 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
464 let hello = self.build_hello_interest(nonce);
465 self.medium.send_multicast(ctx, hello);
466 self.core
467 .state
468 .lock()
469 .unwrap()
470 .pending_probes
471 .insert(nonce, now);
472 debug!("{protocol}: broadcast hello (nonce={nonce:#010x})");
473 }
474 ProbeRequest::Unicast(face_id) => {
475 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
476 ctx.send_on(face_id, self.build_hello_interest(nonce));
477 self.core
478 .state
479 .lock()
480 .unwrap()
481 .pending_probes
482 .insert(nonce, now);
483 debug!("{protocol}: unicast hello on {face_id:?} (nonce={nonce:#010x})");
484 }
485 }
486 }
487
488 let all = ctx.neighbors().all();
490 for entry in &all {
491 match &entry.state {
492 NeighborState::Established { last_seen } => {
493 if now.duration_since(*last_seen) > liveness_timeout {
494 ctx.update_neighbor(NeighborUpdate::SetState {
495 name: entry.node_name.clone(),
496 state: NeighborState::Stale {
497 miss_count: 1,
498 last_seen: *last_seen,
499 },
500 });
501 self.core
502 .strategy
503 .lock()
504 .unwrap()
505 .trigger(TriggerEvent::NeighborStale);
506 if let Some((face_id, _, _)) = entry.faces.first() {
508 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
509 ctx.send_on(*face_id, self.build_hello_interest(nonce));
510 self.core
511 .state
512 .lock()
513 .unwrap()
514 .pending_probes
515 .insert(nonce, now);
516 }
517 if gossip_k > 0 {
519 let stale_name = &entry.node_name;
520 let peers: Vec<FaceId> = all
521 .iter()
522 .filter(|e| e.is_reachable() && &e.node_name != stale_name)
523 .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid))
524 .take(gossip_k)
525 .collect();
526 for face_id in peers {
527 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
528 ctx.send_on(face_id, self.build_hello_interest(nonce));
529 self.core
530 .state
531 .lock()
532 .unwrap()
533 .pending_probes
534 .insert(nonce, now);
535 }
536 }
537 }
538 }
539 NeighborState::Stale {
540 miss_count,
541 last_seen,
542 } => {
543 if u32::from(*miss_count) >= miss_limit {
544 info!(
545 peer = %entry.node_name, miss_count,
546 "{protocol}: peer unreachable, removing"
547 );
548 {
550 let mut st = self.core.state.lock().unwrap();
551 self.medium.on_peer_removed(entry, &mut st);
552 st.recent_diffs
553 .push_back(DiffEntry::Remove(entry.node_name.clone()));
554 while st.recent_diffs.len() > MAX_DIFF_ENTRIES {
555 st.recent_diffs.pop_front();
556 }
557 }
558 for (face_id, _, _) in &entry.faces {
559 ctx.remove_fib_entry(&entry.node_name, *face_id, protocol);
560 ctx.remove_face(*face_id);
561 }
562 ctx.update_neighbor(NeighborUpdate::Remove(entry.node_name.clone()));
563 } else if now.duration_since(*last_seen) > liveness_timeout {
564 ctx.update_neighbor(NeighborUpdate::SetState {
565 name: entry.node_name.clone(),
566 state: NeighborState::Stale {
567 miss_count: miss_count + 1,
568 last_seen: now,
569 },
570 });
571 }
572 }
573 _ => {}
574 }
575 }
576
577 if swim_k > 0 {
579 for entry in all.iter().filter(|e| e.is_reachable()) {
580 if let Some((face_id, _, _)) = entry.faces.first() {
581 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
582 trace!(
583 peer = %entry.node_name, face = ?face_id, nonce,
584 "{protocol}: SWIM direct probe →"
585 );
586 ctx.send_on(*face_id, build_direct_probe(&entry.node_name, nonce));
587 self.core
588 .state
589 .lock()
590 .unwrap()
591 .swim_probes
592 .insert(nonce, (now, entry.node_name.clone()));
593 }
594 }
595 }
596
597 let mut timed_out = 0u32;
599 {
600 let mut st = self.core.state.lock().unwrap();
601 st.pending_probes.retain(|_, sent| {
602 if now.duration_since(*sent) >= probe_timeout {
603 timed_out += 1;
604 false
605 } else {
606 true
607 }
608 });
609 }
610 if timed_out > 0 {
611 let mut strategy = self.core.strategy.lock().unwrap();
612 for _ in 0..timed_out {
613 strategy.on_probe_timeout();
614 }
615 }
616
617 if swim_k > 0 {
619 let k = swim_k;
620 let mut timed_out_swim: Vec<Name> = Vec::new();
621 {
622 let mut st = self.core.state.lock().unwrap();
623 st.swim_probes.retain(|_, (sent, target)| {
624 if now.duration_since(*sent) >= probe_timeout {
625 timed_out_swim.push(target.clone());
626 false
627 } else {
628 true
629 }
630 });
631 }
632 for target in timed_out_swim {
633 let intermediaries: Vec<_> = ctx
634 .neighbors()
635 .all()
636 .into_iter()
637 .filter(|e| e.is_reachable() && e.node_name != target)
638 .take(k)
639 .collect();
640 debug!(
641 peer = %target, via_count = intermediaries.len(),
642 "{protocol}: SWIM direct probe timed out, dispatching indirect probes"
643 );
644 for via in intermediaries {
645 let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
646 if let Some((face_id, _, _)) = via.faces.first() {
647 ctx.send_on(
648 *face_id,
649 build_indirect_probe(&via.node_name, &target, nonce),
650 );
651 }
652 }
653 self.core.strategy.lock().unwrap().on_probe_timeout();
654 }
655 }
656 }
657}