ndn_discovery/gossip/
epidemic.rs

1//! `EpidemicGossip` — pull-gossip for neighbor state dissemination.
2//!
//! Each node publishes a neighbor state snapshot under
//! `/ndn/local/nd/gossip/<node-name>/<seq>` and pulls its peers' snapshots by
//! expressing prefix Interests (`CanBePrefix=true`, `MustBeFresh=true`) for
//! `/ndn/local/nd/gossip/<peer-name>/`.
7//!
8//! ## Wire format — gossip record payload
9//!
10//! The Data Content is a sequence of concatenated Name TLVs, one per
11//! established-or-stale neighbor:
12//!
13//! ```text
14//! GossipRecord ::= (Name TLV)*
15//! ```
16//!
17//! This is intentionally minimal — gossip records carry name hints for
18//! nodes that should be probed; the receiver creates `Probing` state entries
19//! and the normal hello state machine confirms them.  No RTT or face-ID
20//! metadata is included (link-local face IDs have no meaning to remote peers).
21//!
22//! ## Operation
23//!
//! * `on_tick`: once per `GOSSIP_SUBSCRIBE_INTERVAL`, publish a fresh local
//!   snapshot (which advances the local sequence number) and express a prefix
//!   Interest for the gossip prefix of each reachable peer, limited to
//!   `gossip_fanout` peers per sweep when that option is non-zero.
27//! * `on_inbound` for Interest: respond with the latest local gossip Data.
28//! * `on_inbound` for Data: decode the neighbor name list and add any
29//!   unknown names as `Probing` entries to the neighbor table.
30
31use std::sync::Mutex;
32use std::time::{Duration, Instant};
33
34use bytes::Bytes;
35use ndn_packet::Name;
36use ndn_packet::encode::{DataBuilder, InterestBuilder};
37use ndn_tlv::{TlvReader, TlvWriter};
38use ndn_transport::FaceId;
39use tracing::{debug, trace};
40
41use crate::config::DiscoveryConfig;
42use crate::context::DiscoveryContext;
43use crate::neighbor::{NeighborEntry, NeighborState, NeighborUpdate};
44use crate::protocol::{DiscoveryProtocol, InboundMeta, ProtocolId};
45use crate::scope::gossip_prefix;
46use crate::wire::{parse_raw_data, parse_raw_interest, write_name_tlv};
47
/// Identifier returned by `protocol_id()` for this protocol.
const PROTOCOL: ProtocolId = ProtocolId("epidemic-gossip");

/// Gossip subscription interval: how often we express prefix Interests for
/// each peer's gossip prefix to pull fresh snapshots.
///
/// The same interval also rate-limits local snapshot publication in
/// `on_tick`, and published Data carries a freshness period of twice this
/// value.
const GOSSIP_SUBSCRIBE_INTERVAL: Duration = Duration::from_secs(5);
53
/// Internal state protected by a mutex.
///
/// All fields are only touched while holding the `Mutex` in
/// `EpidemicGossip::state`.
struct State {
    /// This node's own NDN name.  Appended to the gossip prefix when naming
    /// published snapshots and used to skip self-references in received
    /// gossip records.
    node_name: Name,
    /// Monotonically increasing sequence number for local gossip records.
    /// Starts at 0 and is incremented before each publication, so the first
    /// published snapshot carries sequence number 1.
    local_seq: u64,
    /// Cached wire bytes for the most recent local gossip Data.  `None` until
    /// the first snapshot has been published.
    local_gossip_data: Option<Bytes>,
    /// The full name of the last-published gossip Data
    /// (`<gossip-prefix>/<node-name>/<seq>`).  Written on every publication;
    /// it is not read anywhere in this file.
    local_gossip_name: Option<Name>,
    /// Timestamp of last gossip subscription sweep (`None` before the first
    /// sweep, which makes the first tick sweep immediately).
    last_subscribe: Option<Instant>,
    /// Timestamp of last local snapshot publication (`None` before the first
    /// publication).
    last_publish: Option<Instant>,
}
70
/// Pull-gossip protocol for neighbor state dissemination.
///
/// Publishes and subscribes to neighbor snapshots at
/// `/ndn/local/nd/gossip/<node-name>/<seq>`.  Remote node names discovered
/// via gossip are inserted into the neighbor table as `Probing` entries so the
/// normal hello state machine takes over.
///
/// All mutable state lives behind an internal `Mutex`, so the protocol
/// callbacks only need `&self`.
pub struct EpidemicGossip {
    /// Discovery configuration; this protocol reads `gossip_fanout` and
    /// `tick_interval` from it.
    config: DiscoveryConfig,
    /// The claimed prefix as an owned `Vec` for `claimed_prefixes()`.
    /// Holds exactly one entry: a clone of `gossip_prefix()`.
    claimed: Vec<Name>,
    /// Mutable protocol state (sequence number, cached snapshot, timers).
    state: Mutex<State>,
}
83
84impl EpidemicGossip {
85    /// Create a new `EpidemicGossip` for `node_name`.
86    pub fn new(node_name: Name, config: DiscoveryConfig) -> Self {
87        let claimed = vec![gossip_prefix().clone()];
88        let state = State {
89            node_name,
90            local_seq: 0,
91            local_gossip_data: None,
92            local_gossip_name: None,
93            last_subscribe: None,
94            last_publish: None,
95        };
96        Self {
97            config,
98            claimed,
99            state: Mutex::new(state),
100        }
101    }
102
103    // ─── helpers ──────────────────────────────────────────────────────────────
104
105    /// Build a gossip Interest for a specific peer: subscribes to all of that
106    /// peer's gossip publications under `/ndn/local/nd/gossip/<peer-name>/`.
107    fn build_subscribe_interest(peer_name: &Name) -> Bytes {
108        // Append all components of peer_name onto gossip_prefix().
109        let mut interest_name = gossip_prefix().clone();
110        for comp in peer_name.components() {
111            interest_name = interest_name.append_component(comp.clone());
112        }
113        InterestBuilder::new(interest_name)
114            .can_be_prefix()
115            .must_be_fresh()
116            .lifetime(Duration::from_secs(10))
117            .build()
118    }
119
120    /// Encode the local neighbor snapshot as a gossip record payload.
121    ///
122    /// Returns the encoded TLV bytes (sequence of Name TLVs) and the list
123    /// of neighbor names included.
124    fn encode_snapshot(ctx: &dyn DiscoveryContext) -> Vec<u8> {
125        let mut w = TlvWriter::new();
126        for entry in ctx.neighbors().all() {
127            match &entry.state {
128                NeighborState::Established { .. } | NeighborState::Stale { .. } => {
129                    write_name_tlv(&mut w, &entry.node_name);
130                }
131                _ => {}
132            }
133        }
134        w.finish().to_vec()
135    }
136
137    /// Decode a gossip record payload into a list of neighbor names.
138    fn decode_snapshot(content: &Bytes) -> Vec<Name> {
139        let mut names = Vec::new();
140        let mut r = TlvReader::new(content.clone());
141        while !r.is_empty() {
142            if let Ok((typ, val)) = r.read_tlv() {
143                if typ == ndn_packet::tlv_type::NAME
144                    && let Ok(name) = Name::decode(val)
145                {
146                    names.push(name);
147                }
148            } else {
149                break;
150            }
151        }
152        names
153    }
154
155    /// Publish a fresh local gossip Data packet.  Returns the wire bytes.
156    fn publish_local_snapshot(&self, ctx: &dyn DiscoveryContext) -> Bytes {
157        let mut st = self.state.lock().unwrap();
158        st.local_seq += 1;
159        let seq = st.local_seq;
160        let node_name = st.node_name.clone();
161        drop(st);
162
163        let payload = Self::encode_snapshot(ctx);
164        // Build data name: gossip_prefix / node_name components / seq
165        let mut data_name = gossip_prefix().clone();
166        for comp in node_name.components() {
167            data_name = data_name.append_component(comp.clone());
168        }
169        let data_name = data_name.append(seq.to_string());
170
171        let wire = DataBuilder::new(data_name.clone(), &payload)
172            .freshness(GOSSIP_SUBSCRIBE_INTERVAL * 2)
173            .build();
174
175        let mut st = self.state.lock().unwrap();
176        st.local_gossip_data = Some(wire.clone());
177        st.local_gossip_name = Some(data_name);
178        st.last_publish = Some(Instant::now());
179        wire
180    }
181
182    /// Handle an incoming gossip Interest and respond with local snapshot.
183    fn handle_gossip_interest(&self, incoming_face: FaceId, ctx: &dyn DiscoveryContext) {
184        let wire = {
185            let st = self.state.lock().unwrap();
186            st.local_gossip_data.clone()
187        };
188        // Publish fresh snapshot if we don't have one yet.
189        let wire = wire.unwrap_or_else(|| self.publish_local_snapshot(ctx));
190        ctx.send_on(incoming_face, wire);
191    }
192
193    /// Handle an incoming gossip Data: merge remote neighbor names into table.
194    fn handle_gossip_data(&self, raw: &Bytes, ctx: &dyn DiscoveryContext) {
195        let parsed = match parse_raw_data(raw) {
196            Some(d) => d,
197            None => return,
198        };
199        let content = match parsed.content {
200            Some(c) => c,
201            None => return,
202        };
203        let names = Self::decode_snapshot(&content);
204        debug!(
205            source_name=%parsed.name,
206            count=%names.len(),
207            "epidemic-gossip: received gossip record"
208        );
209        let local_name = self.state.lock().unwrap().node_name.clone();
210        for name in names {
211            // Skip self.
212            if name == local_name {
213                continue;
214            }
215            // Only insert if not already known.
216            if ctx.neighbors().get(&name).is_none() {
217                trace!(peer=%name, "epidemic-gossip: inserting Probing entry from gossip");
218                ctx.update_neighbor(NeighborUpdate::Upsert(NeighborEntry {
219                    node_name: name,
220                    state: NeighborState::Probing {
221                        attempts: 0,
222                        last_probe: Instant::now(),
223                    },
224                    faces: Vec::new(),
225                    rtt_us: None,
226                    pending_nonce: None,
227                }));
228            }
229        }
230    }
231}
232
233impl DiscoveryProtocol for EpidemicGossip {
    /// Returns the fixed `PROTOCOL` identifier (`"epidemic-gossip"`).
    fn protocol_id(&self) -> ProtocolId {
        PROTOCOL
    }
237
    /// Returns the prefixes this protocol handles: the single gossip prefix
    /// stored at construction time.
    fn claimed_prefixes(&self) -> &[Name] {
        &self.claimed
    }
241
    /// No-op: this protocol takes no action when a face comes up.
    fn on_face_up(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
243
    /// No-op: this protocol takes no action when a face goes down.
    fn on_face_down(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
245
246    fn on_inbound(
247        &self,
248        raw: &Bytes,
249        incoming_face: FaceId,
250        _meta: &InboundMeta,
251        ctx: &dyn DiscoveryContext,
252    ) -> bool {
253        // Quick peek to classify Interest vs Data without full decode.
254        if raw.is_empty() {
255            return false;
256        }
257        let first = raw[0];
258
259        // Interest TLV type 0x05.
260        if first == ndn_packet::tlv_type::INTEREST as u8
261            && let Some(interest) = parse_raw_interest(raw)
262            && interest.name.has_prefix(gossip_prefix())
263        {
264            self.handle_gossip_interest(incoming_face, ctx);
265            return true;
266        }
267
268        // Data TLV type 0x06.
269        if first == ndn_packet::tlv_type::DATA as u8
270            && let Some(parsed) = parse_raw_data(raw)
271            && parsed.name.has_prefix(gossip_prefix())
272        {
273            self.handle_gossip_data(raw, ctx);
274            return true;
275        }
276
277        false
278    }
279
280    fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
281        let (should_subscribe, should_publish) = {
282            let st = self.state.lock().unwrap();
283            let subscribe = st
284                .last_subscribe
285                .map(|t| now.duration_since(t) >= GOSSIP_SUBSCRIBE_INTERVAL)
286                .unwrap_or(true);
287            let publish = st
288                .last_publish
289                .map(|t| now.duration_since(t) >= GOSSIP_SUBSCRIBE_INTERVAL)
290                .unwrap_or(true);
291            (subscribe, publish)
292        };
293
294        // Publish a fresh local snapshot if due.
295        if should_publish {
296            self.publish_local_snapshot(ctx);
297        }
298
299        if !should_subscribe {
300            return;
301        }
302        self.state.lock().unwrap().last_subscribe = Some(now);
303
304        // Express gossip subscription Interests to established/stale peers.
305        // Limit to `gossip_fanout` peers when set, otherwise subscribe to all.
306        let fanout = self.config.gossip_fanout as usize;
307        let peers: Vec<_> = ctx
308            .neighbors()
309            .all()
310            .into_iter()
311            .filter(|e| e.is_reachable())
312            .collect();
313
314        let selected: Vec<_> = if fanout > 0 && fanout < peers.len() {
315            // Pseudo-random selection: pick every Nth entry using tick count.
316            let step = peers.len() / fanout;
317            peers.iter().step_by(step.max(1)).take(fanout).collect()
318        } else {
319            peers.iter().collect()
320        };
321
322        for entry in selected {
323            let face_ids: Vec<FaceId> = entry.faces.iter().map(|(fid, _, _)| *fid).collect();
324            let interest = Self::build_subscribe_interest(&entry.node_name);
325            for face_id in face_ids {
326                trace!(peer=%entry.node_name, face=%face_id, "epidemic-gossip: sending gossip subscription Interest");
327                ctx.send_on(face_id, interest.clone());
328            }
329        }
330    }
331
    /// Returns the tick interval taken from the discovery configuration.
    fn tick_interval(&self) -> Duration {
        self.config.tick_interval
    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// A payload produced by an untouched writer decodes to no names.
    #[test]
    fn snapshot_roundtrip_empty() {
        let payload = Bytes::from(TlvWriter::new().finish().to_vec());
        assert!(EpidemicGossip::decode_snapshot(&payload).is_empty());
    }

    /// Names written as consecutive Name TLVs come back intact and in order.
    #[test]
    fn snapshot_roundtrip_with_names() {
        let expected: Vec<Name> = ["/ndn/site/alice", "/ndn/site/bob"]
            .into_iter()
            .map(|uri| Name::from_str(uri).unwrap())
            .collect();

        // Encode.
        let mut writer = TlvWriter::new();
        for name in &expected {
            write_name_tlv(&mut writer, name);
        }
        let payload = Bytes::from(writer.finish().to_vec());

        // Decode.
        let decoded = EpidemicGossip::decode_snapshot(&payload);
        assert_eq!(decoded.len(), 2);
        assert_eq!(decoded, expected);
    }
}
370}