1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use tracing::trace;
5
6use crate::pipeline::{Action, DecodedPacket, PacketContext};
7use ndn_packet::CachePolicyType;
8use ndn_store::{CsAdmissionPolicy, CsMeta, ErasedContentStore};
9
10use crate::stages::decode::LpCachePolicy;
11
12pub struct CsLookupStage {
21 pub cs: Arc<dyn ErasedContentStore>,
22}
23
24impl CsLookupStage {
25 pub async fn process(&self, mut ctx: PacketContext) -> Action {
26 let interest = match &ctx.packet {
27 DecodedPacket::Interest(i) => i,
28 _ => return Action::Continue(ctx),
30 };
31
32 if let Some(entry) = self.cs.get_erased(interest).await {
33 trace!(face=%ctx.face_id, name=?ctx.name, "cs-lookup: HIT");
34 ctx.cs_hit = true;
35 ctx.out_faces.push(ctx.face_id);
36 ctx.tags.insert(entry);
37 Action::Satisfy(ctx)
38 } else {
39 trace!(face=%ctx.face_id, name=?ctx.name, "cs-lookup: MISS");
40 Action::Continue(ctx)
41 }
42 }
43}
44
45pub struct CsInsertStage {
52 pub cs: Arc<dyn ErasedContentStore>,
53 pub admission: Arc<dyn CsAdmissionPolicy>,
54}
55
56impl CsInsertStage {
57 pub async fn process(&self, ctx: PacketContext) -> Action {
58 if let DecodedPacket::Data(ref data) = ctx.packet {
59 if ctx
61 .tags
62 .get::<LpCachePolicy>()
63 .is_some_and(|p| matches!(p.0, CachePolicyType::NoCache))
64 {
65 trace!(name=%data.name, "cs-insert: NoCache LP policy, skipping");
66 return Action::Satisfy(ctx);
67 }
68
69 if !self.admission.should_admit(data) {
70 trace!(name=%data.name, "cs-insert: rejected by admission policy");
71 return Action::Satisfy(ctx);
72 }
73
74 let now_ns = SystemTime::now()
75 .duration_since(UNIX_EPOCH)
76 .unwrap_or_default()
77 .as_nanos() as u64;
78
79 let freshness_ms = data
80 .meta_info()
81 .and_then(|m| m.freshness_period)
82 .map(|d| d.as_millis() as u64)
83 .unwrap_or(0);
84 let stale_at = now_ns + freshness_ms * 1_000_000;
85
86 let meta = CsMeta { stale_at };
87 self.cs
88 .insert_erased(ctx.raw_bytes.clone(), data.name.clone(), meta)
89 .await;
90 trace!(name=%data.name, freshness_ms, "cs-insert: cached");
91 }
92 Action::Satisfy(ctx)
93 }
94}