ndn_store/
observable_cs.rs1use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use bytes::Bytes;
7
8use ndn_packet::{Interest, Name};
9
10use crate::{CsCapacity, CsEntry, CsMeta, CsStats, ErasedContentStore, InsertResult};
11
12#[derive(Debug)]
14pub enum CsEvent {
15 Hit { name: Arc<Name> },
16 Miss { name: Arc<Name> },
17 Insert { name: Arc<Name>, bytes: usize },
18 Evict { name: Arc<Name> },
19}
20
21pub trait CsObserver: Send + Sync + 'static {
26 fn on_event(&self, event: CsEvent);
27}
28
29struct CsStatsCounters {
31 hits: AtomicU64,
32 misses: AtomicU64,
33 inserts: AtomicU64,
34 evictions: AtomicU64,
35}
36
37impl CsStatsCounters {
38 fn new() -> Self {
39 Self {
40 hits: AtomicU64::new(0),
41 misses: AtomicU64::new(0),
42 inserts: AtomicU64::new(0),
43 evictions: AtomicU64::new(0),
44 }
45 }
46
47 fn snapshot(&self) -> CsStats {
48 CsStats {
49 hits: self.hits.load(Ordering::Relaxed),
50 misses: self.misses.load(Ordering::Relaxed),
51 inserts: self.inserts.load(Ordering::Relaxed),
52 evictions: self.evictions.load(Ordering::Relaxed),
53 }
54 }
55}
56
57pub struct ObservableCs {
63 inner: Arc<dyn ErasedContentStore>,
64 observer: Option<Arc<dyn CsObserver>>,
65 counters: CsStatsCounters,
66}
67
68impl ObservableCs {
69 pub fn new(inner: Arc<dyn ErasedContentStore>, observer: Option<Arc<dyn CsObserver>>) -> Self {
70 Self {
71 inner,
72 observer,
73 counters: CsStatsCounters::new(),
74 }
75 }
76
77 fn emit(&self, event: CsEvent) {
78 if let Some(ref obs) = self.observer {
79 obs.on_event(event);
80 }
81 }
82}
83
84impl ErasedContentStore for ObservableCs {
85 fn get_erased<'a>(
86 &'a self,
87 interest: &'a Interest,
88 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<CsEntry>> + Send + 'a>> {
89 Box::pin(async move {
90 let result = self.inner.get_erased(interest).await;
91 let name = Arc::clone(&interest.name);
92 if result.is_some() {
93 self.counters.hits.fetch_add(1, Ordering::Relaxed);
94 self.emit(CsEvent::Hit { name });
95 } else {
96 self.counters.misses.fetch_add(1, Ordering::Relaxed);
97 self.emit(CsEvent::Miss { name });
98 }
99 result
100 })
101 }
102
103 fn insert_erased(
104 &self,
105 data: Bytes,
106 name: Arc<Name>,
107 meta: CsMeta,
108 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = InsertResult> + Send + '_>> {
109 Box::pin(async move {
110 let bytes = data.len();
111 let result = self.inner.insert_erased(data, name.clone(), meta).await;
112 if result != InsertResult::Skipped {
113 self.counters.inserts.fetch_add(1, Ordering::Relaxed);
114 self.emit(CsEvent::Insert { name, bytes });
115 }
116 result
117 })
118 }
119
120 fn evict_erased<'a>(
121 &'a self,
122 name: &'a Name,
123 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send + 'a>> {
124 Box::pin(async move {
125 let removed = self.inner.evict_erased(name).await;
126 if removed {
127 self.counters.evictions.fetch_add(1, Ordering::Relaxed);
128 self.emit(CsEvent::Evict {
129 name: Arc::new(name.clone()),
130 });
131 }
132 removed
133 })
134 }
135
136 fn evict_prefix_erased<'a>(
137 &'a self,
138 prefix: &'a Name,
139 limit: Option<usize>,
140 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = usize> + Send + 'a>> {
141 Box::pin(async move {
142 let evicted = self.inner.evict_prefix_erased(prefix, limit).await;
143 self.counters
144 .evictions
145 .fetch_add(evicted as u64, Ordering::Relaxed);
146 evicted
147 })
148 }
149
150 fn capacity(&self) -> CsCapacity {
151 self.inner.capacity()
152 }
153
154 fn set_capacity(&self, max_bytes: usize) {
155 self.inner.set_capacity(max_bytes);
156 }
157
158 fn len(&self) -> usize {
159 self.inner.len()
160 }
161
162 fn is_empty(&self) -> bool {
163 self.inner.is_empty()
164 }
165
166 fn current_bytes(&self) -> usize {
167 self.inner.current_bytes()
168 }
169
170 fn variant_name(&self) -> &str {
171 self.inner.variant_name()
172 }
173
174 fn stats(&self) -> CsStats {
175 self.counters.snapshot()
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182 use crate::LruCs;
183 use ndn_packet::NameComponent;
184 use std::sync::atomic::AtomicUsize;
185
186 fn arc_name(components: &[&str]) -> Arc<Name> {
187 Arc::new(Name::from_components(components.iter().map(|s| {
188 NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))
189 })))
190 }
191
192 fn interest(components: &[&str]) -> Interest {
193 Interest::new((*arc_name(components)).clone())
194 }
195
196 struct CountingObserver {
197 hits: AtomicUsize,
198 misses: AtomicUsize,
199 inserts: AtomicUsize,
200 }
201
202 impl CountingObserver {
203 fn new() -> Self {
204 Self {
205 hits: AtomicUsize::new(0),
206 misses: AtomicUsize::new(0),
207 inserts: AtomicUsize::new(0),
208 }
209 }
210 }
211
212 impl CsObserver for CountingObserver {
213 fn on_event(&self, event: CsEvent) {
214 match event {
215 CsEvent::Hit { .. } => {
216 self.hits.fetch_add(1, Ordering::Relaxed);
217 }
218 CsEvent::Miss { .. } => {
219 self.misses.fetch_add(1, Ordering::Relaxed);
220 }
221 CsEvent::Insert { .. } => {
222 self.inserts.fetch_add(1, Ordering::Relaxed);
223 }
224 CsEvent::Evict { .. } => {}
225 }
226 }
227 }
228
229 #[tokio::test]
230 async fn observable_tracks_hits_and_misses() {
231 let observer = Arc::new(CountingObserver::new());
232 let inner: Arc<dyn ErasedContentStore> = Arc::new(LruCs::new(65536));
233 let cs = ObservableCs::new(inner, Some(Arc::clone(&observer) as _));
234
235 cs.get_erased(&interest(&["a"])).await;
237 assert_eq!(observer.misses.load(Ordering::Relaxed), 1);
238
239 cs.insert_erased(
241 Bytes::from_static(b"data"),
242 arc_name(&["a"]),
243 CsMeta { stale_at: u64::MAX },
244 )
245 .await;
246 cs.get_erased(&interest(&["a"])).await;
247 assert_eq!(observer.hits.load(Ordering::Relaxed), 1);
248 assert_eq!(observer.inserts.load(Ordering::Relaxed), 1);
249 }
250
251 #[tokio::test]
252 async fn stats_reflect_operations() {
253 let inner: Arc<dyn ErasedContentStore> = Arc::new(LruCs::new(65536));
254 let cs = ObservableCs::new(inner, None);
255 cs.insert_erased(
256 Bytes::from_static(b"x"),
257 arc_name(&["a"]),
258 CsMeta { stale_at: u64::MAX },
259 )
260 .await;
261 cs.get_erased(&interest(&["a"])).await; cs.get_erased(&interest(&["b"])).await; let stats = cs.stats();
265 assert_eq!(stats.inserts, 1);
266 assert_eq!(stats.hits, 1);
267 assert_eq!(stats.misses, 1);
268 }
269}