1use std::path::Path;
22use std::sync::Arc;
23use std::time::Duration;
24
25use bytes::Bytes;
26use tokio::sync::mpsc;
27
28use ndn_ipc::ForwarderClient;
29use ndn_packet::encode::encode_interest;
30use ndn_packet::{Data, Name};
31
32use crate::AppError;
33use crate::connection::NdnConnection;
34
35#[derive(Clone, Debug)]
37pub struct Sample {
38 pub name: Name,
40 pub publisher: String,
42 pub seq: u64,
44 pub payload: Option<Bytes>,
47}
48
49#[derive(Clone, Debug)]
51pub struct SubscriberConfig {
52 pub auto_fetch: bool,
54 pub fetch_timeout: Duration,
56 pub svs: ndn_sync::SvsConfig,
58}
59
60impl Default for SubscriberConfig {
61 fn default() -> Self {
62 Self {
63 auto_fetch: true,
64 fetch_timeout: Duration::from_secs(4),
65 svs: ndn_sync::SvsConfig::default(),
66 }
67 }
68}
69
70pub struct Subscriber {
74 sample_rx: mpsc::Receiver<Sample>,
75 _cancel: tokio_util::sync::CancellationToken,
76}
77
78impl Subscriber {
79 pub async fn connect(
84 socket: impl AsRef<Path>,
85 group_prefix: impl Into<Name>,
86 ) -> Result<Self, AppError> {
87 Self::connect_with_config(socket, group_prefix, SubscriberConfig::default()).await
88 }
89
90 pub async fn connect_with_config(
92 socket: impl AsRef<Path>,
93 group_prefix: impl Into<Name>,
94 config: SubscriberConfig,
95 ) -> Result<Self, AppError> {
96 let group = group_prefix.into();
97 let client = ForwarderClient::connect(socket)
98 .await
99 .map_err(AppError::Connection)?;
100 client
101 .register_prefix(&group)
102 .await
103 .map_err(AppError::Connection)?;
104
105 let local_name = group.clone().append(format!("node-{}", std::process::id()));
107
108 Self::run(NdnConnection::External(client), group, local_name, config)
109 }
110
111 pub async fn connect_psync(
116 socket: impl AsRef<Path>,
117 group_prefix: impl Into<Name>,
118 ) -> Result<Self, AppError> {
119 Self::connect_psync_with_config(socket, group_prefix, ndn_sync::PSyncConfig::default())
120 .await
121 }
122
123 pub async fn connect_psync_with_config(
125 socket: impl AsRef<Path>,
126 group_prefix: impl Into<Name>,
127 psync_config: ndn_sync::PSyncConfig,
128 ) -> Result<Self, AppError> {
129 let group = group_prefix.into();
130 let client = ForwarderClient::connect(socket)
131 .await
132 .map_err(AppError::Connection)?;
133 client
134 .register_prefix(&group)
135 .await
136 .map_err(AppError::Connection)?;
137
138 let local_name = group.clone().append(format!("node-{}", std::process::id()));
139 Self::run_psync(
140 NdnConnection::External(client),
141 group,
142 local_name,
143 psync_config,
144 )
145 }
146
147 pub fn from_connection(
149 conn: NdnConnection,
150 group: Name,
151 local_name: Name,
152 config: SubscriberConfig,
153 ) -> Result<Self, AppError> {
154 Self::run(conn, group, local_name, config)
155 }
156
157 fn run_psync(
159 conn: NdnConnection,
160 group: Name,
161 local_name: Name,
162 psync_config: ndn_sync::PSyncConfig,
163 ) -> Result<Self, AppError> {
164 let _ = local_name; let cancel = tokio_util::sync::CancellationToken::new();
166 let capacity = psync_config.channel_capacity;
167 let (sample_tx, sample_rx) = mpsc::channel(capacity);
168
169 let (net_send_tx, mut net_send_rx) = mpsc::channel::<Bytes>(64);
170 let (net_recv_tx, net_recv_rx) = mpsc::channel::<Bytes>(64);
171
172 let mut sync_handle =
173 ndn_sync::join_psync_group(group.clone(), net_send_tx, net_recv_rx, psync_config);
174
175 let conn = Arc::new(conn);
176
177 let conn_send = Arc::clone(&conn);
178 let cancel_send = cancel.clone();
179 tokio::spawn(async move {
180 loop {
181 tokio::select! {
182 _ = cancel_send.cancelled() => break,
183 Some(pkt) = net_send_rx.recv() => { let _ = conn_send.send(pkt).await; }
184 }
185 }
186 });
187
188 let conn_recv = Arc::clone(&conn);
189 let cancel_recv = cancel.clone();
190 tokio::spawn(async move {
191 loop {
192 tokio::select! {
193 _ = cancel_recv.cancelled() => break,
194 pkt = conn_recv.recv() => match pkt {
195 Some(raw) => { if raw.first() == Some(&0x05) { let _ = net_recv_tx.send(raw).await; } }
196 None => break,
197 }
198 }
199 }
200 });
201
202 let conn_fetch = Arc::clone(&conn);
203 let task_cancel = cancel.clone();
204 tokio::spawn(async move {
205 loop {
206 tokio::select! {
207 _ = task_cancel.cancelled() => break,
208 Some(update) = sync_handle.recv() => {
209 for seq in update.low_seq..=update.high_seq {
210 let data_name = update.name.clone().append_segment(seq);
211 let payload = fetch_data(&conn_fetch, &data_name, Duration::from_secs(4)).await;
212 let sample = Sample {
213 name: data_name,
214 publisher: update.publisher.clone(),
215 seq,
216 payload,
217 };
218 if sample_tx.send(sample).await.is_err() { return; }
219 }
220 }
221 }
222 }
223 });
224
225 Ok(Self {
226 sample_rx,
227 _cancel: cancel,
228 })
229 }
230
231 fn run(
239 conn: NdnConnection,
240 group: Name,
241 local_name: Name,
242 config: SubscriberConfig,
243 ) -> Result<Self, AppError> {
244 let cancel = tokio_util::sync::CancellationToken::new();
245 let (sample_tx, sample_rx) = mpsc::channel(config.svs.channel_capacity);
246
247 let (net_send_tx, mut net_send_rx) = mpsc::channel::<Bytes>(64);
249 let (net_recv_tx, net_recv_rx) = mpsc::channel::<Bytes>(64);
250
251 let mut sync_handle = ndn_sync::join_svs_group(
253 group.clone(),
254 local_name,
255 net_send_tx,
256 net_recv_rx,
257 config.svs,
258 );
259
260 let auto_fetch = config.auto_fetch;
261 let fetch_timeout = config.fetch_timeout;
262 let conn = Arc::new(conn);
263
264 let conn_send = Arc::clone(&conn);
266 let cancel_send = cancel.clone();
267 tokio::spawn(async move {
268 loop {
269 tokio::select! {
270 _ = cancel_send.cancelled() => break,
271 Some(pkt) = net_send_rx.recv() => {
272 let _ = conn_send.send(pkt).await;
273 }
274 }
275 }
276 });
277
278 let conn_recv = Arc::clone(&conn);
282 let cancel_recv = cancel.clone();
283 tokio::spawn(async move {
284 loop {
285 tokio::select! {
286 _ = cancel_recv.cancelled() => break,
287 pkt = conn_recv.recv() => match pkt {
288 Some(raw) => {
289 if raw.len() > 2 && raw.starts_with(&[0x05]) {
294 let _ = net_recv_tx.send(raw).await;
296 }
297 }
300 None => break,
301 }
302 }
303 }
304 });
305
306 let conn_fetch = Arc::clone(&conn);
308 let task_cancel = cancel.clone();
309 tokio::spawn(async move {
310 loop {
311 tokio::select! {
312 _ = task_cancel.cancelled() => break,
313 Some(update) = sync_handle.recv() => {
314 for seq in update.low_seq..=update.high_seq {
315 let data_name = update.name.clone().append_segment(seq);
316 let payload = if auto_fetch {
317 fetch_data(&conn_fetch, &data_name, fetch_timeout).await
318 } else {
319 None
320 };
321 let sample = Sample {
322 name: data_name,
323 publisher: update.publisher.clone(),
324 seq,
325 payload,
326 };
327 if sample_tx.send(sample).await.is_err() {
328 return;
329 }
330 }
331 }
332 }
333 }
334 });
335
336 Ok(Self {
337 sample_rx,
338 _cancel: cancel,
339 })
340 }
341
342 pub async fn recv(&mut self) -> Option<Sample> {
344 self.sample_rx.recv().await
345 }
346}
347
348async fn fetch_data(conn: &NdnConnection, name: &Name, timeout: Duration) -> Option<Bytes> {
353 let wire = encode_interest(name, None);
354 conn.send(wire).await.ok()?;
355 let reply = tokio::time::timeout(timeout, conn.recv()).await.ok()??;
356 let data = Data::decode(reply).ok()?;
358 data.content().cloned()
359}