ndn_discovery/gossip/svs_gossip.rs
1//! `SvsServiceDiscovery` — SVS-backed push service-record notifications.
2//!
3//! Joins the SVS sync group at `/ndn/local/sd/updates/` so that service record
4//! *changes* are pushed to all group members rather than requiring periodic
5//! pulls. Nodes that only need occasional browsability continue to use
6//! `ServiceDiscoveryProtocol` via `/ndn/local/sd/services/`.
7//!
8//! ## Architecture
9//!
10//! The SVS background task is fully async (`join_svs_group`), but
//! `DiscoveryProtocol` hooks are synchronous. The bridge uses two
//! `tokio::sync::mpsc` channels for raw packets (incoming and outgoing), plus
//! the update receiver exposed by the `SyncHandle`:
13//!
14//! ```text
15//! on_inbound ──► incoming_tx ──► SVS task (merge, detect gaps)
16//! SVS task ──► outgoing_tx ──► on_tick drain ──► ctx.send_on (all neighbor faces)
17//! SVS updates ──► update_rx ──► on_tick drain ──► express fetch Interests
18//! ```
19//!
20//! `on_tick` performs non-blocking draining of both channels using
21//! `try_recv()`. This keeps the synchronous discovery hooks from blocking
22//! while still processing all pending work in bounded time.
23//!
24//! ## Packet routing
25//!
26//! All SVS Sync Interests arrive under `/ndn/local/sd/updates/svs/…`.
27//! This protocol claims `/ndn/local/sd/updates/`, so `CompositeDiscovery`
28//! routes these packets here before the forwarding pipeline sees them.
29//! Service record Data packets arrive under `/ndn/local/sd/services/` and
30//! are handled by `ServiceDiscoveryProtocol`; this protocol only handles the
31//! sync control plane.
32
33use std::sync::Mutex;
34use std::time::{Duration, Instant};
35
36use bytes::Bytes;
37use ndn_packet::Name;
38use ndn_packet::encode::InterestBuilder;
39use ndn_transport::FaceId;
40use tokio::sync::mpsc;
41use tracing::{debug, trace, warn};
42
43use crate::context::DiscoveryContext;
44use crate::protocol::{DiscoveryProtocol, InboundMeta, ProtocolId};
45use crate::scope::sd_updates;
46use crate::wire::parse_raw_interest;
47
48use ndn_sync::{SvsConfig, SyncHandle, SyncUpdate, join_svs_group};
49
/// Identifier returned by this protocol's `protocol_id` hook.
const PROTOCOL: ProtocolId = ProtocolId("svs-service-discovery");

/// Capacity of each bridge channel (used for both the incoming-packet and
/// the outgoing-packet `mpsc` channels created in `SvsServiceDiscovery::new`).
const CHANNEL_CAP: usize = 256;

/// Interest lifetime set on every fetch Interest expressed for a `SyncUpdate`.
///
/// This module performs no retry of its own: each update drained from the
/// SVS task results in one Interest per sequence number.
/// NOTE(review): the original comment stated that the SVS task emits each
/// update only once — confirm against `ndn_sync` before relying on it.
const FETCH_LIFETIME: Duration = Duration::from_secs(4);
58
/// Mutable bridge state shared by the synchronous discovery hooks.
///
/// Stored behind the `Mutex` in [`SvsServiceDiscovery`]; `on_inbound` uses the
/// sender side, `on_tick` drains the receiver sides.
struct Inner {
    /// Channel to feed raw incoming SVS packets into the background task.
    incoming_tx: mpsc::Sender<Bytes>,
    /// Channel that the background task uses to request outgoing SVS packets.
    outgoing_rx: mpsc::Receiver<Bytes>,
    /// Handle returned by `join_svs_group`; its `rx` receiver yields
    /// `SyncUpdate`s from the SVS task (new data to fetch from peers).
    sync_handle: SyncHandle,
    /// Timestamp passed to the most recent `on_tick` call; `None` until the
    /// first tick. Only written in the code visible here.
    last_tick: Option<Instant>,
}
69
/// SVS-backed push service-record discovery.
///
/// Add alongside [`ServiceDiscoveryProtocol`] in a [`CompositeDiscovery`] to
/// receive push notifications whenever a peer publishes or updates a service
/// record.
///
/// [`ServiceDiscoveryProtocol`]: crate::service_discovery::ServiceDiscoveryProtocol
/// [`CompositeDiscovery`]: crate::CompositeDiscovery
pub struct SvsServiceDiscovery {
    /// Static claimed prefixes — set at construction, never mutated.
    claimed: Vec<Name>,
    /// Channel endpoints and tick bookkeeping; locked by the `&self`
    /// discovery hooks, which need mutable access to the receivers.
    inner: Mutex<Inner>,
}
83
84impl SvsServiceDiscovery {
85 /// Create a new `SvsServiceDiscovery` for `node_name`.
86 ///
87 /// Immediately spawns the SVS background task (requires a Tokio runtime).
88 pub fn new(node_name: Name) -> Self {
89 let group = sd_updates().clone();
90
91 // Channels: we feed raw incoming bytes to SVS, SVS sends raw outgoing bytes back.
92 let (incoming_tx, incoming_rx) = mpsc::channel::<Bytes>(CHANNEL_CAP);
93 let (outgoing_tx, outgoing_rx) = mpsc::channel::<Bytes>(CHANNEL_CAP);
94
95 // Wrap the outgoing_tx so the SVS task can use it as its `send` channel.
96 // SVS `join_svs_group` takes `send: mpsc::Sender<Bytes>` for outgoing packets
97 // and `recv: mpsc::Receiver<Bytes>` for incoming packets.
98 let sync_handle = join_svs_group(
99 group,
100 node_name,
101 outgoing_tx,
102 incoming_rx,
103 SvsConfig::default(),
104 );
105
106 Self {
107 claimed: vec![sd_updates().clone()],
108 inner: Mutex::new(Inner {
109 incoming_tx,
110 outgoing_rx,
111 sync_handle,
112 last_tick: None,
113 }),
114 }
115 }
116
117 /// Send pending outgoing SVS packets to all reachable neighbors.
118 fn drain_outgoing(inner: &mut Inner, ctx: &dyn DiscoveryContext) {
119 // Drain all pending outgoing packets (Sync Interests).
120 let reachable_faces: Vec<FaceId> = ctx
121 .neighbors()
122 .all()
123 .into_iter()
124 .filter(|e| e.is_reachable())
125 .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid).collect::<Vec<_>>())
126 .collect();
127
128 loop {
129 match inner.outgoing_rx.try_recv() {
130 Ok(pkt) => {
131 trace!(len=%pkt.len(), "svs-sd: sending SVS Sync Interest to {} faces", reachable_faces.len());
132 for &face_id in &reachable_faces {
133 ctx.send_on(face_id, pkt.clone());
134 }
135 }
136 Err(mpsc::error::TryRecvError::Empty) => break,
137 Err(mpsc::error::TryRecvError::Disconnected) => {
138 warn!("svs-sd: outgoing channel disconnected");
139 break;
140 }
141 }
142 }
143 }
144
145 /// Drain sync updates and express fetch Interests for missing service records.
146 fn drain_updates(inner: &mut Inner, ctx: &dyn DiscoveryContext) {
147 loop {
148 match inner.sync_handle.rx.try_recv() {
149 Ok(update) => Self::handle_update(&update, ctx),
150 Err(mpsc::error::TryRecvError::Empty) => break,
151 Err(mpsc::error::TryRecvError::Disconnected) => {
152 warn!("svs-sd: update channel disconnected");
153 break;
154 }
155 }
156 }
157 }
158
159 /// Express fetch Interests for all records in a `SyncUpdate` gap.
160 fn handle_update(update: &SyncUpdate, ctx: &dyn DiscoveryContext) {
161 debug!(
162 publisher=%update.publisher,
163 low=%update.low_seq,
164 high=%update.high_seq,
165 "svs-sd: new service record update from peer"
166 );
167 // Express an Interest for each missing sequence number.
168 // The records live under the SD services prefix, keyed by
169 // `<publisher-name>/<seq>`. The publisher's node name is embedded
170 // in `update.name` as the last component of the group prefix.
171 for seq in update.low_seq..=update.high_seq {
172 let fetch_name = update.name.clone().append(seq.to_string());
173 let interest = InterestBuilder::new(fetch_name)
174 .must_be_fresh()
175 .lifetime(FETCH_LIFETIME)
176 .build();
177 // Send on all reachable faces — the PIT will aggregate duplicates.
178 let faces: Vec<FaceId> = ctx
179 .neighbors()
180 .all()
181 .into_iter()
182 .filter(|e| e.is_reachable())
183 .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid).collect::<Vec<_>>())
184 .collect();
185 for face_id in faces {
186 ctx.send_on(face_id, interest.clone());
187 }
188 }
189 }
190}
191
192impl DiscoveryProtocol for SvsServiceDiscovery {
193 fn protocol_id(&self) -> ProtocolId {
194 PROTOCOL
195 }
196
197 fn claimed_prefixes(&self) -> &[Name] {
198 &self.claimed
199 }
200
201 fn on_face_up(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
202
203 fn on_face_down(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
204
205 fn on_inbound(
206 &self,
207 raw: &Bytes,
208 _incoming_face: FaceId,
209 _meta: &InboundMeta,
210 _ctx: &dyn DiscoveryContext,
211 ) -> bool {
212 if raw.is_empty() {
213 return false;
214 }
215 // Only forward SVS Sync Interests (under sd_updates/).
216 let is_svs = parse_raw_interest(raw)
217 .map(|i| i.name.has_prefix(sd_updates()))
218 .unwrap_or(false);
219
220 if !is_svs {
221 return false;
222 }
223
224 // Non-blocking send; if the channel is full, drop the packet.
225 let inner = self.inner.lock().unwrap();
226 match inner.incoming_tx.try_send(raw.clone()) {
227 Ok(()) => true,
228 Err(mpsc::error::TrySendError::Full(_)) => {
229 warn!("svs-sd: incoming channel full, dropping SVS packet");
230 true // still consumed — don't forward to pipeline
231 }
232 Err(mpsc::error::TrySendError::Closed(_)) => {
233 warn!("svs-sd: incoming channel closed");
234 false
235 }
236 }
237 }
238
239 fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
240 let mut inner = self.inner.lock().unwrap();
241 inner.last_tick = Some(now);
242 // Drain outgoing SVS packets (send Sync Interests to all neighbors).
243 Self::drain_outgoing(&mut inner, ctx);
244 // Drain sync updates (express fetch Interests for new service records).
245 Self::drain_updates(&mut inner, ctx);
246 }
247
248 fn tick_interval(&self) -> Duration {
249 // Tick frequently enough to keep SVS Sync Interests flowing.
250 Duration::from_millis(200)
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: constructing the protocol inside a Tokio runtime and then
    /// dropping it must not panic.
    #[tokio::test]
    async fn svs_sd_creates_without_panic() {
        let node_name = "/ndn/local/test-node".parse::<Name>().unwrap();
        let discovery = SvsServiceDiscovery::new(node_name);
        // Explicit drop: tearing down the channels is part of what is checked.
        drop(discovery);
    }
}
265}