1use std::time::Duration;
20
21use bytes::{Buf, BufMut, Bytes, BytesMut};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use ndn_packet::Name;
26use ndn_packet::encode::InterestBuilder;
27
28use crate::protocol::{SyncHandle, SyncUpdate};
29use crate::psync::{Ibf, PSyncNode};
30
/// Tunable parameters for one PSync group membership.
#[derive(Debug, Clone)]
pub struct PSyncConfig {
    /// Base delay between two periodic sync Interests.
    pub sync_interval: Duration,
    /// Inclusive upper bound, in milliseconds, of the random jitter that is
    /// added on top of `sync_interval` for each period.
    pub jitter_ms: u64,
    /// Size handed to `PSyncNode::new`; presumably the number of IBF cells
    /// (confirm against `PSyncNode`).
    pub ibf_size: usize,
    /// Capacity of the channel that carries `SyncUpdate`s to the application.
    pub channel_capacity: usize,
}
43
44impl Default for PSyncConfig {
45 fn default() -> Self {
46 Self {
47 sync_interval: Duration::from_secs(1),
48 jitter_ms: 200,
49 ibf_size: 80,
50 channel_capacity: 256,
51 }
52 }
53}
54
55pub fn join_psync_group(
69 group: Name,
70 send: mpsc::Sender<Bytes>,
71 recv: mpsc::Receiver<Bytes>,
72 config: PSyncConfig,
73) -> SyncHandle {
74 let cancel = CancellationToken::new();
75 let (update_tx, update_rx) = mpsc::channel(config.channel_capacity);
76 let (publish_tx, publish_rx) = mpsc::channel(64);
77
78 let task_cancel = cancel.clone();
79 tokio::spawn(async move {
80 psync_task(
81 group,
82 send,
83 recv,
84 publish_rx,
85 update_tx,
86 config,
87 task_cancel,
88 )
89 .await;
90 });
91
92 SyncHandle::new(update_rx, publish_tx, cancel)
93}
94
95async fn psync_task(
101 group: Name,
102 send: mpsc::Sender<Bytes>,
103 mut recv: mpsc::Receiver<Bytes>,
104 mut publish_rx: mpsc::Receiver<(Name, Option<bytes::Bytes>)>,
105 update_tx: mpsc::Sender<SyncUpdate>,
106 config: PSyncConfig,
107 cancel: CancellationToken,
108) {
109 let mut node = PSyncNode::new(config.ibf_size);
110
111 loop {
112 let jitter = Duration::from_millis(fastrand::u64(0..=config.jitter_ms));
113 let interval = config.sync_interval + jitter;
114
115 tokio::select! {
116 _ = cancel.cancelled() => break,
117
118 _ = tokio::time::sleep(interval) => {
119 let ibf = node.build_ibf();
121 let ibf_bytes = encode_ibf(&ibf);
122 let sync_name = group.clone()
123 .append("psync")
124 .append(ibf_bytes);
125 let wire = InterestBuilder::new(sync_name)
126 .lifetime(Duration::from_millis(1000))
127 .build();
128 let _ = send.send(wire).await;
129 }
130
131 Some(raw) = recv.recv() => {
132 if raw.len() > 2 && raw[0] == 0x06 {
134 if let Some(hashes) = parse_sync_data(&raw) {
136 for hash in hashes {
137 let update = SyncUpdate {
138 publisher: format!("{hash:016x}"),
139 name: group.clone().append(format!("{hash:016x}")),
140 low_seq: 0,
141 high_seq: 0,
142 mapping: None,
143 };
144 let _ = update_tx.send(update).await;
145 }
146 }
147 } else if raw.len() > 2 && raw[0] == 0x05 {
148 if let Some(peer_ibf) = parse_sync_interest(&group, &raw)
150 && let Some((we_have, _they_have)) = node.reconcile(&peer_ibf)
151 && !we_have.is_empty()
152 {
153 let data_bytes = encode_hash_set(&we_have);
154 let _ = send.send(data_bytes).await;
155 }
156 }
157 }
158
159 Some((pub_name, _mapping)) = publish_rx.recv() => {
160 let hash = hash_name(&pub_name);
163 node.insert(hash);
164 let ibf = node.build_ibf();
166 let ibf_bytes = encode_ibf(&ibf);
167 let sync_name = group.clone()
168 .append("psync")
169 .append(ibf_bytes);
170 let wire = InterestBuilder::new(sync_name)
171 .lifetime(Duration::from_millis(1000))
172 .build();
173 let _ = send.send(wire).await;
174 }
175 }
176 }
177}
178
179fn encode_ibf(ibf: &Ibf) -> Bytes {
183 let cells = ibf.cells();
184 let mut buf = BytesMut::with_capacity(cells.len() * 24);
185 for cell in cells {
186 buf.put_u64(cell.0); buf.put_u64(cell.1); buf.put_i64(cell.2); }
190 buf.freeze()
191}
192
193fn decode_ibf(data: &[u8], ibf_size: usize) -> Option<Ibf> {
195 if data.len() < ibf_size * 24 {
196 return None;
197 }
198 let mut cursor = data;
199 let mut cells = Vec::with_capacity(ibf_size);
200 for _ in 0..ibf_size {
201 let xor_sum = cursor.get_u64();
202 let hash_sum = cursor.get_u64();
203 let count = cursor.get_i64();
204 cells.push((xor_sum, hash_sum, count));
205 }
206 Some(Ibf::from_cells(cells))
207}
208
209fn encode_hash_set(hashes: &std::collections::HashSet<u64>) -> Bytes {
211 let mut buf = BytesMut::with_capacity(hashes.len() * 8);
212 for &h in hashes {
213 buf.put_u64(h);
214 }
215 buf.freeze()
216}
217
218fn parse_sync_interest(group: &Name, raw: &[u8]) -> Option<Ibf> {
220 let interest = ndn_packet::Interest::decode(Bytes::copy_from_slice(raw)).ok()?;
221 let components = interest.name.components();
222
223 let group_len = group.components().len();
224 if components.len() < group_len + 2 {
225 return None;
226 }
227
228 let psync_comp = &components[group_len];
229 if psync_comp.value.as_ref() != b"psync" {
230 return None;
231 }
232
233 let ibf_comp = &components[group_len + 1];
234 let ibf_size = ibf_comp.value.len() / 24;
236 if ibf_size == 0 {
237 return None;
238 }
239
240 decode_ibf(&ibf_comp.value, ibf_size)
241}
242
243fn parse_sync_data(raw: &[u8]) -> Option<Vec<u64>> {
245 let data = ndn_packet::Data::decode(Bytes::copy_from_slice(raw)).ok()?;
246 let content = data.content()?;
247 let mut hashes = Vec::new();
248 let mut cursor = content.as_ref();
249 while cursor.remaining() >= 8 {
250 hashes.push(cursor.get_u64());
251 }
252 Some(hashes)
253}
254
255fn hash_name(name: &Name) -> u64 {
261 let mut h: u64 = 0xcbf29ce484222325;
262 for comp in name.components() {
263 for b in comp.value.iter() {
264 h ^= *b as u64;
265 h = h.wrapping_mul(0x100000001b3);
266 }
267 h ^= 0xff;
269 h = h.wrapping_mul(0x100000001b3);
270 }
271 h
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// An IBF that went through `encode_ibf` and `decode_ibf` must not differ
    /// from the IBF it was made from.
    #[test]
    fn ibf_encode_decode_roundtrip() {
        let mut original = Ibf::new(8, 3);
        for key in [42, 99] {
            original.insert(key);
        }

        let wire = encode_ibf(&original);
        let restored = decode_ibf(&wire, 8).unwrap();

        let (only_original, only_restored) = original.subtract(&restored).decode().unwrap();
        assert!(only_original.is_empty());
        assert!(only_restored.is_empty());
    }

    /// Names that differ in their last component hash differently.
    #[test]
    fn hash_name_distinct() {
        let first: Name = "/a/b".parse().unwrap();
        let second: Name = "/a/c".parse().unwrap();
        assert_ne!(hash_name(&first), hash_name(&second));
    }

    /// Joining a group and leaving it right away must not panic.
    #[tokio::test]
    async fn join_and_leave() {
        // The unused channel halves stay bound so both channels remain open
        // for the duration of the test.
        let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
        let (_incoming_tx, incoming_rx) = mpsc::channel(16);

        let group: Name = "/test/psync".parse().unwrap();
        let handle = join_psync_group(group, outgoing_tx, incoming_rx, PSyncConfig::default());
        handle.leave();
    }
}