ndn_app/
subscriber.rs

1//! High-level subscription API — Zenoh-inspired pub/sub over NDN sync.
2//!
3//! `Subscriber` joins a sync group, receives notifications of new data from
4//! peers, and optionally auto-fetches the data.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! # async fn example() -> Result<(), ndn_app::AppError> {
10//! use ndn_app::Subscriber;
11//!
12//! let mut sub = Subscriber::connect("/run/nfd/nfd.sock", "/chat/room1").await?;
13//!
14//! while let Some(sample) = sub.recv().await {
15//!     println!("{}: {:?}", sample.name, sample.payload);
16//! }
17//! # Ok(())
18//! # }
19//! ```
20
21use std::path::Path;
22use std::sync::Arc;
23use std::time::Duration;
24
25use bytes::Bytes;
26use tokio::sync::mpsc;
27
28use ndn_ipc::ForwarderClient;
29use ndn_packet::encode::encode_interest;
30use ndn_packet::{Data, Name};
31
32use crate::AppError;
33use crate::connection::NdnConnection;
34
35/// A received publication from a sync group.
36#[derive(Clone, Debug)]
37pub struct Sample {
38    /// The full name of the published data.
39    pub name: Name,
40    /// Publisher identifier (node key from the sync group).
41    pub publisher: String,
42    /// Sequence number of this publication.
43    pub seq: u64,
44    /// Data payload (fetched automatically if `auto_fetch` is enabled,
45    /// otherwise `None` — the subscriber only gets the notification).
46    pub payload: Option<Bytes>,
47}
48
49/// Configuration for a subscriber.
50#[derive(Clone, Debug)]
51pub struct SubscriberConfig {
52    /// Automatically fetch data for each sync update (default: true).
53    pub auto_fetch: bool,
54    /// Timeout for auto-fetch Interests (default: 4 seconds).
55    pub fetch_timeout: Duration,
56    /// SVS sync configuration.
57    pub svs: ndn_sync::SvsConfig,
58}
59
60impl Default for SubscriberConfig {
61    fn default() -> Self {
62        Self {
63            auto_fetch: true,
64            fetch_timeout: Duration::from_secs(4),
65            svs: ndn_sync::SvsConfig::default(),
66        }
67    }
68}
69
70/// A subscription to a sync group.
71///
72/// Receives [`Sample`]s as peers publish new data.
73pub struct Subscriber {
74    sample_rx: mpsc::Receiver<Sample>,
75    _cancel: tokio_util::sync::CancellationToken,
76}
77
78impl Subscriber {
79    /// Connect to a router and subscribe to a sync group prefix.
80    ///
81    /// Uses SVS as the sync protocol. The subscriber registers the group
82    /// prefix and begins receiving updates from peers.
83    pub async fn connect(
84        socket: impl AsRef<Path>,
85        group_prefix: impl Into<Name>,
86    ) -> Result<Self, AppError> {
87        Self::connect_with_config(socket, group_prefix, SubscriberConfig::default()).await
88    }
89
90    /// Connect with explicit configuration.
91    pub async fn connect_with_config(
92        socket: impl AsRef<Path>,
93        group_prefix: impl Into<Name>,
94        config: SubscriberConfig,
95    ) -> Result<Self, AppError> {
96        let group = group_prefix.into();
97        let client = ForwarderClient::connect(socket)
98            .await
99            .map_err(AppError::Connection)?;
100        client
101            .register_prefix(&group)
102            .await
103            .map_err(AppError::Connection)?;
104
105        // Generate a local node name from PID.
106        let local_name = group.clone().append(format!("node-{}", std::process::id()));
107
108        Self::run(NdnConnection::External(client), group, local_name, config)
109    }
110
111    /// Connect to a router and subscribe to a sync group using **PSync**.
112    ///
113    /// Identical to [`connect`](Self::connect) but uses PSync instead of SVS.
114    /// Use this when peers in the group also use PSync.
115    pub async fn connect_psync(
116        socket: impl AsRef<Path>,
117        group_prefix: impl Into<Name>,
118    ) -> Result<Self, AppError> {
119        Self::connect_psync_with_config(socket, group_prefix, ndn_sync::PSyncConfig::default())
120            .await
121    }
122
123    /// Connect with PSync and explicit configuration.
124    pub async fn connect_psync_with_config(
125        socket: impl AsRef<Path>,
126        group_prefix: impl Into<Name>,
127        psync_config: ndn_sync::PSyncConfig,
128    ) -> Result<Self, AppError> {
129        let group = group_prefix.into();
130        let client = ForwarderClient::connect(socket)
131            .await
132            .map_err(AppError::Connection)?;
133        client
134            .register_prefix(&group)
135            .await
136            .map_err(AppError::Connection)?;
137
138        let local_name = group.clone().append(format!("node-{}", std::process::id()));
139        Self::run_psync(
140            NdnConnection::External(client),
141            group,
142            local_name,
143            psync_config,
144        )
145    }
146
147    /// Create from an in-process connection (embedded engine).
148    pub fn from_connection(
149        conn: NdnConnection,
150        group: Name,
151        local_name: Name,
152        config: SubscriberConfig,
153    ) -> Result<Self, AppError> {
154        Self::run(conn, group, local_name, config)
155    }
156
157    /// Internal: run with PSync protocol.
158    fn run_psync(
159        conn: NdnConnection,
160        group: Name,
161        local_name: Name,
162        psync_config: ndn_sync::PSyncConfig,
163    ) -> Result<Self, AppError> {
164        let _ = local_name; // PSync uses group name, not per-node name
165        let cancel = tokio_util::sync::CancellationToken::new();
166        let capacity = psync_config.channel_capacity;
167        let (sample_tx, sample_rx) = mpsc::channel(capacity);
168
169        let (net_send_tx, mut net_send_rx) = mpsc::channel::<Bytes>(64);
170        let (net_recv_tx, net_recv_rx) = mpsc::channel::<Bytes>(64);
171
172        let mut sync_handle =
173            ndn_sync::join_psync_group(group.clone(), net_send_tx, net_recv_rx, psync_config);
174
175        let conn = Arc::new(conn);
176
177        let conn_send = Arc::clone(&conn);
178        let cancel_send = cancel.clone();
179        tokio::spawn(async move {
180            loop {
181                tokio::select! {
182                    _ = cancel_send.cancelled() => break,
183                    Some(pkt) = net_send_rx.recv() => { let _ = conn_send.send(pkt).await; }
184                }
185            }
186        });
187
188        let conn_recv = Arc::clone(&conn);
189        let cancel_recv = cancel.clone();
190        tokio::spawn(async move {
191            loop {
192                tokio::select! {
193                    _ = cancel_recv.cancelled() => break,
194                    pkt = conn_recv.recv() => match pkt {
195                        Some(raw) => { if raw.first() == Some(&0x05) { let _ = net_recv_tx.send(raw).await; } }
196                        None => break,
197                    }
198                }
199            }
200        });
201
202        let conn_fetch = Arc::clone(&conn);
203        let task_cancel = cancel.clone();
204        tokio::spawn(async move {
205            loop {
206                tokio::select! {
207                    _ = task_cancel.cancelled() => break,
208                    Some(update) = sync_handle.recv() => {
209                        for seq in update.low_seq..=update.high_seq {
210                            let data_name = update.name.clone().append_segment(seq);
211                            let payload = fetch_data(&conn_fetch, &data_name, Duration::from_secs(4)).await;
212                            let sample = Sample {
213                                name: data_name,
214                                publisher: update.publisher.clone(),
215                                seq,
216                                payload,
217                            };
218                            if sample_tx.send(sample).await.is_err() { return; }
219                        }
220                    }
221                }
222            }
223        });
224
225        Ok(Self {
226            sample_rx,
227            _cancel: cancel,
228        })
229    }
230
231    /// Spawn the background tasks that drive the subscription:
232    ///
233    /// 1. **Send pump** — forwards sync Interests from the SVS task to the router.
234    /// 2. **Recv pump** — reads packets from the connection and routes Interests
235    ///    to the SVS task.
236    /// 3. **Update processor** — receives `SyncUpdate`s, optionally auto-fetches
237    ///    Data, and emits `Sample`s.
238    fn run(
239        conn: NdnConnection,
240        group: Name,
241        local_name: Name,
242        config: SubscriberConfig,
243    ) -> Result<Self, AppError> {
244        let cancel = tokio_util::sync::CancellationToken::new();
245        let (sample_tx, sample_rx) = mpsc::channel(config.svs.channel_capacity);
246
247        // Channels for sync protocol ↔ network.
248        let (net_send_tx, mut net_send_rx) = mpsc::channel::<Bytes>(64);
249        let (net_recv_tx, net_recv_rx) = mpsc::channel::<Bytes>(64);
250
251        // Join the SVS group.
252        let mut sync_handle = ndn_sync::join_svs_group(
253            group.clone(),
254            local_name,
255            net_send_tx,
256            net_recv_rx,
257            config.svs,
258        );
259
260        let auto_fetch = config.auto_fetch;
261        let fetch_timeout = config.fetch_timeout;
262        let conn = Arc::new(conn);
263
264        // Network send pump: forward sync Interests to the router.
265        let conn_send = Arc::clone(&conn);
266        let cancel_send = cancel.clone();
267        tokio::spawn(async move {
268            loop {
269                tokio::select! {
270                    _ = cancel_send.cancelled() => break,
271                    Some(pkt) = net_send_rx.recv() => {
272                        let _ = conn_send.send(pkt).await;
273                    }
274                }
275            }
276        });
277
278        // Network recv pump: read packets from the connection, forward sync
279        // Interests to the SVS task via net_recv_tx. Non-sync packets
280        // (Data responses for auto-fetch) are handled by the fetch path.
281        let conn_recv = Arc::clone(&conn);
282        let cancel_recv = cancel.clone();
283        tokio::spawn(async move {
284            loop {
285                tokio::select! {
286                    _ = cancel_recv.cancelled() => break,
287                    pkt = conn_recv.recv() => match pkt {
288                        Some(raw) => {
289                            // Simple heuristic: if the raw packet starts with
290                            // the sync group prefix, it's likely a sync Interest.
291                            // Forward everything to the sync task — it will
292                            // ignore what it doesn't understand.
293                            if raw.len() > 2 && raw.starts_with(&[0x05]) {
294                                // Interest type (0x05) — could be sync
295                                let _ = net_recv_tx.send(raw).await;
296                            }
297                            // Data packets (0x06) are consumed by fetch tasks
298                            // via separate recv calls and don't need routing here.
299                        }
300                        None => break,
301                    }
302                }
303            }
304        });
305
306        // Update processor: receive sync updates, optionally fetch data.
307        let conn_fetch = Arc::clone(&conn);
308        let task_cancel = cancel.clone();
309        tokio::spawn(async move {
310            loop {
311                tokio::select! {
312                    _ = task_cancel.cancelled() => break,
313                    Some(update) = sync_handle.recv() => {
314                        for seq in update.low_seq..=update.high_seq {
315                            let data_name = update.name.clone().append_segment(seq);
316                            let payload = if auto_fetch {
317                                fetch_data(&conn_fetch, &data_name, fetch_timeout).await
318                            } else {
319                                None
320                            };
321                            let sample = Sample {
322                                name: data_name,
323                                publisher: update.publisher.clone(),
324                                seq,
325                                payload,
326                            };
327                            if sample_tx.send(sample).await.is_err() {
328                                return;
329                            }
330                        }
331                    }
332                }
333            }
334        });
335
336        Ok(Self {
337            sample_rx,
338            _cancel: cancel,
339        })
340    }
341
342    /// Receive the next sample. Returns `None` when the subscription ends.
343    pub async fn recv(&mut self) -> Option<Sample> {
344        self.sample_rx.recv().await
345    }
346}
347
348/// Fetch a single Data object by expressing an Interest and waiting for a reply.
349///
350/// Returns `Some(content)` on success, `None` on timeout, decode failure,
351/// or missing content.
352async fn fetch_data(conn: &NdnConnection, name: &Name, timeout: Duration) -> Option<Bytes> {
353    let wire = encode_interest(name, None);
354    conn.send(wire).await.ok()?;
355    let reply = tokio::time::timeout(timeout, conn.recv()).await.ok()??;
356    // Decode to verify it's valid Data and extract content.
357    let data = Data::decode(reply).ok()?;
358    data.content().cloned()
359}