ndn_discovery/gossip/
svs_gossip.rs

1//! `SvsServiceDiscovery` — SVS-backed push service-record notifications.
2//!
3//! Joins the SVS sync group at `/ndn/local/sd/updates/` so that service record
4//! *changes* are pushed to all group members rather than requiring periodic
5//! pulls.  Nodes that only need occasional browsability continue to use
6//! `ServiceDiscoveryProtocol` via `/ndn/local/sd/services/`.
7//!
8//! ## Architecture
9//!
10//! The SVS background task is fully async (`join_svs_group`), but
11//! `DiscoveryProtocol` hooks are synchronous. The bridge uses two
12//! `tokio::sync::mpsc` channels:
13//!
14//! ```text
15//!  on_inbound ──►  incoming_tx ──► SVS task (merge, detect gaps)
16//!  SVS task ──► outgoing_tx ──► on_tick drain ──► ctx.send_on (all neighbor faces)
17//!  SVS updates ──► update_rx ──► on_tick drain ──► express fetch Interests
18//! ```
19//!
20//! `on_tick` performs non-blocking draining of both channels using
21//! `try_recv()`.  This keeps the synchronous discovery hooks from blocking
22//! while still processing all pending work in bounded time.
23//!
24//! ## Packet routing
25//!
26//! All SVS Sync Interests arrive under `/ndn/local/sd/updates/svs/…`.
27//! This protocol claims `/ndn/local/sd/updates/`, so `CompositeDiscovery`
28//! routes these packets here before the forwarding pipeline sees them.
29//! Service record Data packets arrive under `/ndn/local/sd/services/` and
30//! are handled by `ServiceDiscoveryProtocol`; this protocol only handles the
31//! sync control plane.
32
33use std::sync::Mutex;
34use std::time::{Duration, Instant};
35
36use bytes::Bytes;
37use ndn_packet::Name;
38use ndn_packet::encode::InterestBuilder;
39use ndn_transport::FaceId;
40use tokio::sync::mpsc;
41use tracing::{debug, trace, warn};
42
43use crate::context::DiscoveryContext;
44use crate::protocol::{DiscoveryProtocol, InboundMeta, ProtocolId};
45use crate::scope::sd_updates;
46use crate::wire::parse_raw_interest;
47
48use ndn_sync::{SvsConfig, SyncHandle, SyncUpdate, join_svs_group};
49
50const PROTOCOL: ProtocolId = ProtocolId("svs-service-discovery");
51
52/// Capacity of the incoming-packet bridge channel.
53const CHANNEL_CAP: usize = 256;
54
55/// How often a pending `SyncUpdate` (fetch request) is retried if no Data
56/// has been seen.  In practice the SVS task emits each update only once.
57const FETCH_LIFETIME: Duration = Duration::from_secs(4);
58
59struct Inner {
60    /// Channel to feed raw incoming SVS packets into the background task.
61    incoming_tx: mpsc::Sender<Bytes>,
62    /// Channel that the background task uses to request outgoing SVS packets.
63    outgoing_rx: mpsc::Receiver<Bytes>,
64    /// Updates from the SVS task (new data to fetch from peers).
65    sync_handle: SyncHandle,
66    /// Timestamp of last housekeeping sweep.
67    last_tick: Option<Instant>,
68}
69
70/// SVS-backed push service-record discovery.
71///
72/// Add alongside [`ServiceDiscoveryProtocol`] in a [`CompositeDiscovery`] to
73/// receive push notifications whenever a peer publishes or updates a service
74/// record.
75///
76/// [`ServiceDiscoveryProtocol`]: crate::service_discovery::ServiceDiscoveryProtocol
77/// [`CompositeDiscovery`]: crate::CompositeDiscovery
78pub struct SvsServiceDiscovery {
79    /// Static claimed prefixes — set at construction, never mutated.
80    claimed: Vec<Name>,
81    inner: Mutex<Inner>,
82}
83
84impl SvsServiceDiscovery {
85    /// Create a new `SvsServiceDiscovery` for `node_name`.
86    ///
87    /// Immediately spawns the SVS background task (requires a Tokio runtime).
88    pub fn new(node_name: Name) -> Self {
89        let group = sd_updates().clone();
90
91        // Channels: we feed raw incoming bytes to SVS, SVS sends raw outgoing bytes back.
92        let (incoming_tx, incoming_rx) = mpsc::channel::<Bytes>(CHANNEL_CAP);
93        let (outgoing_tx, outgoing_rx) = mpsc::channel::<Bytes>(CHANNEL_CAP);
94
95        // Wrap the outgoing_tx so the SVS task can use it as its `send` channel.
96        // SVS `join_svs_group` takes `send: mpsc::Sender<Bytes>` for outgoing packets
97        // and `recv: mpsc::Receiver<Bytes>` for incoming packets.
98        let sync_handle = join_svs_group(
99            group,
100            node_name,
101            outgoing_tx,
102            incoming_rx,
103            SvsConfig::default(),
104        );
105
106        Self {
107            claimed: vec![sd_updates().clone()],
108            inner: Mutex::new(Inner {
109                incoming_tx,
110                outgoing_rx,
111                sync_handle,
112                last_tick: None,
113            }),
114        }
115    }
116
117    /// Send pending outgoing SVS packets to all reachable neighbors.
118    fn drain_outgoing(inner: &mut Inner, ctx: &dyn DiscoveryContext) {
119        // Drain all pending outgoing packets (Sync Interests).
120        let reachable_faces: Vec<FaceId> = ctx
121            .neighbors()
122            .all()
123            .into_iter()
124            .filter(|e| e.is_reachable())
125            .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid).collect::<Vec<_>>())
126            .collect();
127
128        loop {
129            match inner.outgoing_rx.try_recv() {
130                Ok(pkt) => {
131                    trace!(len=%pkt.len(), "svs-sd: sending SVS Sync Interest to {} faces", reachable_faces.len());
132                    for &face_id in &reachable_faces {
133                        ctx.send_on(face_id, pkt.clone());
134                    }
135                }
136                Err(mpsc::error::TryRecvError::Empty) => break,
137                Err(mpsc::error::TryRecvError::Disconnected) => {
138                    warn!("svs-sd: outgoing channel disconnected");
139                    break;
140                }
141            }
142        }
143    }
144
145    /// Drain sync updates and express fetch Interests for missing service records.
146    fn drain_updates(inner: &mut Inner, ctx: &dyn DiscoveryContext) {
147        loop {
148            match inner.sync_handle.rx.try_recv() {
149                Ok(update) => Self::handle_update(&update, ctx),
150                Err(mpsc::error::TryRecvError::Empty) => break,
151                Err(mpsc::error::TryRecvError::Disconnected) => {
152                    warn!("svs-sd: update channel disconnected");
153                    break;
154                }
155            }
156        }
157    }
158
159    /// Express fetch Interests for all records in a `SyncUpdate` gap.
160    fn handle_update(update: &SyncUpdate, ctx: &dyn DiscoveryContext) {
161        debug!(
162            publisher=%update.publisher,
163            low=%update.low_seq,
164            high=%update.high_seq,
165            "svs-sd: new service record update from peer"
166        );
167        // Express an Interest for each missing sequence number.
168        // The records live under the SD services prefix, keyed by
169        // `<publisher-name>/<seq>`.  The publisher's node name is embedded
170        // in `update.name` as the last component of the group prefix.
171        for seq in update.low_seq..=update.high_seq {
172            let fetch_name = update.name.clone().append(seq.to_string());
173            let interest = InterestBuilder::new(fetch_name)
174                .must_be_fresh()
175                .lifetime(FETCH_LIFETIME)
176                .build();
177            // Send on all reachable faces — the PIT will aggregate duplicates.
178            let faces: Vec<FaceId> = ctx
179                .neighbors()
180                .all()
181                .into_iter()
182                .filter(|e| e.is_reachable())
183                .flat_map(|e| e.faces.iter().map(|(fid, _, _)| *fid).collect::<Vec<_>>())
184                .collect();
185            for face_id in faces {
186                ctx.send_on(face_id, interest.clone());
187            }
188        }
189    }
190}
191
192impl DiscoveryProtocol for SvsServiceDiscovery {
193    fn protocol_id(&self) -> ProtocolId {
194        PROTOCOL
195    }
196
197    fn claimed_prefixes(&self) -> &[Name] {
198        &self.claimed
199    }
200
201    fn on_face_up(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
202
203    fn on_face_down(&self, _face_id: FaceId, _ctx: &dyn DiscoveryContext) {}
204
205    fn on_inbound(
206        &self,
207        raw: &Bytes,
208        _incoming_face: FaceId,
209        _meta: &InboundMeta,
210        _ctx: &dyn DiscoveryContext,
211    ) -> bool {
212        if raw.is_empty() {
213            return false;
214        }
215        // Only forward SVS Sync Interests (under sd_updates/).
216        let is_svs = parse_raw_interest(raw)
217            .map(|i| i.name.has_prefix(sd_updates()))
218            .unwrap_or(false);
219
220        if !is_svs {
221            return false;
222        }
223
224        // Non-blocking send; if the channel is full, drop the packet.
225        let inner = self.inner.lock().unwrap();
226        match inner.incoming_tx.try_send(raw.clone()) {
227            Ok(()) => true,
228            Err(mpsc::error::TrySendError::Full(_)) => {
229                warn!("svs-sd: incoming channel full, dropping SVS packet");
230                true // still consumed — don't forward to pipeline
231            }
232            Err(mpsc::error::TrySendError::Closed(_)) => {
233                warn!("svs-sd: incoming channel closed");
234                false
235            }
236        }
237    }
238
239    fn on_tick(&self, now: Instant, ctx: &dyn DiscoveryContext) {
240        let mut inner = self.inner.lock().unwrap();
241        inner.last_tick = Some(now);
242        // Drain outgoing SVS packets (send Sync Interests to all neighbors).
243        Self::drain_outgoing(&mut inner, ctx);
244        // Drain sync updates (express fetch Interests for new service records).
245        Self::drain_updates(&mut inner, ctx);
246    }
247
248    fn tick_interval(&self) -> Duration {
249        // Tick frequently enough to keep SVS Sync Interests flowing.
250        Duration::from_millis(200)
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257    use std::str::FromStr;
258
259    #[tokio::test]
260    async fn svs_sd_creates_without_panic() {
261        let node = Name::from_str("/ndn/local/test-node").unwrap();
262        let _sd = SvsServiceDiscovery::new(node);
263        // Minimal smoke test: ensure construction and drop are clean.
264    }
265}