ndn_sync/
protocol.rs

1//! Sync protocol trait — abstraction over SVS, PSync, etc.
2//!
3//! Consumers don't choose a sync protocol directly; they subscribe to a
4//! group prefix and the runtime picks the appropriate protocol.
5
6use std::fmt;
7
8use bytes::Bytes;
9use ndn_packet::Name;
10
11/// A notification that new data is available from a peer.
12#[derive(Clone, Debug)]
13pub struct SyncUpdate {
14    /// The peer that published new data.
15    pub publisher: String,
16    /// Name prefix under which the new data can be fetched.
17    pub name: Name,
18    /// Sequence range of new publications: [low, high] inclusive.
19    pub low_seq: u64,
20    pub high_seq: u64,
21    /// Optional mapping metadata from the publisher (ndnSVS `MappingData`).
22    ///
23    /// Present when the peer called [`SyncHandle::publish_with_mapping`]. The
24    /// bytes are application-defined; a common convention is to encode a content
25    /// `Name` TLV (type 7) so the consumer can fetch the named object directly
26    /// without constructing the name from the sequence number.
27    pub mapping: Option<Bytes>,
28}
29
30impl fmt::Display for SyncUpdate {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        if self.low_seq == self.high_seq {
33            write!(f, "{}#{}", self.name, self.low_seq)
34        } else {
35            write!(f, "{}#{}..{}", self.name, self.low_seq, self.high_seq)
36        }
37    }
38}
39
40/// Error type for sync protocol operations.
41#[derive(Debug, thiserror::Error)]
42pub enum SyncError {
43    #[error("sync I/O error: {0}")]
44    Io(#[from] std::io::Error),
45    #[error("connection lost")]
46    Disconnected,
47    #[error("protocol error: {0}")]
48    Protocol(String),
49}
50
51/// Handle to a running sync group.
52///
53/// Returned by [`SyncProtocol::join`].  Provides a channel for receiving
54/// updates and a method for announcing local publications.
55pub struct SyncHandle {
56    /// Receive sync updates (new data available from peers).
57    pub rx: tokio::sync::mpsc::Receiver<SyncUpdate>,
58    /// Send local publications into the sync group.
59    /// Each message is `(publication_name, optional_mapping_bytes)`.
60    pub tx: tokio::sync::mpsc::Sender<(Name, Option<Bytes>)>,
61    /// Cancel the sync background task.
62    cancel: tokio_util::sync::CancellationToken,
63}
64
65impl SyncHandle {
66    pub fn new(
67        rx: tokio::sync::mpsc::Receiver<SyncUpdate>,
68        tx: tokio::sync::mpsc::Sender<(Name, Option<Bytes>)>,
69        cancel: tokio_util::sync::CancellationToken,
70    ) -> Self {
71        Self { rx, tx, cancel }
72    }
73
74    /// Receive the next sync update. Returns `None` when the group is closed.
75    pub async fn recv(&mut self) -> Option<SyncUpdate> {
76        self.rx.recv().await
77    }
78
79    /// Announce that we published new data under `name`.
80    pub async fn publish(&self, name: Name) -> Result<(), SyncError> {
81        self.tx
82            .send((name, None))
83            .await
84            .map_err(|_| SyncError::Disconnected)
85    }
86
87    /// Announce a publication and attach mapping metadata for peers.
88    ///
89    /// The `mapping` bytes are forwarded to peers in the `MappingData` TLV
90    /// carried in the next Sync Interest. Peers receive it as
91    /// [`SyncUpdate::mapping`] so they can fast-path content fetching without
92    /// constructing names from sequence numbers.
93    ///
94    /// A common convention is to pass a Name TLV (type 7 + length + components)
95    /// so the consumer can directly fetch the named object.
96    pub async fn publish_with_mapping(&self, name: Name, mapping: Bytes) -> Result<(), SyncError> {
97        self.tx
98            .send((name, Some(mapping)))
99            .await
100            .map_err(|_| SyncError::Disconnected)
101    }
102
103    /// Leave the sync group.
104    pub fn leave(self) {
105        self.cancel.cancel();
106    }
107}
108
109impl Drop for SyncHandle {
110    fn drop(&mut self) {
111        self.cancel.cancel();
112    }
113}