ndn_engine/stages/
cs.rs

1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use tracing::trace;
5
6use crate::pipeline::{Action, DecodedPacket, PacketContext};
7use ndn_packet::CachePolicyType;
8use ndn_store::{CsAdmissionPolicy, CsMeta, ErasedContentStore};
9
10use crate::stages::decode::LpCachePolicy;
11
12/// Look up the CS before hitting the PIT/FIB.
13///
14/// On a cache hit: stores `CsEntry` in `ctx.tags`, sets `ctx.cs_hit = true`,
15/// sets `ctx.out_faces = [ctx.face_id]`, and returns `Action::Satisfy(ctx)`
16/// so the dispatcher fans the cached Data back to the requesting face without
17/// touching the PIT.
18///
19/// On a miss: `Action::Continue(ctx)` to proceed to `PitCheckStage`.
20pub struct CsLookupStage {
21    pub cs: Arc<dyn ErasedContentStore>,
22}
23
24impl CsLookupStage {
25    pub async fn process(&self, mut ctx: PacketContext) -> Action {
26        let interest = match &ctx.packet {
27            DecodedPacket::Interest(i) => i,
28            // CS lookup only applies to Interests.
29            _ => return Action::Continue(ctx),
30        };
31
32        if let Some(entry) = self.cs.get_erased(interest).await {
33            trace!(face=%ctx.face_id, name=?ctx.name, "cs-lookup: HIT");
34            ctx.cs_hit = true;
35            ctx.out_faces.push(ctx.face_id);
36            ctx.tags.insert(entry);
37            Action::Satisfy(ctx)
38        } else {
39            trace!(face=%ctx.face_id, name=?ctx.name, "cs-lookup: MISS");
40            Action::Continue(ctx)
41        }
42    }
43}
44
45/// Insert Data into the CS after a successful PIT match.
46///
47/// Reads `ctx.raw_bytes` (the wire-format Data) and the decoded name.
48/// Freshness defaults to 0 (immediately stale) if `FreshnessPeriod` is absent.
49/// The admission policy is consulted before inserting — Data that fails the
50/// policy check is not cached.
51pub struct CsInsertStage {
52    pub cs: Arc<dyn ErasedContentStore>,
53    pub admission: Arc<dyn CsAdmissionPolicy>,
54}
55
56impl CsInsertStage {
57    pub async fn process(&self, ctx: PacketContext) -> Action {
58        if let DecodedPacket::Data(ref data) = ctx.packet {
59            // NDNLPv2 CachePolicy::NoCache overrides admission.
60            if ctx
61                .tags
62                .get::<LpCachePolicy>()
63                .is_some_and(|p| matches!(p.0, CachePolicyType::NoCache))
64            {
65                trace!(name=%data.name, "cs-insert: NoCache LP policy, skipping");
66                return Action::Satisfy(ctx);
67            }
68
69            if !self.admission.should_admit(data) {
70                trace!(name=%data.name, "cs-insert: rejected by admission policy");
71                return Action::Satisfy(ctx);
72            }
73
74            let now_ns = SystemTime::now()
75                .duration_since(UNIX_EPOCH)
76                .unwrap_or_default()
77                .as_nanos() as u64;
78
79            let freshness_ms = data
80                .meta_info()
81                .and_then(|m| m.freshness_period)
82                .map(|d| d.as_millis() as u64)
83                .unwrap_or(0);
84            let stale_at = now_ns + freshness_ms * 1_000_000;
85
86            let meta = CsMeta { stale_at };
87            self.cs
88                .insert_erased(ctx.raw_bytes.clone(), data.name.clone(), meta)
89                .await;
90            trace!(name=%data.name, freshness_ms, "cs-insert: cached");
91        }
92        Action::Satisfy(ctx)
93    }
94}