1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::sync::Arc;
4
5use bytes::Bytes;
6
7use ndn_packet::{Interest, Name};
8
9use crate::{ContentStore, CsCapacity, CsEntry, CsMeta, CsStats, InsertResult};
10
11pub struct ShardedCs<C: ContentStore> {
20 shards: Vec<C>,
21 shard_count: usize,
22}
23
24impl<C: ContentStore> ShardedCs<C> {
25 pub fn new(shards: Vec<C>) -> Self {
31 let shard_count = shards.len();
32 assert!(shard_count > 0, "ShardedCs requires at least one shard");
33 Self {
34 shards,
35 shard_count,
36 }
37 }
38
39 pub fn shard_count(&self) -> usize {
41 self.shard_count
42 }
43
44 fn shard_for(&self, name: &Name) -> usize {
45 match name.components().first() {
46 Some(first) => {
47 let mut h = DefaultHasher::new();
48 first.hash(&mut h);
49 (h.finish() as usize) % self.shard_count
50 }
51 None => 0, }
53 }
54}
55
56impl<C: ContentStore> ContentStore for ShardedCs<C> {
57 async fn get(&self, interest: &Interest) -> Option<CsEntry> {
58 let idx = self.shard_for(&interest.name);
59 self.shards[idx].get(interest).await
60 }
61
62 async fn insert(&self, data: Bytes, name: Arc<Name>, meta: CsMeta) -> InsertResult {
63 let idx = self.shard_for(&name);
64 self.shards[idx].insert(data, name, meta).await
65 }
66
67 async fn evict(&self, name: &Name) -> bool {
68 let idx = self.shard_for(name);
69 self.shards[idx].evict(name).await
70 }
71
72 fn capacity(&self) -> CsCapacity {
73 let total: usize = self.shards.iter().map(|s| s.capacity().max_bytes).sum();
74 CsCapacity::bytes(total)
75 }
76
77 fn len(&self) -> usize {
78 self.shards.iter().map(|s| s.len()).sum()
79 }
80
81 fn current_bytes(&self) -> usize {
82 self.shards.iter().map(|s| s.current_bytes()).sum()
83 }
84
85 fn set_capacity(&self, max_bytes: usize) {
86 let per_shard = max_bytes / self.shard_count;
87 for shard in &self.shards {
88 shard.set_capacity(per_shard);
89 }
90 }
91
92 fn variant_name(&self) -> &str {
93 "sharded-lru"
94 }
95
96 async fn evict_prefix(&self, prefix: &Name, limit: Option<usize>) -> usize {
97 let idx = self.shard_for(prefix);
99 self.shards[idx].evict_prefix(prefix, limit).await
100 }
101
102 fn stats(&self) -> CsStats {
103 let mut combined = CsStats::default();
104 for shard in &self.shards {
105 let s = shard.stats();
106 combined.hits += s.hits;
107 combined.misses += s.misses;
108 combined.inserts += s.inserts;
109 combined.evictions += s.evictions;
110 }
111 combined
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LruCs;
    use ndn_packet::NameComponent;

    /// Builds a shared name whose components are the given strings.
    fn arc_name(components: &[&str]) -> Arc<Name> {
        Arc::new(Name::from_components(components.iter().map(|s| {
            NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))
        })))
    }

    /// Metadata with the largest possible `stale_at` value.
    fn meta_fresh() -> CsMeta {
        CsMeta { stale_at: u64::MAX }
    }

    /// Builds an interest for the name made of the given components.
    fn interest(components: &[&str]) -> Interest {
        Interest::new((*arc_name(components)).clone())
    }

    /// Builds a sharded store of `n` `LruCs` shards, each created with
    /// `shard_bytes`.
    fn make_sharded(n: usize, shard_bytes: usize) -> ShardedCs<LruCs> {
        ShardedCs::new((0..n).map(|_| LruCs::new(shard_bytes)).collect())
    }

    #[test]
    fn shard_count_reported() {
        let cs = make_sharded(4, 1024);
        assert_eq!(cs.shard_count(), 4);
    }

    #[test]
    fn capacity_is_sum_of_shards() {
        let cs = make_sharded(4, 1024);
        assert_eq!(cs.capacity().max_bytes, 4 * 1024);
    }

    #[test]
    #[should_panic]
    fn empty_shards_panics() {
        let _cs: ShardedCs<LruCs> = ShardedCs::new(vec![]);
    }

    #[tokio::test]
    async fn insert_then_get_roundtrip() {
        let cs = make_sharded(4, 65536);
        let name = arc_name(&["edu", "ucla", "data"]);
        cs.insert(Bytes::from_static(b"payload"), name, meta_fresh())
            .await;
        let entry = cs.get(&interest(&["edu", "ucla", "data"])).await.unwrap();
        assert_eq!(entry.data.as_ref(), b"payload");
    }

    #[tokio::test]
    async fn miss_returns_none() {
        let cs = make_sharded(2, 65536);
        assert!(cs.get(&interest(&["x"])).await.is_none());
    }

    #[tokio::test]
    async fn names_with_same_first_component_land_in_same_shard() {
        let cs = make_sharded(4, 65536);
        let first = arc_name(&["a", "1"]);
        let second = arc_name(&["a", "2"]);

        // Check the routing itself: being able to fetch both entries would
        // also succeed if they had been placed in different shards.
        assert_eq!(cs.shard_for(&first), cs.shard_for(&second));

        cs.insert(Bytes::from_static(b"v1"), first, meta_fresh())
            .await;
        cs.insert(Bytes::from_static(b"v2"), second, meta_fresh())
            .await;
        assert!(cs.get(&interest(&["a", "1"])).await.is_some());
        assert!(cs.get(&interest(&["a", "2"])).await.is_some());
    }

    #[tokio::test]
    async fn evict_removes_entry() {
        let cs = make_sharded(2, 65536);
        let name = arc_name(&["b", "1"]);
        cs.insert(Bytes::from_static(b"v"), name.clone(), meta_fresh())
            .await;
        assert!(cs.evict(&name).await);
        assert!(cs.get(&interest(&["b", "1"])).await.is_none());
    }

    #[tokio::test]
    async fn evict_nonexistent_returns_false() {
        let cs = make_sharded(2, 65536);
        assert!(!cs.evict(&arc_name(&["z"])).await);
    }

    #[tokio::test]
    async fn single_shard_works() {
        let cs = make_sharded(1, 65536);
        cs.insert(Bytes::from_static(b"data"), arc_name(&["a"]), meta_fresh())
            .await;
        assert!(cs.get(&interest(&["a"])).await.is_some());
    }
}