ndn_sync/protocol.rs
//! Sync protocol trait — abstraction over SVS, PSync, etc.
//!
//! Consumers don't choose a sync protocol directly; they subscribe to a
//! group prefix and the runtime picks the appropriate protocol.
5
6use std::fmt;
7
8use bytes::Bytes;
9use ndn_packet::Name;
10
11/// A notification that new data is available from a peer.
12#[derive(Clone, Debug)]
13pub struct SyncUpdate {
14 /// The peer that published new data.
15 pub publisher: String,
16 /// Name prefix under which the new data can be fetched.
17 pub name: Name,
18 /// Sequence range of new publications: [low, high] inclusive.
19 pub low_seq: u64,
20 pub high_seq: u64,
21 /// Optional mapping metadata from the publisher (ndnSVS `MappingData`).
22 ///
23 /// Present when the peer called [`SyncHandle::publish_with_mapping`]. The
24 /// bytes are application-defined; a common convention is to encode a content
25 /// `Name` TLV (type 7) so the consumer can fetch the named object directly
26 /// without constructing the name from the sequence number.
27 pub mapping: Option<Bytes>,
28}
29
30impl fmt::Display for SyncUpdate {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 if self.low_seq == self.high_seq {
33 write!(f, "{}#{}", self.name, self.low_seq)
34 } else {
35 write!(f, "{}#{}..{}", self.name, self.low_seq, self.high_seq)
36 }
37 }
38}
39
40/// Error type for sync protocol operations.
41#[derive(Debug, thiserror::Error)]
42pub enum SyncError {
43 #[error("sync I/O error: {0}")]
44 Io(#[from] std::io::Error),
45 #[error("connection lost")]
46 Disconnected,
47 #[error("protocol error: {0}")]
48 Protocol(String),
49}
50
51/// Handle to a running sync group.
52///
53/// Returned by [`SyncProtocol::join`]. Provides a channel for receiving
54/// updates and a method for announcing local publications.
55pub struct SyncHandle {
56 /// Receive sync updates (new data available from peers).
57 pub rx: tokio::sync::mpsc::Receiver<SyncUpdate>,
58 /// Send local publications into the sync group.
59 /// Each message is `(publication_name, optional_mapping_bytes)`.
60 pub tx: tokio::sync::mpsc::Sender<(Name, Option<Bytes>)>,
61 /// Cancel the sync background task.
62 cancel: tokio_util::sync::CancellationToken,
63}
64
65impl SyncHandle {
66 pub fn new(
67 rx: tokio::sync::mpsc::Receiver<SyncUpdate>,
68 tx: tokio::sync::mpsc::Sender<(Name, Option<Bytes>)>,
69 cancel: tokio_util::sync::CancellationToken,
70 ) -> Self {
71 Self { rx, tx, cancel }
72 }
73
74 /// Receive the next sync update. Returns `None` when the group is closed.
75 pub async fn recv(&mut self) -> Option<SyncUpdate> {
76 self.rx.recv().await
77 }
78
79 /// Announce that we published new data under `name`.
80 pub async fn publish(&self, name: Name) -> Result<(), SyncError> {
81 self.tx
82 .send((name, None))
83 .await
84 .map_err(|_| SyncError::Disconnected)
85 }
86
87 /// Announce a publication and attach mapping metadata for peers.
88 ///
89 /// The `mapping` bytes are forwarded to peers in the `MappingData` TLV
90 /// carried in the next Sync Interest. Peers receive it as
91 /// [`SyncUpdate::mapping`] so they can fast-path content fetching without
92 /// constructing names from sequence numbers.
93 ///
94 /// A common convention is to pass a Name TLV (type 7 + length + components)
95 /// so the consumer can directly fetch the named object.
96 pub async fn publish_with_mapping(&self, name: Name, mapping: Bytes) -> Result<(), SyncError> {
97 self.tx
98 .send((name, Some(mapping)))
99 .await
100 .map_err(|_| SyncError::Disconnected)
101 }
102
103 /// Leave the sync group.
104 pub fn leave(self) {
105 self.cancel.cancel();
106 }
107}
108
109impl Drop for SyncHandle {
110 fn drop(&mut self) {
111 self.cancel.cancel();
112 }
113}