ndn_store/
observable_cs.rs

1//! Observable content store wrapper with event hooks and atomic counters.
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use bytes::Bytes;
7
8use ndn_packet::{Interest, Name};
9
10use crate::{CsCapacity, CsEntry, CsMeta, CsStats, ErasedContentStore, InsertResult};
11
12/// Events emitted by an observable content store.
13#[derive(Debug)]
14pub enum CsEvent {
15    Hit { name: Arc<Name> },
16    Miss { name: Arc<Name> },
17    Insert { name: Arc<Name>, bytes: usize },
18    Evict { name: Arc<Name> },
19}
20
21/// Observer that receives CS events.
22///
23/// Implementations must be non-blocking — the observer is called inline
24/// on the hot path. Use a channel or atomic buffer for expensive processing.
25pub trait CsObserver: Send + Sync + 'static {
26    fn on_event(&self, event: CsEvent);
27}
28
/// Lock-free tallies of content-store activity: one atomic per
/// hit/miss/insert/eviction counter.
struct CsStatsCounters {
    /// Lookups that returned an entry.
    hits: AtomicU64,
    /// Lookups that returned nothing.
    misses: AtomicU64,
    /// Inserts that were not skipped by the wrapped store.
    inserts: AtomicU64,
    /// Entries removed through the eviction methods.
    evictions: AtomicU64,
}
36
37impl CsStatsCounters {
38    fn new() -> Self {
39        Self {
40            hits: AtomicU64::new(0),
41            misses: AtomicU64::new(0),
42            inserts: AtomicU64::new(0),
43            evictions: AtomicU64::new(0),
44        }
45    }
46
47    fn snapshot(&self) -> CsStats {
48        CsStats {
49            hits: self.hits.load(Ordering::Relaxed),
50            misses: self.misses.load(Ordering::Relaxed),
51            inserts: self.inserts.load(Ordering::Relaxed),
52            evictions: self.evictions.load(Ordering::Relaxed),
53        }
54    }
55}
56
57/// Wraps any [`ErasedContentStore`] with hit/miss/insert/eviction counters
58/// and an optional [`CsObserver`] callback.
59///
60/// When no observer is registered, the overhead is a single `Option` check
61/// plus an atomic increment per operation.
62pub struct ObservableCs {
63    inner: Arc<dyn ErasedContentStore>,
64    observer: Option<Arc<dyn CsObserver>>,
65    counters: CsStatsCounters,
66}
67
68impl ObservableCs {
69    pub fn new(inner: Arc<dyn ErasedContentStore>, observer: Option<Arc<dyn CsObserver>>) -> Self {
70        Self {
71            inner,
72            observer,
73            counters: CsStatsCounters::new(),
74        }
75    }
76
77    fn emit(&self, event: CsEvent) {
78        if let Some(ref obs) = self.observer {
79            obs.on_event(event);
80        }
81    }
82}
83
84impl ErasedContentStore for ObservableCs {
85    fn get_erased<'a>(
86        &'a self,
87        interest: &'a Interest,
88    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<CsEntry>> + Send + 'a>> {
89        Box::pin(async move {
90            let result = self.inner.get_erased(interest).await;
91            let name = Arc::clone(&interest.name);
92            if result.is_some() {
93                self.counters.hits.fetch_add(1, Ordering::Relaxed);
94                self.emit(CsEvent::Hit { name });
95            } else {
96                self.counters.misses.fetch_add(1, Ordering::Relaxed);
97                self.emit(CsEvent::Miss { name });
98            }
99            result
100        })
101    }
102
103    fn insert_erased(
104        &self,
105        data: Bytes,
106        name: Arc<Name>,
107        meta: CsMeta,
108    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = InsertResult> + Send + '_>> {
109        Box::pin(async move {
110            let bytes = data.len();
111            let result = self.inner.insert_erased(data, name.clone(), meta).await;
112            if result != InsertResult::Skipped {
113                self.counters.inserts.fetch_add(1, Ordering::Relaxed);
114                self.emit(CsEvent::Insert { name, bytes });
115            }
116            result
117        })
118    }
119
120    fn evict_erased<'a>(
121        &'a self,
122        name: &'a Name,
123    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + 'a>> {
124        Box::pin(async move {
125            let removed = self.inner.evict_erased(name).await;
126            if removed {
127                self.counters.evictions.fetch_add(1, Ordering::Relaxed);
128                self.emit(CsEvent::Evict {
129                    name: Arc::new(name.clone()),
130                });
131            }
132            removed
133        })
134    }
135
136    fn evict_prefix_erased<'a>(
137        &'a self,
138        prefix: &'a Name,
139        limit: Option<usize>,
140    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = usize> + Send + 'a>> {
141        Box::pin(async move {
142            let evicted = self.inner.evict_prefix_erased(prefix, limit).await;
143            self.counters
144                .evictions
145                .fetch_add(evicted as u64, Ordering::Relaxed);
146            evicted
147        })
148    }
149
150    fn capacity(&self) -> CsCapacity {
151        self.inner.capacity()
152    }
153
154    fn set_capacity(&self, max_bytes: usize) {
155        self.inner.set_capacity(max_bytes);
156    }
157
158    fn len(&self) -> usize {
159        self.inner.len()
160    }
161
162    fn is_empty(&self) -> bool {
163        self.inner.is_empty()
164    }
165
166    fn current_bytes(&self) -> usize {
167        self.inner.current_bytes()
168    }
169
170    fn variant_name(&self) -> &str {
171        self.inner.variant_name()
172    }
173
174    fn stats(&self) -> CsStats {
175        self.counters.snapshot()
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LruCs;
    use ndn_packet::NameComponent;
    use std::sync::atomic::AtomicUsize;

    /// Builds a shared name from generic components given as strings.
    fn arc_name(components: &[&str]) -> Arc<Name> {
        let parts = components
            .iter()
            .map(|text| NameComponent::generic(Bytes::copy_from_slice(text.as_bytes())));
        Arc::new(Name::from_components(parts))
    }

    /// Builds an Interest for the name made of `components`.
    fn interest(components: &[&str]) -> Interest {
        let shared = arc_name(components);
        Interest::new((*shared).clone())
    }

    /// Fresh LRU store used as the wrapped backend in every test.
    fn lru_store() -> Arc<dyn ErasedContentStore> {
        Arc::new(LruCs::new(65536))
    }

    /// Metadata with the largest possible `stale_at` value.
    fn never_stale() -> CsMeta {
        CsMeta { stale_at: u64::MAX }
    }

    /// Observer that tallies hit, miss and insert events; evictions are ignored.
    struct CountingObserver {
        hits: AtomicUsize,
        misses: AtomicUsize,
        inserts: AtomicUsize,
    }

    impl CountingObserver {
        fn new() -> Self {
            let zero = || AtomicUsize::new(0);
            Self {
                hits: zero(),
                misses: zero(),
                inserts: zero(),
            }
        }
    }

    impl CsObserver for CountingObserver {
        fn on_event(&self, event: CsEvent) {
            // Pick the tally that matches the event; evictions have none.
            let tally = match event {
                CsEvent::Hit { .. } => &self.hits,
                CsEvent::Miss { .. } => &self.misses,
                CsEvent::Insert { .. } => &self.inserts,
                CsEvent::Evict { .. } => return,
            };
            tally.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[tokio::test]
    async fn observable_tracks_hits_and_misses() {
        let observer = Arc::new(CountingObserver::new());
        let cs = ObservableCs::new(lru_store(), Some(Arc::clone(&observer) as _));

        // Nothing stored yet, so the first lookup must miss.
        let probe = interest(&["a"]);
        cs.get_erased(&probe).await;
        assert_eq!(observer.misses.load(Ordering::Relaxed), 1);

        // Store the entry, then the same lookup must hit.
        cs.insert_erased(Bytes::from_static(b"data"), arc_name(&["a"]), never_stale())
            .await;
        cs.get_erased(&probe).await;
        assert_eq!(observer.hits.load(Ordering::Relaxed), 1);
        assert_eq!(observer.inserts.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn stats_reflect_operations() {
        let cs = ObservableCs::new(lru_store(), None);
        cs.insert_erased(Bytes::from_static(b"x"), arc_name(&["a"]), never_stale())
            .await;

        let stored = interest(&["a"]);
        let absent = interest(&["b"]);
        cs.get_erased(&stored).await; // hit
        cs.get_erased(&absent).await; // miss

        let stats = cs.stats();
        assert_eq!(stats.inserts, 1);
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
    }
}