ndn_discovery/gossip/
epidemic.rs1use std::sync::Mutex;
32use std::time::{Duration, Instant};
33
34use bytes::Bytes;
35use ndn_packet::Name;
36use ndn_packet::encode::{DataBuilder, InterestBuilder};
37use ndn_tlv::{TlvReader, TlvWriter};
38use ndn_transport::FaceId;
39use tracing::{debug, trace};
40
41use crate::config::DiscoveryConfig;
42use crate::context::DiscoveryContext;
43use crate::neighbor::{NeighborEntry, NeighborState, NeighborUpdate};
44use crate::protocol::{DiscoveryProtocol, InboundMeta, ProtocolId};
45use crate::scope::gossip_prefix;
46use crate::wire::{parse_raw_data, parse_raw_interest, write_name_tlv};
47
48const PROTOCOL: ProtocolId = ProtocolId("epidemic-gossip");
49
50const GOSSIP_SUBSCRIBE_INTERVAL: Duration = Duration::from_secs(5);
53
54struct State {
56 node_name: Name,
58 local_seq: u64,
60 local_gossip_data: Option<Bytes>,
62 local_gossip_name: Option<Name>,
65 last_subscribe: Option<Instant>,
67 last_publish: Option<Instant>,
69}
70
71pub struct EpidemicGossip {
78 config: DiscoveryConfig,
79 claimed: Vec<Name>,
81 state: Mutex<State>,
82}
83
84impl EpidemicGossip {
85 pub fn new(node_name: Name, config: DiscoveryConfig) -> Self {
87 let claimed = vec![gossip_prefix().clone()];
88 let state = State {
89 node_name,
90 local_seq: 0,
91 local_gossip_data: None,
92 local_gossip_name: None,
93 last_subscribe: None,
94 last_publish: None,
95 };
96 Self {
97 config,
98 claimed,
99 state: Mutex::new(state),
100 }
101 }
102
103 fn build_subscribe_interest(peer_name: &Name) -> Bytes {
108 let mut interest_name = gossip_prefix().clone();
110 for comp in peer_name.components() {
111 interest_name = interest_name.append_component(comp.clone());
112 }
113 InterestBuilder::new(interest_name)
114 .can_be_prefix()
115 .must_be_fresh()
116 .lifetime(Duration::from_secs(10))
117 .build()
118 }
119
120 fn encode_snapshot(ctx: &dyn DiscoveryContext) -> Vec<u8> {
125 let mut w = TlvWriter::new();
126 for entry in ctx.neighbors().all() {
127 match &entry.state {
128 NeighborState::Established { .. } | NeighborState::Stale { .. } => {
129 write_name_tlv(&mut w, &entry.node_name);
130 }
131 _ => {}
132 }
133 }
134 w.finish().to_vec()
135 }
136
137 fn decode_snapshot(content: &Bytes) -> Vec<Name> {
139 let mut names = Vec::new();
140 let mut r = TlvReader::new(content.clone());
141 while !r.is_empty() {
142 if let Ok((typ, val)) = r.read_tlv() {
143 if typ == ndn_packet::tlv_type::NAME
144 && let Ok(name) = Name::decode(val)
145 {
146 names.push(name);
147 }
148 } else {
149 break;
150 }
151 }
152 names
153 }
154
155 fn publish_local_snapshot(&self, ctx: &dyn DiscoveryContext) -> Bytes {
157 let mut st = self.state.lock().unwrap();
158 st.local_seq += 1;
159 let seq = st.local_seq;
160 let node_name = st.node_name.clone();
161 drop(st);
162
163 let payload = Self::encode_snapshot(ctx);
164 let mut data_name = gossip_prefix().clone();
166 for comp in node_name.components() {
167 data_name = data_name.append_component(comp.clone());
168 }
169 let data_name = data_name.append(seq.to_string());
170
171 let wire = DataBuilder::new(data_name.clone(), &payload)
172 .freshness(GOSSIP_SUBSCRIBE_INTERVAL * 2)
173 .build();
174
175 let mut st = self.state.lock().unwrap();
176 st.local_gossip_data = Some(wire.clone());
177 st.local_gossip_name = Some(data_name);
178 st.last_publish = Some(Instant::now());
179 wire
180 }
181
182 fn handle_gossip_interest(&self, incoming_face: FaceId, ctx: &dyn DiscoveryContext) {
184 let wire = {
185 let st = self.state.lock().unwrap();
186 st.local_gossip_data.clone()
187 };
188 let wire = wire.unwrap_or_else(|| self.publish_local_snapshot(ctx));
190 ctx.send_on(incoming_face, wire);
191 }
192
193 fn handle_gossip_data(&self, raw: &Bytes, ctx: &dyn DiscoveryContext) {
195 let parsed = match parse_raw_data(raw) {
196 Some(d) => d,
197 None => return,
198 };
199 let content = match parsed.content {
200 Some(c) => c,
201 None => return,
202 };
203 let names = Self::decode_snapshot(&content);
204 debug!(
205 source_name=%parsed.name,
206 count=%names.len(),
207 "epidemic-gossip: received gossip record"
208 );
209 let local_name = self.state.lock().unwrap().node_name.clone();
210 for name in names {
211 if name == local_name {
213 continue;
214 }
215 if ctx.neighbors().get(&name).is_none() {
217 trace!(peer=%name, "epidemic-gossip: inserting Probing entry from gossip");
218 ctx.update_neighbor(NeighborUpdate::Upsert(NeighborEntry {
219 node_name: name,
220 state: NeighborState::Probing {
221 attempts: 0,
222 last_probe: Instant::now(),
223 },
224 faces: Vec::new(),
225 rtt_us: None,
226 pending_nonce: None,
227 }));
228 }
229 }
230 }
231}
232
233impl DiscoveryProtocol for EpidemicGossip {
234 fn protocol_id(&self) -> ProtocolId {
235 PROTOCOL
236 }
237
238 fn claimed_prefixes(&self) -> &[Name] {
239 &self.claimed
240 }
241
242 fn on_face_up(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
243
244 fn on_face_down(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
245
246 fn on_inbound(
247 &self,
248 raw: &Bytes,
249 incoming_face: FaceId,
250 _meta: &InboundMeta,
251 ctx: &dyn DiscoveryContext,
252 ) -> bool {
253 if raw.is_empty() {
255 return false;
256 }
257 let first = raw[0];
258
259 if first == ndn_packet::tlv_type::INTEREST as u8
261 && let Some(interest) = parse_raw_interest(raw)
262 && interest.name.has_prefix(gossip_prefix())
263 {
264 self.handle_gossip_interest(incoming_face, ctx);
265 return true;
266 }
267
268 if first == ndn_packet::tlv_type::DATA as u8
270 && let Some(parsed) = parse_raw_data(raw)
271 && parsed.name.has_prefix(gossip_prefix())
272 {
273 self.handle_gossip_data(raw, ctx);
274 return true;
275 }
276
277 false
278 }
279
280 fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
281 let (should_subscribe, should_publish) = {
282 let st = self.state.lock().unwrap();
283 let subscribe = st
284 .last_subscribe
285 .map(|t| now.duration_since(t) >= GOSSIP_SUBSCRIBE_INTERVAL)
286 .unwrap_or(true);
287 let publish = st
288 .last_publish
289 .map(|t| now.duration_since(t) >= GOSSIP_SUBSCRIBE_INTERVAL)
290 .unwrap_or(true);
291 (subscribe, publish)
292 };
293
294 if should_publish {
296 self.publish_local_snapshot(ctx);
297 }
298
299 if !should_subscribe {
300 return;
301 }
302 self.state.lock().unwrap().last_subscribe = Some(now);
303
304 let fanout = self.config.gossip_fanout as usize;
307 let peers: Vec<_> = ctx
308 .neighbors()
309 .all()
310 .into_iter()
311 .filter(|e| e.is_reachable())
312 .collect();
313
314 let selected: Vec<_> = if fanout > 0 && fanout < peers.len() {
315 let step = peers.len() / fanout;
317 peers.iter().step_by(step.max(1)).take(fanout).collect()
318 } else {
319 peers.iter().collect()
320 };
321
322 for entry in selected {
323 let face_ids: Vec<FaceId> = entry.faces.iter().map(|(fid, _, _)| *fid).collect();
324 let interest = Self::build_subscribe_interest(&entry.node_name);
325 for face_id in face_ids {
326 trace!(peer=%entry.node_name, face=%face_id, "epidemic-gossip: sending gossip subscription Interest");
327 ctx.send_on(face_id, interest.clone());
328 }
329 }
330 }
331
332 fn tick_interval(&self) -> Duration {
333 self.config.tick_interval
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use std::str::FromStr;
341
342 #[test]
343 fn snapshot_roundtrip_empty() {
344 let encoded: Vec<u8> = {
345 let w = TlvWriter::new();
346 w.finish().to_vec()
347 };
348 let decoded = EpidemicGossip::decode_snapshot(&Bytes::from(encoded));
349 assert!(decoded.is_empty());
350 }
351
352 #[test]
353 fn snapshot_roundtrip_with_names() {
354 let names = vec![
355 Name::from_str("/ndn/site/alice").unwrap(),
356 Name::from_str("/ndn/site/bob").unwrap(),
357 ];
358 let mut w = TlvWriter::new();
360 for n in &names {
361 write_name_tlv(&mut w, n);
362 }
363 let encoded = Bytes::from(w.finish().to_vec());
364 let decoded = EpidemicGossip::decode_snapshot(&encoded);
366 assert_eq!(decoded.len(), 2);
367 assert_eq!(decoded[0], names[0]);
368 assert_eq!(decoded[1], names[1]);
369 }
370}