ndn_discovery/hello/
protocol.rs

1//! `HelloProtocol<T>` — generic SWIM/hello/probe discovery state machine.
2//!
3//! Implements the shared logic for NDN neighbor discovery: hello
4//! Interest/Data exchange, SWIM direct and indirect probes, gossip diff
5//! piggyback, and the Established→Stale→Absent neighbor lifecycle.
6//!
7//! Link-specific operations (address extraction, face creation, packet
8//! signing) are delegated to a [`LinkMedium`] implementation.
9
10use std::sync::atomic::Ordering;
11use std::time::{Duration, Instant};
12
13use bytes::Bytes;
14use ndn_packet::{Name, tlv_type};
15use ndn_tlv::TlvWriter;
16use ndn_transport::FaceId;
17use tracing::{debug, info, trace};
18
19use super::medium::{HELLO_PREFIX_DEPTH, HelloCore, LinkMedium, MAX_DIFF_ENTRIES};
20use super::probe::{
21    build_direct_probe, build_indirect_probe, build_probe_ack, is_probe_ack, parse_direct_probe,
22    parse_indirect_probe,
23};
24use crate::config::PrefixAnnouncementMode;
25use crate::scope::{probe_direct, probe_via};
26use crate::strategy::{ProbeRequest, TriggerEvent};
27use crate::wire::{parse_raw_data, parse_raw_interest, write_nni};
28use crate::{
29    DiffEntry, DiscoveryContext, DiscoveryProtocol, HelloPayload, InboundMeta, NeighborDiff,
30    NeighborEntry, NeighborState, NeighborUpdate, ProtocolId,
31};
32
33/// Generic neighbor discovery protocol over any [`LinkMedium`].
34///
35/// Contains the shared SWIM/hello/probe state machine and delegates to `T`
36/// for link-specific operations.  Concrete types are typically exposed via
37/// type aliases:
38///
39/// ```text
40/// pub type UdpNeighborDiscovery = HelloProtocol<UdpMedium>;
41/// pub type EtherNeighborDiscovery = HelloProtocol<EtherMedium>;
42/// ```
43pub struct HelloProtocol<T: LinkMedium> {
44    pub core: HelloCore,
45    pub medium: T,
46}
47
48impl<T: LinkMedium> HelloProtocol<T> {
49    /// Create a new `HelloProtocol` with the given medium, node name, and config.
50    pub fn create(medium: T, node_name: Name, config: crate::config::DiscoveryConfig) -> Self {
51        let core = HelloCore::new(node_name, config);
52        Self { core, medium }
53    }
54
55    /// Access the shared core state.
56    pub fn core(&self) -> &HelloCore {
57        &self.core
58    }
59
60    /// Access the link medium.
61    pub fn medium(&self) -> &T {
62        &self.medium
63    }
64
65    /// Set the prefixes this node serves (announced in Hello Data when InHello mode).
66    pub fn set_served_prefixes(&self, prefixes: Vec<Name>) {
67        *self.core.served_prefixes.lock().unwrap() = prefixes;
68    }
69
70    // ── Shared packet builders ───────────────────────────────────────────────
71
72    pub fn build_hello_interest(&self, nonce: u32) -> Bytes {
73        let hello_interval_base = self.core.config.read().unwrap().hello_interval_base;
74        let mut w = TlvWriter::new();
75        w.write_nested(tlv_type::INTEREST, |w: &mut TlvWriter| {
76            w.write_nested(tlv_type::NAME, |w: &mut TlvWriter| {
77                for comp in self.core.hello_prefix.components() {
78                    w.write_tlv(comp.typ, &comp.value);
79                }
80                w.write_tlv(tlv_type::NAME_COMPONENT, &nonce.to_be_bytes());
81            });
82            w.write_tlv(tlv_type::NONCE, &nonce.to_be_bytes());
83            let lifetime_ms = hello_interval_base.as_millis().min(u32::MAX as u128) as u64 * 2;
84            write_nni(w, tlv_type::INTEREST_LIFETIME, lifetime_ms);
85        });
86        w.finish()
87    }
88
89    /// Build a `HelloPayload` from the current shared state.
90    ///
91    /// Called by `LinkMedium::build_hello_data` to get the payload content
92    /// before applying link-specific signing.
93    pub fn build_hello_payload(&self) -> HelloPayload {
94        let mut payload = HelloPayload::new(self.core.node_name.clone());
95        if self.core.config.read().unwrap().prefix_announcement == PrefixAnnouncementMode::InHello {
96            payload.served_prefixes = self.core.served_prefixes.lock().unwrap().clone();
97        }
98        {
99            let st = self.core.state.lock().unwrap();
100            if !st.recent_diffs.is_empty() {
101                payload.neighbor_diffs.push(NeighborDiff {
102                    entries: st.recent_diffs.iter().cloned().collect(),
103                });
104            }
105        }
106        payload
107    }
108
109    // ── Shared inbound handlers ──────────────────────────────────────────────
110
111    fn handle_hello_data(
112        &self,
113        raw: &Bytes,
114        _incoming_face: FaceId,
115        meta: &InboundMeta,
116        ctx: &dyn DiscoveryContext,
117    ) -> bool {
118        let parsed = match parse_raw_data(raw) {
119            Some(d) => d,
120            None => return false,
121        };
122        let name = &parsed.name;
123        if !name.has_prefix(&self.core.hello_prefix) {
124            return false;
125        }
126        if name.components().len() != HELLO_PREFIX_DEPTH + 1 {
127            return false;
128        }
129
130        let nonce_comp = &name.components()[HELLO_PREFIX_DEPTH];
131        if nonce_comp.value.len() != 4 {
132            return false;
133        }
134        let nonce = u32::from_be_bytes(nonce_comp.value[..4].try_into().unwrap());
135        let send_time = {
136            let mut st = self.core.state.lock().unwrap();
137            st.pending_probes.remove(&nonce)
138        };
139
140        let content = match parsed.content {
141            Some(c) => c,
142            None => {
143                debug!("{}: hello Data no content", self.medium.protocol_id());
144                return true;
145            }
146        };
147        let payload = match HelloPayload::decode(&content) {
148            Some(p) => p,
149            None => {
150                debug!("{}: HelloPayload decode failed", self.medium.protocol_id());
151                return true;
152            }
153        };
154
155        // Link-specific: verify signature, extract address, create face.
156        let (responder_name, peer_face_id) = match self
157            .medium
158            .verify_and_ensure_peer(raw, &payload, meta, &self.core, ctx)
159        {
160            Some(result) => result,
161            None => return true,
162        };
163
164        // Update neighbor to Established.
165        ctx.update_neighbor(NeighborUpdate::SetState {
166            name: responder_name.clone(),
167            state: NeighborState::Established {
168                last_seen: Instant::now(),
169            },
170        });
171
172        // Record RTT if we have a matching send time.
173        if let Some(sent) = send_time {
174            let rtt = sent.elapsed();
175            let rtt_us = rtt.as_micros().min(u32::MAX as u128) as u32;
176            ctx.update_neighbor(NeighborUpdate::UpdateRtt {
177                name: responder_name.clone(),
178                rtt_us,
179            });
180            self.core.strategy.lock().unwrap().on_probe_success(rtt);
181        }
182
183        // Auto-populate FIB with served prefixes (InHello mode).
184        if self.core.config.read().unwrap().prefix_announcement == PrefixAnnouncementMode::InHello
185            && let Some(face_id) = peer_face_id
186        {
187            for prefix in &payload.served_prefixes {
188                ctx.add_fib_entry(prefix, face_id, 10, self.medium.protocol_id());
189                debug!(
190                    "{}: auto-FIB {prefix:?} via {face_id:?}",
191                    self.medium.protocol_id()
192                );
193            }
194        }
195
196        // Apply piggybacked SWIM gossip diffs.
197        self.apply_neighbor_diffs(&payload, ctx);
198
199        // Record this neighbor for our own outbound diffs.
200        {
201            let mut st = self.core.state.lock().unwrap();
202            st.recent_diffs.push_back(DiffEntry::Add(responder_name));
203            while st.recent_diffs.len() > MAX_DIFF_ENTRIES {
204                st.recent_diffs.pop_front();
205            }
206        }
207
208        true
209    }
210
211    fn handle_direct_probe_interest(
212        &self,
213        raw: &Bytes,
214        incoming_face: FaceId,
215        ctx: &dyn DiscoveryContext,
216    ) -> bool {
217        let probe = match parse_direct_probe(raw) {
218            Some(p) => p,
219            None => return false,
220        };
221        if probe.target == self.core.node_name
222            && let Some(parsed) = parse_raw_interest(raw)
223        {
224            let ack = build_probe_ack(&parsed.name);
225            ctx.send_on(incoming_face, ack);
226            debug!(
227                "{}: probe ACK sent (direct, nonce={:#010x})",
228                self.medium.protocol_id(),
229                probe.nonce
230            );
231        }
232        true
233    }
234
235    fn handle_via_probe_interest(
236        &self,
237        raw: &Bytes,
238        incoming_face: FaceId,
239        ctx: &dyn DiscoveryContext,
240    ) -> bool {
241        let probe = match parse_indirect_probe(raw) {
242            Some(p) => p,
243            None => return false,
244        };
245        if probe.intermediary != self.core.node_name {
246            return false;
247        }
248        if let Some(entry) = ctx.neighbors().get(&probe.target)
249            && let Some((face_id, _, _)) = entry.faces.first()
250        {
251            let relay_nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
252            let direct_pkt = build_direct_probe(&probe.target, relay_nonce);
253            ctx.send_on(*face_id, direct_pkt);
254            if let Some(parsed) = parse_raw_interest(raw) {
255                let mut st = self.core.state.lock().unwrap();
256                st.relay_probes
257                    .insert(relay_nonce, (incoming_face, parsed.name.clone()));
258            }
259            debug!(
260                "{}: relaying via-probe to {:?}",
261                self.medium.protocol_id(),
262                probe.target
263            );
264            return true;
265        }
266        debug!(
267            "{}: via-probe target {:?} unknown, dropping",
268            self.medium.protocol_id(),
269            probe.target
270        );
271        true
272    }
273
274    fn handle_probe_ack(
275        &self,
276        raw: &Bytes,
277        _incoming_face: FaceId,
278        ctx: &dyn DiscoveryContext,
279    ) -> Option<bool> {
280        let parsed = parse_raw_data(raw)?;
281        let name = &parsed.name;
282        let comps = name.components();
283        let last = comps.last()?;
284        if last.value.len() != 4 {
285            return Some(false);
286        }
287        let nonce = u32::from_be_bytes(last.value[..4].try_into().ok()?);
288
289        let relay = {
290            let mut st = self.core.state.lock().unwrap();
291            st.relay_probes.remove(&nonce)
292        };
293        if let Some((origin_face, original_name)) = relay {
294            let ack = build_probe_ack(&original_name);
295            ctx.send_on(origin_face, ack);
296            debug!(
297                "{}: relayed probe ACK for nonce={nonce:#010x}",
298                self.medium.protocol_id()
299            );
300        }
301
302        let swim = {
303            let mut st = self.core.state.lock().unwrap();
304            st.swim_probes.remove(&nonce)
305        };
306        if let Some((sent, _target)) = swim {
307            let rtt = sent.elapsed();
308            self.core.strategy.lock().unwrap().on_probe_success(rtt);
309            debug!(
310                "{}: SWIM direct probe ACK nonce={nonce:#010x} rtt={rtt:?}",
311                self.medium.protocol_id()
312            );
313        }
314        Some(true)
315    }
316
317    fn apply_neighbor_diffs(&self, payload: &HelloPayload, ctx: &dyn DiscoveryContext) {
318        let mut should_broadcast = false;
319
320        for diff in &payload.neighbor_diffs {
321            for entry in &diff.entries {
322                match entry {
323                    DiffEntry::Add(name) => {
324                        if ctx.neighbors().get(name).is_none() {
325                            ctx.update_neighbor(NeighborUpdate::Upsert(NeighborEntry {
326                                node_name: name.clone(),
327                                state: NeighborState::Probing {
328                                    attempts: 0,
329                                    last_probe: Instant::now(),
330                                },
331                                faces: Vec::new(),
332                                rtt_us: None,
333                                pending_nonce: None,
334                            }));
335                            should_broadcast = true;
336                            debug!(
337                                "{}: SWIM diff — new peer {name:?} in Probing",
338                                self.medium.protocol_id()
339                            );
340                        }
341                    }
342                    DiffEntry::Remove(name) => {
343                        if ctx.neighbors().get(name).is_some() {
344                            ctx.update_neighbor(NeighborUpdate::SetState {
345                                name: name.clone(),
346                                state: NeighborState::Stale {
347                                    miss_count: 1,
348                                    last_seen: Instant::now(),
349                                },
350                            });
351                        }
352                    }
353                }
354            }
355        }
356
357        if should_broadcast {
358            self.core
359                .strategy
360                .lock()
361                .unwrap()
362                .trigger(TriggerEvent::ForwardingFailure);
363        }
364    }
365}
366
367// ── DiscoveryProtocol impl ──────────────────────────────────────────────────
368
369impl<T: LinkMedium> DiscoveryProtocol for HelloProtocol<T> {
370    fn protocol_id(&self) -> ProtocolId {
371        self.medium.protocol_id()
372    }
373
374    fn claimed_prefixes(&self) -> &[Name] {
375        &self.core.claimed
376    }
377
378    fn tick_interval(&self) -> Duration {
379        self.core.config.read().unwrap().tick_interval
380    }
381
382    fn on_face_up(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
383        if self.medium.is_multicast_face(face_id) {
384            let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
385            {
386                let mut st = self.core.state.lock().unwrap();
387                st.pending_probes.insert(nonce, Instant::now());
388            }
389            let hello = self.build_hello_interest(nonce);
390            self.medium.send_multicast(ctx, hello);
391            self.core
392                .strategy
393                .lock()
394                .unwrap()
395                .trigger(TriggerEvent::FaceUp);
396            debug!(
397                "{}: sent initial hello on face {face_id:?}",
398                self.medium.protocol_id()
399            );
400        }
401    }
402
403    fn on_face_down(&self, face_id: FaceId, ctx: &dyn DiscoveryContext) {
404        let mut st = self.core.state.lock().unwrap();
405        self.medium.on_face_down(face_id, &mut st, ctx);
406    }
407
408    fn on_inbound(
409        &self,
410        raw: &Bytes,
411        incoming_face: FaceId,
412        meta: &InboundMeta,
413        ctx: &dyn DiscoveryContext,
414    ) -> bool {
415        let swim_fanout = self.core.config.read().unwrap().swim_indirect_fanout;
416        match raw.first() {
417            Some(&0x05) => {
418                if swim_fanout > 0
419                    && let Some(parsed) = parse_raw_interest(raw)
420                {
421                    if parsed.name.has_prefix(probe_via()) {
422                        return self.handle_via_probe_interest(raw, incoming_face, ctx);
423                    }
424                    if parsed.name.has_prefix(probe_direct()) {
425                        return self.handle_direct_probe_interest(raw, incoming_face, ctx);
426                    }
427                }
428                self.medium
429                    .handle_hello_interest(raw, incoming_face, meta, &self.core, ctx)
430            }
431            Some(&0x06) => {
432                if swim_fanout > 0 && is_probe_ack(raw) {
433                    return self
434                        .handle_probe_ack(raw, incoming_face, ctx)
435                        .unwrap_or(false);
436                }
437                self.handle_hello_data(raw, incoming_face, meta, ctx)
438            }
439            _ => false,
440        }
441    }
442
443    fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
444        let protocol = self.medium.protocol_id();
445
446        // Read config once per tick to avoid repeated lock acquisitions.
447        let (liveness_timeout, miss_limit, gossip_k, swim_k, probe_timeout) = {
448            let cfg = self.core.config.read().unwrap();
449            (
450                cfg.liveness_timeout,
451                cfg.liveness_miss_count,
452                cfg.gossip_fanout as usize,
453                cfg.swim_indirect_fanout as usize,
454                cfg.probe_timeout,
455            )
456        };
457
458        // ── Hello probe scheduling ───────────────────────────────────────────
459        let probes = { self.core.strategy.lock().unwrap().on_tick(now) };
460        for probe in probes {
461            match probe {
462                ProbeRequest::Broadcast => {
463                    let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
464                    let hello = self.build_hello_interest(nonce);
465                    self.medium.send_multicast(ctx, hello);
466                    self.core
467                        .state
468                        .lock()
469                        .unwrap()
470                        .pending_probes
471                        .insert(nonce, now);
472                    debug!("{protocol}: broadcast hello (nonce={nonce:#010x})");
473                }
474                ProbeRequest::Unicast(face_id) => {
475                    let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
476                    ctx.send_on(face_id, self.build_hello_interest(nonce));
477                    self.core
478                        .state
479                        .lock()
480                        .unwrap()
481                        .pending_probes
482                        .insert(nonce, now);
483                    debug!("{protocol}: unicast hello on {face_id:?} (nonce={nonce:#010x})");
484                }
485            }
486        }
487
488        // ── Neighbor state machine ───────────────────────────────────────────
489        let all = ctx.neighbors().all();
490        for entry in &all {
491            match &entry.state {
492                NeighborState::Established { last_seen } => {
493                    if now.duration_since(*last_seen) > liveness_timeout {
494                        ctx.update_neighbor(NeighborUpdate::SetState {
495                            name: entry.node_name.clone(),
496                            state: NeighborState::Stale {
497                                miss_count: 1,
498                                last_seen: *last_seen,
499                            },
500                        });
501                        self.core
502                            .strategy
503                            .lock()
504                            .unwrap()
505                            .trigger(TriggerEvent::NeighborStale);
506                        // Send unicast hello directly to the stale neighbor's face.
507                        if let Some((face_id, _, _)) = entry.faces.first() {
508                            let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
509                            ctx.send_on(*face_id, self.build_hello_interest(nonce));
510                            self.core
511                                .state
512                                .lock()
513                                .unwrap()
514                                .pending_probes
515                                .insert(nonce, now);
516                        }
517                        // Emergency gossip: K unicast hellos to other established peers.
518                        if gossip_k > 0 {
519                            let stale_name = &entry.node_name;
520                            let peers: Vec<FaceId> = all
521                                .iter()
522                                .filter(|e| e.is_reachable() && &e.node_name != stale_name)
523                                .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid))
524                                .take(gossip_k)
525                                .collect();
526                            for face_id in peers {
527                                let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
528                                ctx.send_on(face_id, self.build_hello_interest(nonce));
529                                self.core
530                                    .state
531                                    .lock()
532                                    .unwrap()
533                                    .pending_probes
534                                    .insert(nonce, now);
535                            }
536                        }
537                    }
538                }
539                NeighborState::Stale {
540                    miss_count,
541                    last_seen,
542                } => {
543                    if u32::from(*miss_count) >= miss_limit {
544                        info!(
545                            peer = %entry.node_name, miss_count,
546                            "{protocol}: peer unreachable, removing"
547                        );
548                        // Link-specific cleanup (e.g. remove peer_faces for UDP).
549                        {
550                            let mut st = self.core.state.lock().unwrap();
551                            self.medium.on_peer_removed(entry, &mut st);
552                            st.recent_diffs
553                                .push_back(DiffEntry::Remove(entry.node_name.clone()));
554                            while st.recent_diffs.len() > MAX_DIFF_ENTRIES {
555                                st.recent_diffs.pop_front();
556                            }
557                        }
558                        for (face_id, _, _) in &entry.faces {
559                            ctx.remove_fib_entry(&entry.node_name, *face_id, protocol);
560                            ctx.remove_face(*face_id);
561                        }
562                        ctx.update_neighbor(NeighborUpdate::Remove(entry.node_name.clone()));
563                    } else if now.duration_since(*last_seen) > liveness_timeout {
564                        ctx.update_neighbor(NeighborUpdate::SetState {
565                            name: entry.node_name.clone(),
566                            state: NeighborState::Stale {
567                                miss_count: miss_count + 1,
568                                last_seen: now,
569                            },
570                        });
571                    }
572                }
573                _ => {}
574            }
575        }
576
577        // ── SWIM direct probes to established neighbors ──────────────────────
578        if swim_k > 0 {
579            for entry in all.iter().filter(|e| e.is_reachable()) {
580                if let Some((face_id, _, _)) = entry.faces.first() {
581                    let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
582                    trace!(
583                        peer = %entry.node_name, face = ?face_id, nonce,
584                        "{protocol}: SWIM direct probe →"
585                    );
586                    ctx.send_on(*face_id, build_direct_probe(&entry.node_name, nonce));
587                    self.core
588                        .state
589                        .lock()
590                        .unwrap()
591                        .swim_probes
592                        .insert(nonce, (now, entry.node_name.clone()));
593                }
594            }
595        }
596
597        // ── Expire hello pending probes ──────────────────────────────────────
598        let mut timed_out = 0u32;
599        {
600            let mut st = self.core.state.lock().unwrap();
601            st.pending_probes.retain(|_, sent| {
602                if now.duration_since(*sent) >= probe_timeout {
603                    timed_out += 1;
604                    false
605                } else {
606                    true
607                }
608            });
609        }
610        if timed_out > 0 {
611            let mut strategy = self.core.strategy.lock().unwrap();
612            for _ in 0..timed_out {
613                strategy.on_probe_timeout();
614            }
615        }
616
617        // ── Expire SWIM direct probes; dispatch indirect probes on failure ───
618        if swim_k > 0 {
619            let k = swim_k;
620            let mut timed_out_swim: Vec<Name> = Vec::new();
621            {
622                let mut st = self.core.state.lock().unwrap();
623                st.swim_probes.retain(|_, (sent, target)| {
624                    if now.duration_since(*sent) >= probe_timeout {
625                        timed_out_swim.push(target.clone());
626                        false
627                    } else {
628                        true
629                    }
630                });
631            }
632            for target in timed_out_swim {
633                let intermediaries: Vec<_> = ctx
634                    .neighbors()
635                    .all()
636                    .into_iter()
637                    .filter(|e| e.is_reachable() && e.node_name != target)
638                    .take(k)
639                    .collect();
640                debug!(
641                    peer = %target, via_count = intermediaries.len(),
642                    "{protocol}: SWIM direct probe timed out, dispatching indirect probes"
643                );
644                for via in intermediaries {
645                    let nonce = self.core.nonce_counter.fetch_add(1, Ordering::Relaxed);
646                    if let Some((face_id, _, _)) = via.faces.first() {
647                        ctx.send_on(
648                            *face_id,
649                            build_indirect_probe(&via.node_name, &target, nonce),
650                        );
651                    }
652                }
653                self.core.strategy.lock().unwrap().on_probe_timeout();
654            }
655        }
656    }
657}