1use std::sync::atomic::{AtomicUsize, Ordering};
2use std::sync::{Arc, Mutex};
3
4use bytes::Bytes;
5use lru::LruCache;
6
7use ndn_packet::{Interest, Name};
8
9use crate::{ContentStore, CsCapacity, CsEntry, CsMeta, InsertResult, NameTrie};
10
11pub struct LruCs {
24 inner: Mutex<LruInner>,
25 capacity_bytes: AtomicUsize,
27 entry_count: AtomicUsize,
29}
30
31struct LruInner {
32 cache: LruCache<Arc<Name>, CsEntry>,
33 prefix_index: NameTrie<Arc<Name>>,
34 current_bytes: usize,
35}
36
37impl LruCs {
38 pub fn new(capacity_bytes: usize) -> Self {
40 use std::num::NonZeroUsize;
41 let max_entries = NonZeroUsize::new(capacity_bytes.max(1)).unwrap();
45 Self {
46 inner: Mutex::new(LruInner {
47 cache: LruCache::new(max_entries),
48 prefix_index: NameTrie::new(),
49 current_bytes: 0,
50 }),
51 capacity_bytes: AtomicUsize::new(capacity_bytes),
52 entry_count: AtomicUsize::new(0),
53 }
54 }
55
56 pub fn is_empty(&self) -> bool {
58 self.entry_count.load(Ordering::Relaxed) == 0
59 }
60}
61
62impl ContentStore for LruCs {
63 async fn get(&self, interest: &Interest) -> Option<CsEntry> {
64 if self.entry_count.load(Ordering::Relaxed) == 0 {
66 return None;
67 }
68 let mut inner = self.inner.lock().unwrap();
69
70 let comps = interest.name.components();
74 let has_implicit_digest =
75 !comps.is_empty() && comps.last().unwrap().typ == ndn_packet::tlv_type::IMPLICIT_SHA256;
76
77 let entry = if has_implicit_digest {
78 let data_name = Name::from_components(comps[..comps.len() - 1].iter().cloned());
80 let candidate = inner.cache.get(&data_name)?.clone();
81 let expected_digest = &comps.last().unwrap().value;
83 let actual = ring::digest::digest(&ring::digest::SHA256, &candidate.data);
84 if expected_digest.as_ref() != actual.as_ref() {
85 return None;
86 }
87 candidate
88 } else if interest.selectors().can_be_prefix {
89 let data_name = inner.prefix_index.first_descendant(&interest.name)?;
90 inner.cache.get(data_name.as_ref())?.clone()
91 } else {
92 inner.cache.get(interest.name.as_ref())?.clone()
93 };
94
95 if interest.selectors().must_be_fresh && !entry.is_fresh(now_ns()) {
96 return None;
97 }
98 Some(entry)
99 }
100
101 async fn insert(&self, data: Bytes, name: Arc<Name>, meta: CsMeta) -> InsertResult {
102 let entry_bytes = data.len();
103 let capacity = self.capacity_bytes.load(Ordering::Relaxed);
104 let mut inner = self.inner.lock().unwrap();
105
106 let was_present = if let Some(old) = inner.cache.peek(name.as_ref()) {
108 inner.current_bytes = inner.current_bytes.saturating_sub(old.data.len());
109 true
110 } else {
111 false
112 };
113
114 while inner.current_bytes + entry_bytes > capacity {
116 if let Some((evicted_name, evicted)) = inner.cache.pop_lru() {
117 inner.current_bytes = inner.current_bytes.saturating_sub(evicted.data.len());
118 inner.prefix_index.remove(&evicted_name);
119 self.entry_count.fetch_sub(1, Ordering::Relaxed);
120 } else {
121 break;
122 }
123 }
124
125 let entry = CsEntry {
126 data,
127 stale_at: meta.stale_at,
128 name: name.clone(),
129 };
130 inner.cache.put(name.clone(), entry);
131 inner.current_bytes += entry_bytes;
132
133 if !was_present {
135 inner.prefix_index.insert(name.as_ref(), Arc::clone(&name));
136 }
137
138 if !was_present {
139 self.entry_count.fetch_add(1, Ordering::Relaxed);
140 }
141 if was_present {
142 InsertResult::Replaced
143 } else {
144 InsertResult::Inserted
145 }
146 }
147
148 async fn evict(&self, name: &Name) -> bool {
149 let mut inner = self.inner.lock().unwrap();
150 if let Some(evicted) = inner.cache.pop(name) {
151 inner.current_bytes = inner.current_bytes.saturating_sub(evicted.data.len());
152 inner.prefix_index.remove(name);
153 self.entry_count.fetch_sub(1, Ordering::Relaxed);
154 return true;
155 }
156 false
157 }
158
159 fn capacity(&self) -> CsCapacity {
160 CsCapacity::bytes(self.capacity_bytes.load(Ordering::Relaxed))
161 }
162
163 fn len(&self) -> usize {
164 self.entry_count.load(Ordering::Relaxed)
165 }
166
167 fn current_bytes(&self) -> usize {
168 self.inner.lock().unwrap().current_bytes
169 }
170
171 fn set_capacity(&self, max_bytes: usize) {
172 self.capacity_bytes.store(max_bytes, Ordering::Relaxed);
173 let mut inner = self.inner.lock().unwrap();
175 while inner.current_bytes > max_bytes {
176 if let Some((evicted_name, evicted)) = inner.cache.pop_lru() {
177 inner.current_bytes = inner.current_bytes.saturating_sub(evicted.data.len());
178 inner.prefix_index.remove(&evicted_name);
179 self.entry_count.fetch_sub(1, Ordering::Relaxed);
180 } else {
181 break;
182 }
183 }
184 }
185
186 fn variant_name(&self) -> &str {
187 "lru"
188 }
189
190 async fn evict_prefix(&self, prefix: &Name, limit: Option<usize>) -> usize {
191 let mut inner = self.inner.lock().unwrap();
192 let names: Vec<Arc<Name>> = inner.prefix_index.descendants(prefix);
193 let max = limit.unwrap_or(usize::MAX);
194 let mut evicted = 0;
195 for name in names {
196 if evicted >= max {
197 break;
198 }
199 if let Some(entry) = inner.cache.pop(name.as_ref()) {
200 inner.current_bytes = inner.current_bytes.saturating_sub(entry.data.len());
201 inner.prefix_index.remove(name.as_ref());
202 self.entry_count.fetch_sub(1, Ordering::Relaxed);
203 evicted += 1;
204 }
205 }
206 evicted
207 }
208}
209
210fn now_ns() -> u64 {
211 use std::time::{SystemTime, UNIX_EPOCH};
212 SystemTime::now()
213 .duration_since(UNIX_EPOCH)
214 .map(|d| d.as_nanos() as u64)
215 .unwrap_or(0)
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221 use ndn_packet::{Interest, Name, NameComponent};
222
223 fn arc_name(components: &[&str]) -> Arc<Name> {
224 Arc::new(Name::from_components(components.iter().map(|s| {
225 NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))
226 })))
227 }
228
229 fn meta_fresh() -> CsMeta {
230 CsMeta { stale_at: u64::MAX }
231 }
232
233 fn meta_stale() -> CsMeta {
234 CsMeta { stale_at: 0 } }
236
237 fn interest(components: &[&str]) -> Interest {
238 Interest::new((*arc_name(components)).clone())
239 }
240
241 fn interest_fresh(components: &[&str]) -> Interest {
242 let name = (*arc_name(components)).clone();
243 let _ = Interest::new(name);
244 use ndn_packet::tlv_type;
247 use ndn_tlv::TlvWriter;
248 let mut w = TlvWriter::new();
249 w.write_nested(tlv_type::INTEREST, |w| {
250 w.write_nested(tlv_type::NAME, |w| {
251 for comp in components {
252 w.write_tlv(tlv_type::NAME_COMPONENT, comp.as_bytes());
253 }
254 });
255 w.write_tlv(tlv_type::MUST_BE_FRESH, &[]);
256 });
257 Interest::decode(w.finish()).unwrap()
258 }
259
260 fn interest_can_be_prefix(components: &[&str]) -> Interest {
261 use ndn_packet::tlv_type;
262 use ndn_tlv::TlvWriter;
263 let mut w = TlvWriter::new();
264 w.write_nested(tlv_type::INTEREST, |w| {
265 w.write_nested(tlv_type::NAME, |w| {
266 for comp in components {
267 w.write_tlv(tlv_type::NAME_COMPONENT, comp.as_bytes());
268 }
269 });
270 w.write_tlv(tlv_type::CAN_BE_PREFIX, &[]);
271 });
272 Interest::decode(w.finish()).unwrap()
273 }
274
275 #[tokio::test]
278 async fn get_miss_returns_none() {
279 let cs = LruCs::new(65536);
280 assert!(cs.get(&interest(&["a"])).await.is_none());
281 }
282
283 #[tokio::test]
284 async fn insert_then_get_returns_entry() {
285 let cs = LruCs::new(65536);
286 let name = arc_name(&["a", "b"]);
287 cs.insert(Bytes::from_static(b"data"), name.clone(), meta_fresh())
288 .await;
289 let entry = cs.get(&interest(&["a", "b"])).await.unwrap();
290 assert_eq!(entry.data.as_ref(), b"data");
291 }
292
293 #[tokio::test]
294 async fn insert_returns_inserted() {
295 let cs = LruCs::new(65536);
296 let r = cs
297 .insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_fresh())
298 .await;
299 assert_eq!(r, InsertResult::Inserted);
300 }
301
302 #[tokio::test]
303 async fn insert_replaces_existing() {
304 let cs = LruCs::new(65536);
305 let name = arc_name(&["a"]);
306 cs.insert(Bytes::from_static(b"old"), name.clone(), meta_fresh())
307 .await;
308 let r = cs
309 .insert(Bytes::from_static(b"new"), name.clone(), meta_fresh())
310 .await;
311 assert_eq!(r, InsertResult::Replaced);
312 let entry = cs.get(&interest(&["a"])).await.unwrap();
313 assert_eq!(entry.data.as_ref(), b"new");
314 }
315
316 #[tokio::test]
319 async fn must_be_fresh_rejects_stale_entry() {
320 let cs = LruCs::new(65536);
321 cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_stale())
322 .await;
323 assert!(cs.get(&interest_fresh(&["a"])).await.is_none());
324 }
325
326 #[tokio::test]
327 async fn must_be_fresh_accepts_fresh_entry() {
328 let cs = LruCs::new(65536);
329 cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_fresh())
330 .await;
331 assert!(cs.get(&interest_fresh(&["a"])).await.is_some());
332 }
333
334 #[tokio::test]
335 async fn no_must_be_fresh_returns_stale_entry() {
336 let cs = LruCs::new(65536);
337 cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_stale())
338 .await;
339 assert!(cs.get(&interest(&["a"])).await.is_some());
341 }
342
343 #[tokio::test]
346 async fn can_be_prefix_finds_longer_name() {
347 let cs = LruCs::new(65536);
348 cs.insert(
349 Bytes::from_static(b"v"),
350 arc_name(&["a", "b", "1"]),
351 meta_fresh(),
352 )
353 .await;
354 let entry = cs.get(&interest_can_be_prefix(&["a", "b"])).await;
355 assert!(entry.is_some());
356 }
357
358 #[tokio::test]
359 async fn can_be_prefix_miss_for_unrelated_name() {
360 let cs = LruCs::new(65536);
361 cs.insert(
362 Bytes::from_static(b"v"),
363 arc_name(&["x", "y"]),
364 meta_fresh(),
365 )
366 .await;
367 assert!(cs.get(&interest_can_be_prefix(&["a", "b"])).await.is_none());
368 }
369
370 #[tokio::test]
373 async fn evict_removes_entry() {
374 let cs = LruCs::new(65536);
375 let name = arc_name(&["a"]);
376 cs.insert(Bytes::from_static(b"x"), name.clone(), meta_fresh())
377 .await;
378 let removed = cs.evict(&name).await;
379 assert!(removed);
380 assert!(cs.get(&interest(&["a"])).await.is_none());
381 }
382
383 #[tokio::test]
384 async fn evict_nonexistent_returns_false() {
385 let cs = LruCs::new(65536);
386 assert!(!cs.evict(&arc_name(&["z"])).await);
387 }
388
389 #[tokio::test]
390 async fn evict_removes_from_prefix_index() {
391 let cs = LruCs::new(65536);
392 cs.insert(
393 Bytes::from_static(b"x"),
394 arc_name(&["a", "b"]),
395 meta_fresh(),
396 )
397 .await;
398 cs.evict(&arc_name(&["a", "b"])).await;
399 assert!(cs.get(&interest_can_be_prefix(&["a"])).await.is_none());
401 }
402
403 #[tokio::test]
406 async fn capacity_is_reported() {
407 let cs = LruCs::new(1024);
408 assert_eq!(cs.capacity().max_bytes, 1024);
409 }
410
411 #[tokio::test]
412 async fn lru_eviction_keeps_byte_count_bounded() {
413 let cs = LruCs::new(20);
415 cs.insert(Bytes::from(vec![0u8; 10]), arc_name(&["a"]), meta_fresh())
416 .await;
417 cs.insert(Bytes::from(vec![0u8; 10]), arc_name(&["b"]), meta_fresh())
418 .await;
419 cs.insert(Bytes::from(vec![0u8; 10]), arc_name(&["c"]), meta_fresh())
421 .await;
422 assert!(cs.get(&interest(&["a"])).await.is_none());
423 assert!(cs.get(&interest(&["b"])).await.is_some());
424 assert!(cs.get(&interest(&["c"])).await.is_some());
425 }
426
427 #[tokio::test]
430 async fn implicit_digest_lookup_matches() {
431 let cs = LruCs::new(65536);
432 let data_bytes = Bytes::from_static(b"wire-format-data");
433 let name = arc_name(&["a", "b"]);
434 cs.insert(data_bytes.clone(), name.clone(), meta_fresh())
435 .await;
436
437 let digest = ring::digest::digest(&ring::digest::SHA256, &data_bytes);
439 let mut comps: Vec<NameComponent> = name.components().to_vec();
440 comps.push(NameComponent {
441 typ: ndn_packet::tlv_type::IMPLICIT_SHA256,
442 value: Bytes::copy_from_slice(digest.as_ref()),
443 });
444 let interest_name = Name::from_components(comps);
445 let i = Interest::new(interest_name);
446 let entry = cs.get(&i).await.expect("implicit digest hit");
447 assert_eq!(entry.data.as_ref(), b"wire-format-data");
448 }
449
450 #[tokio::test]
451 async fn implicit_digest_wrong_hash_misses() {
452 let cs = LruCs::new(65536);
453 cs.insert(Bytes::from_static(b"data"), arc_name(&["a"]), meta_fresh())
454 .await;
455
456 let mut comps: Vec<NameComponent> = arc_name(&["a"]).components().to_vec();
458 comps.push(NameComponent {
459 typ: ndn_packet::tlv_type::IMPLICIT_SHA256,
460 value: Bytes::from_static(&[0u8; 32]),
461 });
462 let i = Interest::new(Name::from_components(comps));
463 assert!(cs.get(&i).await.is_none());
464 }
465
466 #[tokio::test]
467 async fn lru_eviction_removes_from_prefix_index() {
468 let cs = LruCs::new(20);
469 cs.insert(
470 Bytes::from(vec![0u8; 10]),
471 arc_name(&["a", "b"]),
472 meta_fresh(),
473 )
474 .await;
475 cs.insert(
476 Bytes::from(vec![0u8; 10]),
477 arc_name(&["b", "c"]),
478 meta_fresh(),
479 )
480 .await;
481 cs.insert(
483 Bytes::from(vec![0u8; 10]),
484 arc_name(&["c", "d"]),
485 meta_fresh(),
486 )
487 .await;
488 assert!(cs.get(&interest_can_be_prefix(&["a"])).await.is_none());
490 }
491
492 #[tokio::test]
495 async fn len_tracks_entries() {
496 let cs = LruCs::new(65536);
497 assert_eq!(cs.len(), 0);
498 cs.insert(Bytes::from_static(b"x"), arc_name(&["a"]), meta_fresh())
499 .await;
500 assert_eq!(cs.len(), 1);
501 cs.insert(Bytes::from_static(b"y"), arc_name(&["b"]), meta_fresh())
502 .await;
503 assert_eq!(cs.len(), 2);
504 cs.evict(&arc_name(&["a"])).await;
505 assert_eq!(cs.len(), 1);
506 }
507
508 #[tokio::test]
509 async fn set_capacity_evicts_excess() {
510 let cs = LruCs::new(100);
511 cs.insert(Bytes::from(vec![0u8; 40]), arc_name(&["a"]), meta_fresh())
512 .await;
513 cs.insert(Bytes::from(vec![0u8; 40]), arc_name(&["b"]), meta_fresh())
514 .await;
515 assert_eq!(cs.len(), 2);
516 cs.set_capacity(50);
518 assert_eq!(cs.capacity().max_bytes, 50);
519 assert_eq!(cs.len(), 1);
520 assert!(cs.get(&interest(&["b"])).await.is_some());
522 }
523
524 #[tokio::test]
525 async fn variant_name_is_lru() {
526 let cs = LruCs::new(1024);
527 assert_eq!(cs.variant_name(), "lru");
528 }
529
530 #[tokio::test]
531 async fn evict_prefix_removes_matching_entries() {
532 let cs = LruCs::new(65536);
533 cs.insert(
534 Bytes::from_static(b"1"),
535 arc_name(&["a", "b", "1"]),
536 meta_fresh(),
537 )
538 .await;
539 cs.insert(
540 Bytes::from_static(b"2"),
541 arc_name(&["a", "b", "2"]),
542 meta_fresh(),
543 )
544 .await;
545 cs.insert(
546 Bytes::from_static(b"3"),
547 arc_name(&["x", "y"]),
548 meta_fresh(),
549 )
550 .await;
551 let name_ab = Name::from_components(
552 ["a", "b"]
553 .iter()
554 .map(|s| NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))),
555 );
556 let evicted = cs.evict_prefix(&name_ab, None).await;
557 assert_eq!(evicted, 2);
558 assert_eq!(cs.len(), 1);
559 assert!(cs.get(&interest(&["x", "y"])).await.is_some());
561 }
562
563 #[tokio::test]
564 async fn evict_prefix_respects_limit() {
565 let cs = LruCs::new(65536);
566 cs.insert(
567 Bytes::from_static(b"1"),
568 arc_name(&["a", "1"]),
569 meta_fresh(),
570 )
571 .await;
572 cs.insert(
573 Bytes::from_static(b"2"),
574 arc_name(&["a", "2"]),
575 meta_fresh(),
576 )
577 .await;
578 cs.insert(
579 Bytes::from_static(b"3"),
580 arc_name(&["a", "3"]),
581 meta_fresh(),
582 )
583 .await;
584 let name_a = Name::from_components(std::iter::once(NameComponent::generic(
585 Bytes::copy_from_slice(b"a"),
586 )));
587 let evicted = cs.evict_prefix(&name_a, Some(1)).await;
588 assert_eq!(evicted, 1);
589 assert_eq!(cs.len(), 2);
590 }
591}