ndn_store/
pit.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::sync::Arc;
4
5#[cfg(not(target_arch = "wasm32"))]
6use dashmap::DashMap;
7use smallvec::SmallVec;
8
9use ndn_packet::{Name, NameComponent, Selector};
10
11/// Pre-computed cumulative name-prefix hashes.
12///
13/// Computed once at TLV decode and reused for all PIT lookups (check, match,
14/// nack) without re-hashing name components on every probe.
15///
16/// Inspired by NDN-DPDK's memoized name hashing: the FIB/PIT dispatch layer
17/// hashes the name once at RX and reuses the hash for NDT, PCCT, and FIB
18/// lookups.
19///
20/// Ref: Shi et al., "NDN-DPDK: NDN Forwarding at 100 Gbps on Commodity
21/// Hardware" (ACM ICN 2020), §3.1.
22#[derive(Clone, Debug)]
23pub struct NameHashes {
24    /// `prefix_hashes[i]` is the hash of the first `i+1` name components.
25    pub prefix_hashes: SmallVec<[u64; 8]>,
26}
27
/// FxHash-style mixing constant: the running state is multiplied by this
/// value before each component hash is added, which makes the accumulated
/// hash depend on component order.
const HASH_MIX: u64 = 0x517c_c1b7_2722_0a95;
30
31impl NameHashes {
32    /// Compute cumulative prefix hashes for all prefixes of `name`.
33    pub fn compute(name: &Name) -> Self {
34        Self::from_components(name.components())
35    }
36
37    /// Compute from a component slice (avoids requiring a `Name` value).
38    pub fn from_components(components: &[NameComponent]) -> Self {
39        let mut prefix_hashes = SmallVec::with_capacity(components.len());
40        let mut state: u64 = 0;
41        for comp in components {
42            state = Self::accumulate(state, comp);
43            prefix_hashes.push(state);
44        }
45        Self { prefix_hashes }
46    }
47
48    /// Hash of the full name (all components).
49    pub fn full_hash(&self) -> u64 {
50        self.prefix_hashes.last().copied().unwrap_or(0)
51    }
52
53    /// Hash of the first `n` components. Returns 0 for the root name (n=0).
54    pub fn prefix_hash(&self, n: usize) -> u64 {
55        if n == 0 { 0 } else { self.prefix_hashes[n - 1] }
56    }
57
58    pub fn len(&self) -> usize {
59        self.prefix_hashes.len()
60    }
61
62    pub fn is_empty(&self) -> bool {
63        self.prefix_hashes.is_empty()
64    }
65
66    /// Compute the full-name hash without storing intermediates.
67    pub fn full_name_hash(name: &Name) -> u64 {
68        let mut state: u64 = 0;
69        for comp in name.components() {
70            state = Self::accumulate(state, comp);
71        }
72        state
73    }
74
75    fn accumulate(state: u64, comp: &NameComponent) -> u64 {
76        let mut h = DefaultHasher::new();
77        comp.hash(&mut h);
78        let comp_hash = h.finish();
79        state.wrapping_mul(HASH_MIX).wrapping_add(comp_hash)
80    }
81}
82
/// A small, `Copy` handle identifying a PIT entry.
///
/// The value is a hash derived from (Name, Option<Selector>), so it can be
/// copied across tasks and held over `await` points without any lifetime
/// concerns.
///
/// Hashing happens in two phases: the name components are first reduced to
/// a single `u64` via [`NameHashes`], and that value is then combined with
/// the selector and forwarding-hint hashes.  Because of this split,
/// [`NameHashes`] can pre-compute the name hash once at decode and reuse it
/// for every PIT probe — eliminating per-probe re-hashing in
/// `PitMatchStage` (which may probe 5 + 3*(N-1) combinations).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PitToken(pub u64);
95
96impl PitToken {
97    /// Build a PIT token from Interest fields.
98    ///
99    /// Per RFC 8569 §4.2, PIT aggregation uses (Name, Selectors,
100    /// ForwardingHint) as the key.
101    pub fn from_interest(name: &Name, selector: Option<&Selector>) -> Self {
102        Self::from_interest_full(name, selector, None)
103    }
104
105    /// Build a PIT token including ForwardingHint for correct aggregation.
106    pub fn from_interest_full(
107        name: &Name,
108        selector: Option<&Selector>,
109        forwarding_hint: Option<&[Arc<Name>]>,
110    ) -> Self {
111        let name_hash = NameHashes::full_name_hash(name);
112        Self::from_name_hash(name_hash, selector, forwarding_hint)
113    }
114
115    /// Build a PIT token from a pre-computed name hash.
116    ///
117    /// Use with [`NameHashes::full_hash`] or [`NameHashes::prefix_hash`] to
118    /// avoid re-hashing name components on every probe.
119    pub fn from_name_hash(
120        name_hash: u64,
121        selector: Option<&Selector>,
122        forwarding_hint: Option<&[Arc<Name>]>,
123    ) -> Self {
124        let mut h = DefaultHasher::new();
125        name_hash.hash(&mut h);
126        selector.hash(&mut h);
127        if let Some(hints) = forwarding_hint {
128            for hint in hints {
129                hint.hash(&mut h);
130            }
131        }
132        PitToken(h.finish())
133    }
134}
135
136/// Record of an incoming Interest (consumer-facing side of the PIT entry).
137#[derive(Clone, Debug)]
138pub struct InRecord {
139    /// Face the Interest arrived on (raw u32; mapped to FaceId by the engine).
140    pub face_id: u32,
141    pub nonce: u32,
142    pub expires_at: u64,
143    /// NDNLPv2 PIT token from the LP header on this face.
144    /// Must be echoed back in the Data/Nack response.
145    pub lp_pit_token: Option<bytes::Bytes>,
146}
147
/// Producer-facing half of a PIT entry: one Interest that was sent out on a
/// face.
#[derive(Debug, Clone)]
pub struct OutRecord {
    /// Face the Interest was sent on.
    pub face_id: u32,
    /// Nonce of the Interest recorded for this face.
    pub last_nonce: u32,
    /// Timestamp at which the Interest was sent, as supplied by the caller.
    // NOTE(review): unit is not fixed in this file — presumably nanoseconds; confirm.
    pub sent_at: u64,
}
155
156/// A single PIT entry — one per pending (Name, Option<Selector>) pair.
157pub struct PitEntry {
158    pub name: Arc<Name>,
159    pub selector: Option<Selector>,
160    pub in_records: Vec<InRecord>,
161    pub out_records: Vec<OutRecord>,
162    /// Nonces seen so far — inline for the common case of ≤4 nonces.
163    pub nonces_seen: SmallVec<[u32; 4]>,
164    pub is_satisfied: bool,
165    pub created_at: u64,
166    pub expires_at: u64,
167}
168
169impl PitEntry {
170    pub fn new(name: Arc<Name>, selector: Option<Selector>, now: u64, lifetime_ms: u64) -> Self {
171        Self {
172            name,
173            selector,
174            in_records: Vec::new(),
175            out_records: Vec::new(),
176            nonces_seen: SmallVec::new(),
177            is_satisfied: false,
178            created_at: now,
179            expires_at: now + lifetime_ms * 1_000_000,
180        }
181    }
182
183    pub fn add_in_record(
184        &mut self,
185        face_id: u32,
186        nonce: u32,
187        expires_at: u64,
188        lp_pit_token: Option<bytes::Bytes>,
189    ) {
190        self.in_records.push(InRecord {
191            face_id,
192            nonce,
193            expires_at,
194            lp_pit_token,
195        });
196        if !self.nonces_seen.contains(&nonce) {
197            self.nonces_seen.push(nonce);
198        }
199    }
200
201    pub fn add_out_record(&mut self, face_id: u32, nonce: u32, sent_at: u64) {
202        self.out_records.push(OutRecord {
203            face_id,
204            last_nonce: nonce,
205            sent_at,
206        });
207    }
208
209    /// Returns the face IDs of all in-records (for Data fan-back).
210    pub fn in_record_faces(&self) -> impl Iterator<Item = u32> + '_ {
211        self.in_records.iter().map(|r| r.face_id)
212    }
213}
214
215/// The Pending Interest Table.
216///
217/// On native targets uses `DashMap` for sharded concurrent access with no
218/// global lock on the forwarding hot path. On `wasm32` uses a
219/// `Mutex<HashMap>` (single-threaded WASM has no contention).
220pub struct Pit {
221    #[cfg(not(target_arch = "wasm32"))]
222    entries: DashMap<PitToken, PitEntry>,
223    #[cfg(target_arch = "wasm32")]
224    entries: std::sync::Mutex<std::collections::HashMap<PitToken, PitEntry>>,
225}
226
227impl Pit {
228    pub fn new() -> Self {
229        Self {
230            #[cfg(not(target_arch = "wasm32"))]
231            entries: DashMap::new(),
232            #[cfg(target_arch = "wasm32")]
233            entries: std::sync::Mutex::new(std::collections::HashMap::new()),
234        }
235    }
236
237    pub fn clear(&self) {
238        #[cfg(not(target_arch = "wasm32"))]
239        self.entries.clear();
240        #[cfg(target_arch = "wasm32")]
241        self.entries.lock().unwrap().clear();
242    }
243
244    pub fn insert(&self, token: PitToken, entry: PitEntry) {
245        #[cfg(not(target_arch = "wasm32"))]
246        {
247            self.entries.insert(token, entry);
248        }
249        #[cfg(target_arch = "wasm32")]
250        {
251            self.entries.lock().unwrap().insert(token, entry);
252        }
253    }
254
255    /// Returns `true` if the PIT contains an entry for `token`.
256    pub fn contains(&self, token: &PitToken) -> bool {
257        #[cfg(not(target_arch = "wasm32"))]
258        return self.entries.contains_key(token);
259        #[cfg(target_arch = "wasm32")]
260        return self.entries.lock().unwrap().contains_key(token);
261    }
262
263    /// Apply `f` to the entry for `token`, returning the closure's result.
264    /// Returns `None` if no entry exists.
265    pub fn with_entry<R, F: FnOnce(&PitEntry) -> R>(&self, token: &PitToken, f: F) -> Option<R> {
266        #[cfg(not(target_arch = "wasm32"))]
267        return self.entries.get(token).map(|e| f(&e));
268        #[cfg(target_arch = "wasm32")]
269        return self.entries.lock().unwrap().get(token).map(f);
270    }
271
272    /// Apply `f` to the mutable entry for `token`, returning the closure's result.
273    /// Returns `None` if no entry exists.
274    pub fn with_entry_mut<R, F: FnOnce(&mut PitEntry) -> R>(
275        &self,
276        token: &PitToken,
277        f: F,
278    ) -> Option<R> {
279        #[cfg(not(target_arch = "wasm32"))]
280        return self.entries.get_mut(token).map(|mut e| f(&mut e));
281        #[cfg(target_arch = "wasm32")]
282        return self.entries.lock().unwrap().get_mut(token).map(f);
283    }
284
285    /// Look up an entry by reference. Prefer `contains()` or `with_entry()` for new code.
286    #[cfg(not(target_arch = "wasm32"))]
287    pub fn get(
288        &self,
289        token: &PitToken,
290    ) -> Option<dashmap::mapref::one::Ref<'_, PitToken, PitEntry>> {
291        self.entries.get(token)
292    }
293
294    /// Look up a mutable entry. Prefer `with_entry_mut()` for new code.
295    #[cfg(not(target_arch = "wasm32"))]
296    pub fn get_mut(
297        &self,
298        token: &PitToken,
299    ) -> Option<dashmap::mapref::one::RefMut<'_, PitToken, PitEntry>> {
300        self.entries.get_mut(token)
301    }
302
303    pub fn remove(&self, token: &PitToken) -> Option<(PitToken, PitEntry)> {
304        #[cfg(not(target_arch = "wasm32"))]
305        return self.entries.remove(token);
306        #[cfg(target_arch = "wasm32")]
307        return self
308            .entries
309            .lock()
310            .unwrap()
311            .remove(token)
312            .map(|v| (*token, v));
313    }
314
315    pub fn len(&self) -> usize {
316        #[cfg(not(target_arch = "wasm32"))]
317        return self.entries.len();
318        #[cfg(target_arch = "wasm32")]
319        return self.entries.lock().unwrap().len();
320    }
321
322    pub fn is_empty(&self) -> bool {
323        #[cfg(not(target_arch = "wasm32"))]
324        return self.entries.is_empty();
325        #[cfg(target_arch = "wasm32")]
326        return self.entries.lock().unwrap().is_empty();
327    }
328
329    /// Remove all entries whose `expires_at` ≤ `now_ns`.
330    /// Returns the tokens of expired entries.
331    pub fn drain_expired(&self, now_ns: u64) -> Vec<PitToken> {
332        #[cfg(not(target_arch = "wasm32"))]
333        {
334            let expired: Vec<PitToken> = self
335                .entries
336                .iter()
337                .filter(|r| r.expires_at <= now_ns)
338                .map(|r| *r.key())
339                .collect();
340            for token in &expired {
341                self.entries.remove(token);
342            }
343            expired
344        }
345        #[cfg(target_arch = "wasm32")]
346        {
347            let mut entries = self.entries.lock().unwrap();
348            let expired: Vec<PitToken> = entries
349                .iter()
350                .filter(|(_, e)| e.expires_at <= now_ns)
351                .map(|(k, _)| *k)
352                .collect();
353            for token in &expired {
354                entries.remove(token);
355            }
356            expired
357        }
358    }
359
360    /// Remove PIT entries whose **only** in-record face is `face_id`.
361    ///
362    /// Entries that also have in-records from other faces are kept (with the
363    /// dead face's records removed). This prevents stale PIT entries from
364    /// suppressing Interests after a face disconnects.
365    pub fn remove_face(&self, face_id: u32) -> usize {
366        #[cfg(not(target_arch = "wasm32"))]
367        {
368            // First pass: identify entries to remove entirely (sole consumer was this face).
369            let mut to_remove = Vec::new();
370            let mut to_prune = Vec::new();
371
372            for entry in self.entries.iter() {
373                let all_on_face = entry.in_records.iter().all(|r| r.face_id == face_id);
374                let any_on_face = entry.in_records.iter().any(|r| r.face_id == face_id);
375
376                if all_on_face && !entry.in_records.is_empty() {
377                    to_remove.push(*entry.key());
378                } else if any_on_face {
379                    to_prune.push(*entry.key());
380                }
381            }
382
383            let removed = to_remove.len();
384
385            for token in &to_remove {
386                self.entries.remove(token);
387            }
388
389            // Second pass: prune in-records for the dead face from multi-consumer entries.
390            for token in &to_prune {
391                if let Some(mut entry) = self.entries.get_mut(token) {
392                    entry.in_records.retain(|r| r.face_id != face_id);
393                }
394            }
395
396            removed
397        }
398        #[cfg(target_arch = "wasm32")]
399        {
400            let mut entries = self.entries.lock().unwrap();
401            let mut to_remove = Vec::new();
402            let mut to_prune = Vec::new();
403
404            for (token, entry) in entries.iter() {
405                let all_on_face = entry.in_records.iter().all(|r| r.face_id == face_id);
406                let any_on_face = entry.in_records.iter().any(|r| r.face_id == face_id);
407
408                if all_on_face && !entry.in_records.is_empty() {
409                    to_remove.push(*token);
410                } else if any_on_face {
411                    to_prune.push(*token);
412                }
413            }
414
415            let removed = to_remove.len();
416
417            for token in &to_remove {
418                entries.remove(token);
419            }
420
421            for token in &to_prune {
422                if let Some(entry) = entries.get_mut(token) {
423                    entry.in_records.retain(|r| r.face_id != face_id);
424                }
425            }
426
427            removed
428        }
429    }
430}
431
432impl Default for Pit {
433    fn default() -> Self {
434        Self::new()
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use ndn_packet::encode::{encode_data_unsigned, encode_interest};
    use ndn_packet::{Data, Interest, NameComponent, Selector};

    /// Build a `Name` whose generic components are the given strings.
    fn make_name(comps: &[&str]) -> Name {
        let components = comps.iter().map(|text| {
            let raw = Bytes::copy_from_slice(text.as_bytes());
            NameComponent::generic(raw)
        });
        Name::from_components(components)
    }

    /// Token computed the way PitCheckStage does it: name, the Interest's
    /// selectors, and its forwarding hint.
    fn check_stage_token(interest: &Interest) -> PitToken {
        PitToken::from_interest_full(
            &interest.name,
            Some(interest.selectors()),
            interest.forwarding_hint(),
        )
    }

    /// Insert a single-component entry named `comp` carrying one in-record
    /// per `(face_id, nonce)` pair, and return its token.
    fn insert_entry(pit: &Pit, comp: &str, records: &[(u32, u32)]) -> PitToken {
        let name = Arc::new(make_name(&[comp]));
        let token = PitToken::from_interest(&name, None);
        let mut entry = PitEntry::new(name, None, 0, 4000);
        for &(face_id, nonce) in records {
            entry.add_in_record(face_id, nonce, 999, None);
        }
        pit.insert(token, entry);
        token
    }

    /// Verify PIT token matching for iperf-style Interest/Data flow.
    ///
    /// This reproduces the exact path used in the pipeline:
    /// - PitCheckStage creates token with `from_interest_full(name, Some(selectors), fwd_hint)`
    /// - PitMatchStage tries `from_interest(data.name, None)` then `from_interest(data.name, Some(default))`
    #[test]
    fn pit_token_iperf_interest_data_match() {
        // Encode and decode the Interest the same way iperf does.
        let name = make_name(&["iperf", "0"]);
        let interest_wire = encode_interest(&name, None);
        let interest = Interest::decode(interest_wire.clone()).unwrap();

        // Token as created by PitCheckStage.
        let check_token = check_stage_token(&interest);

        // The server answers with Data carrying the Interest's name.
        let data = Data::decode(encode_data_unsigned(&interest.name, &[0xAAu8; 100])).unwrap();

        // PitMatchStage probes: first without a selector, then with the default one.
        let without_selector = PitToken::from_interest(&data.name, None);
        let default_sel = Selector::default();
        let with_default_selector = PitToken::from_interest(&data.name, Some(&default_sel));

        // Only the second probe is allowed to hit the check token.
        assert_ne!(
            check_token, without_selector,
            "first try should NOT match (None vs Some selector)"
        );
        assert_eq!(
            check_token, with_default_selector,
            "second try MUST match (Same default selector)"
        );
    }

    /// Verify that source_face_id computation matches PitCheck for management Interests.
    ///
    /// This simulates the rib/register flow where source_face_id decodes
    /// the Interest from raw bytes forwarded through the pipeline.
    #[test]
    fn pit_token_management_interest_source_face() {
        // Simplified rib/register command name.
        let name = make_name(&["localhost", "nfd", "rib", "register", "params"]);
        let interest_wire = encode_interest(&name, None);

        // PitCheckStage side: token from the decoded Interest.
        let pipeline_interest = Interest::decode(interest_wire.clone()).unwrap();
        let check_token = check_stage_token(&pipeline_interest);

        // After ensure_nonce (a no-op because the nonce is already present)
        // the same bytes reach the management face, which decodes them again
        // and computes the token the way source_face_id does.
        let mgmt_interest = Interest::decode(interest_wire).unwrap();
        let source_token = check_stage_token(&mgmt_interest);

        assert_eq!(
            check_token, source_token,
            "source_face_id must match PitCheck token"
        );
    }

    #[test]
    fn pit_insert_and_remove_basic() {
        let pit = Pit::new();
        let token = insert_entry(&pit, "test", &[]);
        assert_eq!(pit.len(), 1);
        assert!(pit.remove(&token).is_some());
        assert!(pit.is_empty());
    }

    #[test]
    fn remove_face_drains_sole_consumer() {
        let pit = Pit::new();

        // "a": sole consumer on face 1.
        let _sole_on_face1 = insert_entry(&pit, "a", &[(1, 100)]);
        // "b": consumers on face 1 AND face 2.
        let shared = insert_entry(&pit, "b", &[(1, 200), (2, 201)]);
        // "c": sole consumer on face 3 (unrelated).
        let unrelated = insert_entry(&pit, "c", &[(3, 300)]);

        assert_eq!(pit.len(), 3);

        // Dropping face 1 removes "a" entirely and prunes face 1 from "b".
        let removed = pit.remove_face(1);
        assert_eq!(removed, 1);
        assert_eq!(pit.len(), 2); // "b" and "c" remain

        // "b" is still present but only carries face 2's in-record.
        pit.with_entry(&shared, |entry| {
            assert_eq!(entry.in_records.len(), 1);
            assert_eq!(entry.in_records[0].face_id, 2);
        })
        .expect("entry2 should exist");

        // "c" is untouched.
        assert!(pit.contains(&unrelated));
    }
}