ndn_store/
sharded_cs.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::sync::Arc;
4
5use bytes::Bytes;
6
7use ndn_packet::{Interest, Name};
8
9use crate::{ContentStore, CsCapacity, CsEntry, CsMeta, CsStats, InsertResult};
10
11/// Shards a `ContentStore` across `N` instances to reduce lock contention.
12///
13/// Shard selection is by **first name component** (not full name hash) so that
14/// related content (`/video/seg/1`, `/video/seg/2`) lands in the same shard,
15/// preserving LRU locality for sequential access.
16///
17/// The shard count is the length of the `shards` `Vec` and is fixed at
18/// construction time.
19pub struct ShardedCs<C: ContentStore> {
20    shards: Vec<C>,
21    shard_count: usize,
22}
23
24impl<C: ContentStore> ShardedCs<C> {
25    /// Create a `ShardedCs` from pre-constructed inner stores.
26    ///
27    /// # Panics
28    ///
29    /// Panics if `shards` is empty.
30    pub fn new(shards: Vec<C>) -> Self {
31        let shard_count = shards.len();
32        assert!(shard_count > 0, "ShardedCs requires at least one shard");
33        Self {
34            shards,
35            shard_count,
36        }
37    }
38
39    /// Number of shards.
40    pub fn shard_count(&self) -> usize {
41        self.shard_count
42    }
43
44    fn shard_for(&self, name: &Name) -> usize {
45        match name.components().first() {
46            Some(first) => {
47                let mut h = DefaultHasher::new();
48                first.hash(&mut h);
49                (h.finish() as usize) % self.shard_count
50            }
51            None => 0, // root name → shard 0
52        }
53    }
54}
55
56impl<C: ContentStore> ContentStore for ShardedCs<C> {
57    async fn get(&self, interest: &Interest) -> Option<CsEntry> {
58        let idx = self.shard_for(&interest.name);
59        self.shards[idx].get(interest).await
60    }
61
62    async fn insert(&self, data: Bytes, name: Arc<Name>, meta: CsMeta) -> InsertResult {
63        let idx = self.shard_for(&name);
64        self.shards[idx].insert(data, name, meta).await
65    }
66
67    async fn evict(&self, name: &Name) -> bool {
68        let idx = self.shard_for(name);
69        self.shards[idx].evict(name).await
70    }
71
72    fn capacity(&self) -> CsCapacity {
73        let total: usize = self.shards.iter().map(|s| s.capacity().max_bytes).sum();
74        CsCapacity::bytes(total)
75    }
76
77    fn len(&self) -> usize {
78        self.shards.iter().map(|s| s.len()).sum()
79    }
80
81    fn current_bytes(&self) -> usize {
82        self.shards.iter().map(|s| s.current_bytes()).sum()
83    }
84
85    fn set_capacity(&self, max_bytes: usize) {
86        let per_shard = max_bytes / self.shard_count;
87        for shard in &self.shards {
88            shard.set_capacity(per_shard);
89        }
90    }
91
92    fn variant_name(&self) -> &str {
93        "sharded-lru"
94    }
95
96    async fn evict_prefix(&self, prefix: &Name, limit: Option<usize>) -> usize {
97        // Route to the correct shard based on first component.
98        let idx = self.shard_for(prefix);
99        self.shards[idx].evict_prefix(prefix, limit).await
100    }
101
102    fn stats(&self) -> CsStats {
103        let mut combined = CsStats::default();
104        for shard in &self.shards {
105            let s = shard.stats();
106            combined.hits += s.hits;
107            combined.misses += s.misses;
108            combined.inserts += s.inserts;
109            combined.evictions += s.evictions;
110        }
111        combined
112    }
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118    use crate::LruCs;
119    use ndn_packet::NameComponent;
120
121    fn arc_name(components: &[&str]) -> Arc<Name> {
122        Arc::new(Name::from_components(components.iter().map(|s| {
123            NameComponent::generic(Bytes::copy_from_slice(s.as_bytes()))
124        })))
125    }
126
127    fn meta_fresh() -> CsMeta {
128        CsMeta { stale_at: u64::MAX }
129    }
130
131    fn interest(components: &[&str]) -> Interest {
132        Interest::new((*arc_name(components)).clone())
133    }
134
135    fn make_sharded(n: usize, shard_bytes: usize) -> ShardedCs<LruCs> {
136        ShardedCs::new((0..n).map(|_| LruCs::new(shard_bytes)).collect())
137    }
138
139    // ── construction ─────────────────────────────────────────────────────────
140
141    #[test]
142    fn shard_count_reported() {
143        let cs = make_sharded(4, 1024);
144        assert_eq!(cs.shard_count(), 4);
145    }
146
147    #[test]
148    fn capacity_is_sum_of_shards() {
149        let cs = make_sharded(4, 1024);
150        assert_eq!(cs.capacity().max_bytes, 4 * 1024);
151    }
152
153    #[test]
154    #[should_panic]
155    fn empty_shards_panics() {
156        let _cs: ShardedCs<LruCs> = ShardedCs::new(vec![]);
157    }
158
159    // ── insert / get ──────────────────────────────────────────────────────────
160
161    #[tokio::test]
162    async fn insert_then_get_roundtrip() {
163        let cs = make_sharded(4, 65536);
164        let name = arc_name(&["edu", "ucla", "data"]);
165        cs.insert(Bytes::from_static(b"payload"), name.clone(), meta_fresh())
166            .await;
167        let entry = cs.get(&interest(&["edu", "ucla", "data"])).await.unwrap();
168        assert_eq!(entry.data.as_ref(), b"payload");
169    }
170
171    #[tokio::test]
172    async fn miss_returns_none() {
173        let cs = make_sharded(2, 65536);
174        assert!(cs.get(&interest(&["x"])).await.is_none());
175    }
176
177    #[tokio::test]
178    async fn names_with_same_first_component_land_in_same_shard() {
179        // /a/1 and /a/2 share first component → same shard → both accessible.
180        let cs = make_sharded(4, 65536);
181        cs.insert(
182            Bytes::from_static(b"v1"),
183            arc_name(&["a", "1"]),
184            meta_fresh(),
185        )
186        .await;
187        cs.insert(
188            Bytes::from_static(b"v2"),
189            arc_name(&["a", "2"]),
190            meta_fresh(),
191        )
192        .await;
193        assert!(cs.get(&interest(&["a", "1"])).await.is_some());
194        assert!(cs.get(&interest(&["a", "2"])).await.is_some());
195    }
196
197    // ── evict ─────────────────────────────────────────────────────────────────
198
199    #[tokio::test]
200    async fn evict_removes_entry() {
201        let cs = make_sharded(2, 65536);
202        let name = arc_name(&["b", "1"]);
203        cs.insert(Bytes::from_static(b"v"), name.clone(), meta_fresh())
204            .await;
205        assert!(cs.evict(&name).await);
206        assert!(cs.get(&interest(&["b", "1"])).await.is_none());
207    }
208
209    #[tokio::test]
210    async fn evict_nonexistent_returns_false() {
211        let cs = make_sharded(2, 65536);
212        assert!(!cs.evict(&arc_name(&["z"])).await);
213    }
214
215    // ── single shard ──────────────────────────────────────────────────────────
216
217    #[tokio::test]
218    async fn single_shard_works() {
219        let cs = make_sharded(1, 65536);
220        cs.insert(Bytes::from_static(b"data"), arc_name(&["a"]), meta_fresh())
221            .await;
222        assert!(cs.get(&interest(&["a"])).await.is_some());
223    }
224}