ndn_sync/
psync_sync.rs

1//! PSync network protocol — wires `PSyncNode` + `Ibf` to Interest/Data exchange.
2//!
3//! # Wire format
4//!
5//! **Sync Interest:** `/<group-prefix>/psync/<ibf-encoded>`
6//!
//! The IBF is encoded as a series of `<xor_sum:8><hash_sum:8><count:8>` triples,
//! one per cell; every field is an 8-byte big-endian integer (`count` is signed).
8//!
9//! **Sync Data:** response carries the list of name hashes the responder has
10//! that the requester lacks, encoded as concatenated `<hash:8>` values.
11//!
12//! # Protocol flow
13//!
14//! 1. Periodically send a Sync Interest carrying the local IBF.
15//! 2. When a peer receives it, subtract against local IBF, decode the difference.
16//! 3. Reply with Data containing hashes the requester is missing.
17//! 4. On receiving the Data, emit `SyncUpdate` for each missing hash.
18
19use std::time::Duration;
20
21use bytes::{Buf, BufMut, Bytes, BytesMut};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use ndn_packet::Name;
26use ndn_packet::encode::InterestBuilder;
27
28use crate::protocol::{SyncHandle, SyncUpdate};
29use crate::psync::{Ibf, PSyncNode};
30
/// Tunable parameters for one PSync group membership.
#[derive(Clone, Debug)]
pub struct PSyncConfig {
    /// Base delay between periodic Sync Interests (default: 1 second).
    pub sync_interval: Duration,
    /// Upper bound, in milliseconds, of the random delay added to
    /// `sync_interval` before each periodic Sync Interest (default: 200).
    pub jitter_ms: u64,
    /// IBF size in cells (default: 80).
    pub ibf_size: usize,
    /// Capacity of the channel that delivers update notifications
    /// (default: 256).
    pub channel_capacity: usize,
}

impl Default for PSyncConfig {
    /// Returns the default value documented on each field.
    fn default() -> Self {
        let sync_interval = Duration::from_secs(1);
        Self {
            sync_interval,
            jitter_ms: 200,
            ibf_size: 80,
            channel_capacity: 256,
        }
    }
}
54
55/// Join a PSync group.
56///
57/// Spawns a background task that:
58/// 1. Periodically sends Sync Interests carrying the local IBF
59/// 2. Processes incoming Sync Interests (subtracts IBFs, replies with diff)
60/// 3. Processes incoming Sync Data (emits `SyncUpdate` for missing hashes)
61///
62/// # Arguments
63///
64/// * `group` — sync group prefix (e.g. `/ndn/psync/chat`)
65/// * `send` — channel to send outgoing packets (Interests and Data)
66/// * `recv` — channel to receive incoming packets from the network
67/// * `config` — PSync configuration
68pub fn join_psync_group(
69    group: Name,
70    send: mpsc::Sender<Bytes>,
71    recv: mpsc::Receiver<Bytes>,
72    config: PSyncConfig,
73) -> SyncHandle {
74    let cancel = CancellationToken::new();
75    let (update_tx, update_rx) = mpsc::channel(config.channel_capacity);
76    let (publish_tx, publish_rx) = mpsc::channel(64);
77
78    let task_cancel = cancel.clone();
79    tokio::spawn(async move {
80        psync_task(
81            group,
82            send,
83            recv,
84            publish_rx,
85            update_tx,
86            config,
87            task_cancel,
88        )
89        .await;
90    });
91
92    SyncHandle::new(update_rx, publish_tx, cancel)
93}
94
95/// Background task driving the PSync protocol loop.
96///
97/// On each tick: send a Sync Interest with the local IBF. On incoming
98/// Interest: subtract IBFs, decode the difference, reply with missing hashes.
99/// On incoming Data: emit `SyncUpdate` for each missing hash.
100async fn psync_task(
101    group: Name,
102    send: mpsc::Sender<Bytes>,
103    mut recv: mpsc::Receiver<Bytes>,
104    mut publish_rx: mpsc::Receiver<(Name, Option<bytes::Bytes>)>,
105    update_tx: mpsc::Sender<SyncUpdate>,
106    config: PSyncConfig,
107    cancel: CancellationToken,
108) {
109    let mut node = PSyncNode::new(config.ibf_size);
110
111    loop {
112        let jitter = Duration::from_millis(fastrand::u64(0..=config.jitter_ms));
113        let interval = config.sync_interval + jitter;
114
115        tokio::select! {
116            _ = cancel.cancelled() => break,
117
118            _ = tokio::time::sleep(interval) => {
119                // Send Sync Interest with local IBF.
120                let ibf = node.build_ibf();
121                let ibf_bytes = encode_ibf(&ibf);
122                let sync_name = group.clone()
123                    .append("psync")
124                    .append(ibf_bytes);
125                let wire = InterestBuilder::new(sync_name)
126                    .lifetime(Duration::from_millis(1000))
127                    .build();
128                let _ = send.send(wire).await;
129            }
130
131            Some(raw) = recv.recv() => {
132                // Could be a Sync Interest (IBF from peer) or Sync Data (diff response).
133                if raw.len() > 2 && raw[0] == 0x06 {
134                    // Data packet — contains hashes we're missing.
135                    if let Some(hashes) = parse_sync_data(&raw) {
136                        for hash in hashes {
137                            let update = SyncUpdate {
138                                publisher: format!("{hash:016x}"),
139                                name: group.clone().append(format!("{hash:016x}")),
140                                low_seq: 0,
141                                high_seq: 0,
142                                mapping: None,
143                            };
144                            let _ = update_tx.send(update).await;
145                        }
146                    }
147                } else if raw.len() > 2 && raw[0] == 0x05 {
148                    // Interest — peer's IBF. Subtract, decode, reply with diff.
149                    if let Some(peer_ibf) = parse_sync_interest(&group, &raw)
150                        && let Some((we_have, _they_have)) = node.reconcile(&peer_ibf)
151                        && !we_have.is_empty()
152                    {
153                        let data_bytes = encode_hash_set(&we_have);
154                        let _ = send.send(data_bytes).await;
155                    }
156                }
157            }
158
159            Some((pub_name, _mapping)) = publish_rx.recv() => {
160                // Local publication: hash the name and insert into the IBF.
161                // Mapping metadata is a SVS-only feature; PSync ignores it.
162                let hash = hash_name(&pub_name);
163                node.insert(hash);
164                // Immediately send updated IBF.
165                let ibf = node.build_ibf();
166                let ibf_bytes = encode_ibf(&ibf);
167                let sync_name = group.clone()
168                    .append("psync")
169                    .append(ibf_bytes);
170                let wire = InterestBuilder::new(sync_name)
171                    .lifetime(Duration::from_millis(1000))
172                    .build();
173                let _ = send.send(wire).await;
174            }
175        }
176    }
177}
178
179// ─── Wire encoding helpers ─────────────────────────────────────────────────
180
181/// Encode an IBF as concatenated `<xor_sum:8><hash_sum:8><count:8>` triples.
182fn encode_ibf(ibf: &Ibf) -> Bytes {
183    let cells = ibf.cells();
184    let mut buf = BytesMut::with_capacity(cells.len() * 24);
185    for cell in cells {
186        buf.put_u64(cell.0); // xor_sum
187        buf.put_u64(cell.1); // hash_sum
188        buf.put_i64(cell.2); // count
189    }
190    buf.freeze()
191}
192
193/// Decode an IBF from wire bytes.
194fn decode_ibf(data: &[u8], ibf_size: usize) -> Option<Ibf> {
195    if data.len() < ibf_size * 24 {
196        return None;
197    }
198    let mut cursor = data;
199    let mut cells = Vec::with_capacity(ibf_size);
200    for _ in 0..ibf_size {
201        let xor_sum = cursor.get_u64();
202        let hash_sum = cursor.get_u64();
203        let count = cursor.get_i64();
204        cells.push((xor_sum, hash_sum, count));
205    }
206    Some(Ibf::from_cells(cells))
207}
208
209/// Encode a set of hashes as concatenated `<hash:8>` values.
210fn encode_hash_set(hashes: &std::collections::HashSet<u64>) -> Bytes {
211    let mut buf = BytesMut::with_capacity(hashes.len() * 8);
212    for &h in hashes {
213        buf.put_u64(h);
214    }
215    buf.freeze()
216}
217
218/// Parse a Sync Interest: verify prefix, extract peer IBF.
219fn parse_sync_interest(group: &Name, raw: &[u8]) -> Option<Ibf> {
220    let interest = ndn_packet::Interest::decode(Bytes::copy_from_slice(raw)).ok()?;
221    let components = interest.name.components();
222
223    let group_len = group.components().len();
224    if components.len() < group_len + 2 {
225        return None;
226    }
227
228    let psync_comp = &components[group_len];
229    if psync_comp.value.as_ref() != b"psync" {
230        return None;
231    }
232
233    let ibf_comp = &components[group_len + 1];
234    // Infer IBF size from the component length.
235    let ibf_size = ibf_comp.value.len() / 24;
236    if ibf_size == 0 {
237        return None;
238    }
239
240    decode_ibf(&ibf_comp.value, ibf_size)
241}
242
243/// Parse a Sync Data response: extract list of hashes we're missing.
244fn parse_sync_data(raw: &[u8]) -> Option<Vec<u64>> {
245    let data = ndn_packet::Data::decode(Bytes::copy_from_slice(raw)).ok()?;
246    let content = data.content()?;
247    let mut hashes = Vec::new();
248    let mut cursor = content.as_ref();
249    while cursor.remaining() >= 8 {
250        hashes.push(cursor.get_u64());
251    }
252    Some(hashes)
253}
254
255/// Hash an NDN name to a `u64` for IBF insertion.
256///
257/// Uses FNV-1a over the concatenated component values, with a `0xFF`
258/// separator between components to avoid collisions between names
259/// like `/a/bc` and `/ab/c`.
260fn hash_name(name: &Name) -> u64 {
261    let mut h: u64 = 0xcbf29ce484222325;
262    for comp in name.components() {
263        for b in comp.value.iter() {
264            h ^= *b as u64;
265            h = h.wrapping_mul(0x100000001b3);
266        }
267        // Separator to distinguish /a/bc from /ab/c.
268        h ^= 0xff;
269        h = h.wrapping_mul(0x100000001b3);
270    }
271    h
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// An IBF survives an encode/decode round trip: subtracting the decoded
    /// copy from the original leaves an empty difference on both sides.
    #[test]
    fn ibf_encode_decode_roundtrip() {
        let mut ibf = Ibf::new(8, 3);
        ibf.insert(42);
        ibf.insert(99);
        let encoded = encode_ibf(&ibf);
        let decoded = decode_ibf(&encoded, 8).unwrap();
        // Subtracting should yield zero diff.
        let diff = ibf.subtract(&decoded);
        let (a, b) = diff.decode().unwrap();
        assert!(a.is_empty());
        assert!(b.is_empty());
    }

    /// Input shorter than the requested number of 24-byte cells is rejected
    /// instead of being read past its end.
    #[test]
    fn decode_ibf_rejects_truncated_input() {
        assert!(decode_ibf(&[0u8; 23], 1).is_none());
        assert!(decode_ibf(&[], 1).is_none());
    }

    /// Names differing in their last component hash differently.
    #[test]
    fn hash_name_distinct() {
        let a: Name = "/a/b".parse().unwrap();
        let b: Name = "/a/c".parse().unwrap();
        assert_ne!(hash_name(&a), hash_name(&b));
    }

    /// The per-component separator keeps names with the same concatenated
    /// bytes but different component boundaries apart.
    #[test]
    fn hash_name_respects_component_boundaries() {
        let a: Name = "/a/bc".parse().unwrap();
        let b: Name = "/ab/c".parse().unwrap();
        assert_ne!(hash_name(&a), hash_name(&b));
    }

    /// Joining spawns the task and leaving cancels it without panicking.
    #[tokio::test]
    async fn join_and_leave() {
        let (send_tx, _send_rx) = mpsc::channel(16);
        let (_recv_tx, recv_rx) = mpsc::channel(16);

        let group: Name = "/test/psync".parse().unwrap();
        let handle = join_psync_group(group, send_tx, recv_rx, PSyncConfig::default());
        handle.leave();
    }
}