// ndn_store/lru_cs.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3
4use bytes::Bytes;
5use lru::LruCache;
6
7use ndn_packet::{Interest, Name};
8
9use crate::{ContentStore, CsCapacity, CsEntry, CsMeta, InsertResult, NameTrie};
10
/// In-memory LRU content store, bounded by total byte capacity.
///
/// Maintains two indices:
/// - `cache`: `LruCache<Arc<Name>, CsEntry>` — exact-match lookup in O(1).
/// - `prefix_index`: `NameTrie<Arc<Name>>` — maps each cached name to itself,
///   enabling `CanBePrefix` lookups via `first_descendant`.
///
/// All insertions and evictions update both indices atomically under the lock.
///
/// An atomic entry count (`entry_count`) allows `get()` to short-circuit
/// without acquiring the Mutex when the CS is empty — eliminating lock
/// contention on the hot path for workloads that don't cache (e.g. iperf).
pub struct LruCs {
    /// All mutable state (both indices plus the byte total) behind one lock.
    inner: Mutex<LruInner>,
    /// Maximum byte capacity. Atomic for lock-free runtime updates via `set_capacity`.
    capacity_bytes: AtomicUsize,
    /// Atomic entry count — updated under the lock but readable without it.
    entry_count: AtomicUsize,
}
30
/// Lock-protected interior state of `LruCs`.
struct LruInner {
    /// LRU-ordered exact-match index: Data name → cached entry.
    cache: LruCache<Arc<Name>, CsEntry>,
    /// Mirror of `cache`'s key set as a name trie, for `CanBePrefix` lookups.
    prefix_index: NameTrie<Arc<Name>>,
    /// Sum of `data.len()` over all cached entries; kept within capacity by eviction.
    current_bytes: usize,
}
36
37impl LruCs {
38    /// Create an LRU CS with the given byte capacity.
39    pub fn new(capacity_bytes: usize) -> Self {
40        use std::num::NonZeroUsize;
41        // Set the LruCache entry limit to capacity_bytes so it never fires before
42        // our byte-based eviction loop does. Each Data packet is at least 1 byte,
43        // so we can never accumulate more than capacity_bytes entries.
44        let max_entries = NonZeroUsize::new(capacity_bytes.max(1)).unwrap();
45        Self {
46            inner: Mutex::new(LruInner {
47                cache: LruCache::new(max_entries),
48                prefix_index: NameTrie::new(),
49                current_bytes: 0,
50            }),
51            capacity_bytes: AtomicUsize::new(capacity_bytes),
52            entry_count: AtomicUsize::new(0),
53        }
54    }
55
56    /// Whether the cache is empty.
57    pub fn is_empty(&self) -> bool {
58        self.entry_count.load(Ordering::Relaxed) == 0
59    }
60}
61
62impl ContentStore for LruCs {
63    async fn get(&self, interest: &Interest) -> Option<CsEntry> {
64        // Fast path: skip the Mutex entirely when the CS is empty.
65        if self.entry_count.load(Ordering::Relaxed) == 0 {
66            return None;
67        }
68        let mut inner = self.inner.lock().unwrap();
69
70        // Check if the Interest carries an ImplicitSha256DigestComponent as
71        // its last name component. If so, strip it and look up by Data name,
72        // then verify the digest against the cached wire bytes.
73        let comps = interest.name.components();
74        let has_implicit_digest =
75            !comps.is_empty() && comps.last().unwrap().typ == ndn_packet::tlv_type::IMPLICIT_SHA256;
76
77        let entry = if has_implicit_digest {
78            // Build the Data name (Interest name minus the digest component).
79            let data_name = Name::from_components(comps[..comps.len() - 1].iter().cloned());
80            let candidate = inner.cache.get(&data_name)?.clone();
81            // Verify the implicit digest matches.
82            let expected_digest = &comps.last().unwrap().value;
83            let actual = ring::digest::digest(&ring::digest::SHA256, &candidate.data);
84            if expected_digest.as_ref() != actual.as_ref() {
85                return None;
86            }
87            candidate
88        } else if interest.selectors().can_be_prefix {
89            let data_name = inner.prefix_index.first_descendant(&interest.name)?;
90            inner.cache.get(data_name.as_ref())?.clone()
91        } else {
92            inner.cache.get(interest.name.as_ref())?.clone()
93        };
94
95        if interest.selectors().must_be_fresh && !entry.is_fresh(now_ns()) {
96            return None;
97        }
98        Some(entry)
99    }
100
101    async fn insert(&self, data: Bytes, name: Arc<Name>, meta: CsMeta) -> InsertResult {
102        let entry_bytes = data.len();
103        let capacity = self.capacity_bytes.load(Ordering::Relaxed);
104        let mut inner = self.inner.lock().unwrap();
105
106        // Track whether we are replacing an existing entry.
107        let was_present = if let Some(old) = inner.cache.peek(name.as_ref()) {
108            inner.current_bytes = inner.current_bytes.saturating_sub(old.data.len());
109            true
110        } else {
111            false
112        };
113
114        // Evict LRU entries until there is room, keeping prefix_index in sync.
115        while inner.current_bytes + entry_bytes > capacity {
116            if let Some((evicted_name, evicted)) = inner.cache.pop_lru() {
117                inner.current_bytes = inner.current_bytes.saturating_sub(evicted.data.len());
118                inner.prefix_index.remove(&evicted_name);
119                self.entry_count.fetch_sub(1, Ordering::Relaxed);
120            } else {
121                break;
122            }
123        }
124
125        let entry = CsEntry {
126            data,
127            stale_at: meta.stale_at,
128            name: name.clone(),
129        };
130        inner.cache.put(name.clone(), entry);
131        inner.current_bytes += entry_bytes;
132
133        // Only index new entries; replacements keep the same name in the trie.
134        if !was_present {
135            inner.prefix_index.insert(name.as_ref(), Arc::clone(&name));
136        }
137
138        if !was_present {
139            self.entry_count.fetch_add(1, Ordering::Relaxed);
140        }
141        if was_present {
142            InsertResult::Replaced
143        } else {
144            InsertResult::Inserted
145        }
146    }
147
148    async fn evict(&self, name: &Name) -> bool {
149        let mut inner = self.inner.lock().unwrap();
150        if let Some(evicted) = inner.cache.pop(name) {
151            inner.current_bytes = inner.current_bytes.saturating_sub(evicted.data.len());
152            inner.prefix_index.remove(name);
153            self.entry_count.fetch_sub(1, Ordering::Relaxed);
154            return true;
155        }
156        false
157    }
158
159    fn capacity(&self) -> CsCapacity {
160        CsCapacity::bytes(self.capacity_bytes.load(Ordering::Relaxed))
161    }
162
163    fn len(&self) -> usize {
164        self.entry_count.load(Ordering::Relaxed)
165    }
166
167    fn current_bytes(&self) -> usize {
168        self.inner.lock().unwrap().current_bytes
169    }
170
171    fn set_capacity(&self, max_bytes: usize) {
172        self.capacity_bytes.store(max_bytes, Ordering::Relaxed);
173        // Evict entries that exceed the new capacity.
174        let mut inner = self.inner.lock().unwrap();
175        while inner.current_bytes > max_bytes {
176            if let Some((evicted_name, evicted)) = inner.cache.pop_lru() {
177                inner.current_bytes = inner.current_bytes.saturating_sub(evicted.data.len());
178                inner.prefix_index.remove(&evicted_name);
179                self.entry_count.fetch_sub(1, Ordering::Relaxed);
180            } else {
181                break;
182            }
183        }
184    }
185
186    fn variant_name(&self) -> &str {
187        "lru"
188    }
189
190    async fn evict_prefix(&self, prefix: &Name, limit: Option<usize>) -> usize {
191        let mut inner = self.inner.lock().unwrap();
192        let names: Vec<Arc<Name>> = inner.prefix_index.descendants(prefix);
193        let max = limit.unwrap_or(usize::MAX);
194        let mut evicted = 0;
195        for name in names {
196            if evicted >= max {
197                break;
198            }
199            if let Some(entry) = inner.cache.pop(name.as_ref()) {
200                inner.current_bytes = inner.current_bytes.saturating_sub(entry.data.len());
201                inner.prefix_index.remove(name.as_ref());
202                self.entry_count.fetch_sub(1, Ordering::Relaxed);
203                evicted += 1;
204            }
205        }
206        evicted
207    }
208}
209
210fn now_ns() -> u64 {
211    use std::time::{SystemTime, UNIX_EPOCH};
212    SystemTime::now()
213        .duration_since(UNIX_EPOCH)
214        .map(|d| d.as_nanos() as u64)
215        .unwrap_or(0)
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use ndn_packet::{Interest, Name, NameComponent};

    /// Build an `Arc<Name>` from UTF-8 component strings.
    fn arc_name(components: &[&str]) -> Arc<Name> {
        Arc::new(Name::from_components(components.iter().map(|s| {
            NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))
        })))
    }

    fn meta_fresh() -> CsMeta {
        CsMeta { stale_at: u64::MAX }
    }

    fn meta_stale() -> CsMeta {
        CsMeta { stale_at: 0 } // already stale
    }

    fn interest(components: &[&str]) -> Interest {
        Interest::new((*arc_name(components)).clone())
    }

    /// Build an Interest with the MustBeFresh flag set.
    ///
    /// `Interest::new` offers no way to set the flag, so we build the wire
    /// bytes directly with TlvWriter and decode. (A previously-discarded
    /// `Interest::new` call here was dead code and has been removed.)
    fn interest_fresh(components: &[&str]) -> Interest {
        use ndn_packet::tlv_type;
        use ndn_tlv::TlvWriter;
        let mut w = TlvWriter::new();
        w.write_nested(tlv_type::INTEREST, |w| {
            w.write_nested(tlv_type::NAME, |w| {
                for comp in components {
                    w.write_tlv(tlv_type::NAME_COMPONENT, comp.as_bytes());
                }
            });
            w.write_tlv(tlv_type::MUST_BE_FRESH, &[]);
        });
        Interest::decode(w.finish()).unwrap()
    }

    /// Build an Interest with the CanBePrefix flag set (via wire bytes).
    fn interest_can_be_prefix(components: &[&str]) -> Interest {
        use ndn_packet::tlv_type;
        use ndn_tlv::TlvWriter;
        let mut w = TlvWriter::new();
        w.write_nested(tlv_type::INTEREST, |w| {
            w.write_nested(tlv_type::NAME, |w| {
                for comp in components {
                    w.write_tlv(tlv_type::NAME_COMPONENT, comp.as_bytes());
                }
            });
            w.write_tlv(tlv_type::CAN_BE_PREFIX, &[]);
        });
        Interest::decode(w.finish()).unwrap()
    }

    // ── basic insert / get ────────────────────────────────────────────────────

    #[tokio::test]
    async fn get_miss_returns_none() {
        let cs = LruCs::new(65536);
        assert!(cs.get(&interest(&["a"])).await.is_none());
    }

    #[tokio::test]
    async fn insert_then_get_returns_entry() {
        let cs = LruCs::new(65536);
        let name = arc_name(&["a", "b"]);
        cs.insert(Bytes::from_static(b"data"), name.clone(), meta_fresh())
            .await;
        let entry = cs.get(&interest(&["a", "b"])).await.unwrap();
        assert_eq!(entry.data.as_ref(), b"data");
    }

    #[tokio::test]
    async fn insert_returns_inserted() {
        let cs = LruCs::new(65536);
        let r = cs
            .insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_fresh())
            .await;
        assert_eq!(r, InsertResult::Inserted);
    }

    #[tokio::test]
    async fn insert_replaces_existing() {
        let cs = LruCs::new(65536);
        let name = arc_name(&["a"]);
        cs.insert(Bytes::from_static(b"old"), name.clone(), meta_fresh())
            .await;
        let r = cs
            .insert(Bytes::from_static(b"new"), name.clone(), meta_fresh())
            .await;
        assert_eq!(r, InsertResult::Replaced);
        let entry = cs.get(&interest(&["a"])).await.unwrap();
        assert_eq!(entry.data.as_ref(), b"new");
    }

    // ── must_be_fresh ─────────────────────────────────────────────────────────

    #[tokio::test]
    async fn must_be_fresh_rejects_stale_entry() {
        let cs = LruCs::new(65536);
        cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_stale())
            .await;
        assert!(cs.get(&interest_fresh(&["a"])).await.is_none());
    }

    #[tokio::test]
    async fn must_be_fresh_accepts_fresh_entry() {
        let cs = LruCs::new(65536);
        cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_fresh())
            .await;
        assert!(cs.get(&interest_fresh(&["a"])).await.is_some());
    }

    #[tokio::test]
    async fn no_must_be_fresh_returns_stale_entry() {
        let cs = LruCs::new(65536);
        cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_stale())
            .await;
        // Without MustBeFresh the stale entry is still returned.
        assert!(cs.get(&interest(&["a"])).await.is_some());
    }

    // ── can_be_prefix ─────────────────────────────────────────────────────────

    #[tokio::test]
    async fn can_be_prefix_finds_longer_name() {
        let cs = LruCs::new(65536);
        cs.insert(
            Bytes::from_static(b"v"),
            arc_name(&["a", "b", "1"]),
            meta_fresh(),
        )
        .await;
        let entry = cs.get(&interest_can_be_prefix(&["a", "b"])).await;
        assert!(entry.is_some());
    }

    #[tokio::test]
    async fn can_be_prefix_miss_for_unrelated_name() {
        let cs = LruCs::new(65536);
        cs.insert(
            Bytes::from_static(b"v"),
            arc_name(&["x", "y"]),
            meta_fresh(),
        )
        .await;
        assert!(cs.get(&interest_can_be_prefix(&["a", "b"])).await.is_none());
    }

    // ── evict ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn evict_removes_entry() {
        let cs = LruCs::new(65536);
        let name = arc_name(&["a"]);
        cs.insert(Bytes::from_static(b"x"), name.clone(), meta_fresh())
            .await;
        let removed = cs.evict(&name).await;
        assert!(removed);
        assert!(cs.get(&interest(&["a"])).await.is_none());
    }

    #[tokio::test]
    async fn evict_nonexistent_returns_false() {
        let cs = LruCs::new(65536);
        assert!(!cs.evict(&arc_name(&["z"])).await);
    }

    #[tokio::test]
    async fn evict_removes_from_prefix_index() {
        let cs = LruCs::new(65536);
        cs.insert(
            Bytes::from_static(b"x"),
            arc_name(&["a", "b"]),
            meta_fresh(),
        )
        .await;
        cs.evict(&arc_name(&["a", "b"])).await;
        // CanBePrefix should also miss now.
        assert!(cs.get(&interest_can_be_prefix(&["a"])).await.is_none());
    }

    // ── capacity / LRU eviction ───────────────────────────────────────────────

    #[tokio::test]
    async fn capacity_is_reported() {
        let cs = LruCs::new(1024);
        assert_eq!(cs.capacity().max_bytes, 1024);
    }

    #[tokio::test]
    async fn lru_eviction_keeps_byte_count_bounded() {
        // Capacity = 20 bytes; each entry is 10 bytes → room for 2.
        let cs = LruCs::new(20);
        cs.insert(Bytes::from(vec![0u8; 10]), arc_name(&["a"]), meta_fresh())
            .await;
        cs.insert(Bytes::from(vec![0u8; 10]), arc_name(&["b"]), meta_fresh())
            .await;
        // Third insert evicts /a (LRU).
        cs.insert(Bytes::from(vec![0u8; 10]), arc_name(&["c"]), meta_fresh())
            .await;
        assert!(cs.get(&interest(&["a"])).await.is_none());
        assert!(cs.get(&interest(&["b"])).await.is_some());
        assert!(cs.get(&interest(&["c"])).await.is_some());
    }

    // ── implicit SHA-256 digest ────────────────────────────────────────────

    #[tokio::test]
    async fn implicit_digest_lookup_matches() {
        let cs = LruCs::new(65536);
        let data_bytes = Bytes::from_static(b"wire-format-data");
        let name = arc_name(&["a", "b"]);
        cs.insert(data_bytes.clone(), name.clone(), meta_fresh())
            .await;

        // Build an Interest whose name is /a/b/<implicit-digest>
        let digest = ring::digest::digest(&ring::digest::SHA256, &data_bytes);
        let mut comps: Vec<NameComponent> = name.components().to_vec();
        comps.push(NameComponent {
            typ: ndn_packet::tlv_type::IMPLICIT_SHA256,
            value: Bytes::copy_from_slice(digest.as_ref()),
        });
        let interest_name = Name::from_components(comps);
        let i = Interest::new(interest_name);
        let entry = cs.get(&i).await.expect("implicit digest hit");
        assert_eq!(entry.data.as_ref(), b"wire-format-data");
    }

    #[tokio::test]
    async fn implicit_digest_wrong_hash_misses() {
        let cs = LruCs::new(65536);
        cs.insert(Bytes::from_static(b"data"), arc_name(&["a"]), meta_fresh())
            .await;

        // Wrong digest
        let mut comps: Vec<NameComponent> = arc_name(&["a"]).components().to_vec();
        comps.push(NameComponent {
            typ: ndn_packet::tlv_type::IMPLICIT_SHA256,
            value: Bytes::from_static(&[0u8; 32]),
        });
        let i = Interest::new(Name::from_components(comps));
        assert!(cs.get(&i).await.is_none());
    }

    #[tokio::test]
    async fn lru_eviction_removes_from_prefix_index() {
        let cs = LruCs::new(20);
        cs.insert(
            Bytes::from(vec![0u8; 10]),
            arc_name(&["a", "b"]),
            meta_fresh(),
        )
        .await;
        cs.insert(
            Bytes::from(vec![0u8; 10]),
            arc_name(&["b", "c"]),
            meta_fresh(),
        )
        .await;
        // Third insert evicts /a/b.
        cs.insert(
            Bytes::from(vec![0u8; 10]),
            arc_name(&["c", "d"]),
            meta_fresh(),
        )
        .await;
        // CanBePrefix for /a should now miss (evicted entry removed from index).
        assert!(cs.get(&interest_can_be_prefix(&["a"])).await.is_none());
    }

    // ── new trait methods ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn len_tracks_entries() {
        let cs = LruCs::new(65536);
        assert_eq!(cs.len(), 0);
        cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_fresh())
            .await;
        assert_eq!(cs.len(), 1);
        cs.insert(Bytes::from_static(b"y"), arc_name(&["b"]), meta_fresh())
            .await;
        assert_eq!(cs.len(), 2);
        cs.evict(&arc_name(&["a"])).await;
        assert_eq!(cs.len(), 1);
    }

    #[tokio::test]
    async fn set_capacity_evicts_excess() {
        let cs = LruCs::new(100);
        cs.insert(Bytes::from(vec![0u8; 40]), arc_name(&["a"]), meta_fresh())
            .await;
        cs.insert(Bytes::from(vec![0u8; 40]), arc_name(&["b"]), meta_fresh())
            .await;
        assert_eq!(cs.len(), 2);
        // Shrink capacity below current usage — should evict LRU entries.
        cs.set_capacity(50);
        assert_eq!(cs.capacity().max_bytes, 50);
        assert_eq!(cs.len(), 1);
        // The more recently used entry (/b) survives.
        assert!(cs.get(&interest(&["b"])).await.is_some());
    }

    #[tokio::test]
    async fn variant_name_is_lru() {
        let cs = LruCs::new(1024);
        assert_eq!(cs.variant_name(), "lru");
    }

    #[tokio::test]
    async fn evict_prefix_removes_matching_entries() {
        let cs = LruCs::new(65536);
        cs.insert(
            Bytes::from_static(b"1"),
            arc_name(&["a", "b", "1"]),
            meta_fresh(),
        )
        .await;
        cs.insert(
            Bytes::from_static(b"2"),
            arc_name(&["a", "b", "2"]),
            meta_fresh(),
        )
        .await;
        cs.insert(
            Bytes::from_static(b"3"),
            arc_name(&["x", "y"]),
            meta_fresh(),
        )
        .await;
        let name_ab = Name::from_components(
            ["a", "b"]
                .iter()
                .map(|s| NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))),
        );
        let evicted = cs.evict_prefix(&name_ab, None).await;
        assert_eq!(evicted, 2);
        assert_eq!(cs.len(), 1);
        // /x/y should still be there.
        assert!(cs.get(&interest(&["x", "y"])).await.is_some());
    }

    #[tokio::test]
    async fn evict_prefix_respects_limit() {
        let cs = LruCs::new(65536);
        cs.insert(
            Bytes::from_static(b"1"),
            arc_name(&["a", "1"]),
            meta_fresh(),
        )
        .await;
        cs.insert(
            Bytes::from_static(b"2"),
            arc_name(&["a", "2"]),
            meta_fresh(),
        )
        .await;
        cs.insert(
            Bytes::from_static(b"3"),
            arc_name(&["a", "3"]),
            meta_fresh(),
        )
        .await;
        let name_a = Name::from_components(std::iter::once(NameComponent::generic(
            Bytes::copy_from_slice(b"a"),
        )));
        let evicted = cs.evict_prefix(&name_a, Some(1)).await;
        assert_eq!(evicted, 1);
        assert_eq!(cs.len(), 2);
    }
}