ndn_engine/stages/
pit.rs

1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use smallvec::SmallVec;
5use tracing::trace;
6
7use crate::pipeline::{Action, DecodedPacket, DropReason, PacketContext};
8use ndn_packet::Selector;
9use ndn_store::{NameHashes, Pit, PitEntry, PitToken};
10use ndn_transport::FaceId;
11
/// Pipeline stage that checks the PIT for a pending Interest.
///
/// Packets that are not Interests pass through untouched.
///
/// **Duplicate suppression:** if the nonce has already been seen in the PIT
/// entry, the Interest is a loop — it is dropped with
/// `DropReason::LoopDetected`.
///
/// **Aggregation:** if a PIT entry already exists for the same token
/// (name hash, selectors, forwarding hint), an in-record is added and the
/// stage returns `Action::Drop(DropReason::Suppressed)` (an Interest for the
/// same token is already outstanding; no need to forward again).
///
/// **New entry:** a PIT entry is created and inserted, `ctx.pit_token` holds
/// its token, and the packet continues to `StrategyStage`.
pub struct PitCheckStage {
    /// Shared pending-Interest table that this stage looks up and inserts into.
    pub pit: Arc<Pit>,
}
26
27impl PitCheckStage {
28    pub fn process(&self, mut ctx: PacketContext) -> Action {
29        let interest = match &ctx.packet {
30            DecodedPacket::Interest(i) => i,
31            _ => return Action::Continue(ctx),
32        };
33
34        let now_ns = now_ns();
35        let lifetime_ms = interest
36            .lifetime()
37            .map(|d| d.as_millis() as u64)
38            .unwrap_or(4_000); // NDN default 4 s
39
40        let nonce = interest.nonce().unwrap_or(0);
41        let name_hash = ctx
42            .name_hashes
43            .as_ref()
44            .map(|h| h.full_hash())
45            .unwrap_or_else(|| NameHashes::full_name_hash(&interest.name));
46        let token = PitToken::from_name_hash(
47            name_hash,
48            Some(interest.selectors()),
49            interest.forwarding_hint(),
50        );
51        ctx.pit_token = Some(token);
52
53        enum ExistingResult {
54            Loop,
55            Aggregated,
56        }
57        if let Some(result) = self.pit.with_entry_mut(&token, |entry| {
58            // Loop detection.
59            if entry.nonces_seen.contains(&nonce) {
60                return ExistingResult::Loop;
61            }
62            // Aggregate: add in-record, suppress forwarding.
63            let expires_at = now_ns + lifetime_ms * 1_000_000;
64            entry.add_in_record(ctx.face_id.0, nonce, expires_at, ctx.lp_pit_token.clone());
65            ExistingResult::Aggregated
66        }) {
67            match result {
68                ExistingResult::Loop => {
69                    trace!(face=%ctx.face_id, name=%interest.name, nonce, "pit-check: loop detected");
70                    return Action::Drop(DropReason::LoopDetected);
71                }
72                ExistingResult::Aggregated => {
73                    trace!(face=%ctx.face_id, name=%interest.name, nonce, "pit-check: aggregated (suppressed)");
74                    return Action::Drop(DropReason::Suppressed);
75                }
76            }
77        }
78
79        // New PIT entry.
80        let name = interest.name.clone();
81        let selector = Some(interest.selectors().clone());
82        let mut entry = PitEntry::new(name, selector, now_ns, lifetime_ms);
83        entry.add_in_record(
84            ctx.face_id.0,
85            nonce,
86            now_ns + lifetime_ms * 1_000_000,
87            ctx.lp_pit_token.clone(),
88        );
89        self.pit.insert(token, entry);
90        trace!(face=%ctx.face_id, name=%interest.name, nonce, lifetime_ms, "pit-check: new entry");
91
92        Action::Continue(ctx)
93    }
94}
95
/// Pipeline stage that matches a Data packet against the PIT.
///
/// Packets that are not Data pass through untouched.
///
/// On a match, the in-record faces of the PIT entry are collected into
/// `ctx.out_faces`, the entry is removed from the PIT, and
/// `Action::Continue(ctx)` is returned so `CsInsertStage` can cache the Data.
///
/// If no matching PIT entry is found, the Data is unsolicited — it is
/// dropped with `DropReason::Other`.
pub struct PitMatchStage {
    /// Shared pending-Interest table from which satisfied entries are removed.
    pub pit: Arc<Pit>,
}
105
106impl PitMatchStage {
107    pub fn process(&self, mut ctx: PacketContext) -> Action {
108        let data = match &ctx.packet {
109            DecodedPacket::Data(d) => d,
110            _ => return Action::Continue(ctx),
111        };
112
113        // Pre-computed name hashes eliminate per-probe re-hashing.  For the
114        // full name we use the memoized hash; for CanBePrefix prefix probes
115        // we index into the prefix_hashes array instead of allocating a
116        // temporary Name and hashing it from scratch.
117        let hashes = ctx
118            .name_hashes
119            .get_or_insert_with(|| NameHashes::compute(&data.name));
120
121        // Try all selector combinations to find the PIT entry.
122        //
123        // PitCheck inserts with `from_name_hash(full, Some(selectors()), hint)`.
124        // Since Data packets don't carry selector information, we must probe
125        // all possible (can_be_prefix, must_be_fresh) combinations used at
126        // insertion time.  The default (false, false) is tried first as the
127        // common-case fast path.
128        let selector_probes: &[Option<Selector>] = &[
129            Some(Selector {
130                can_be_prefix: false,
131                must_be_fresh: false,
132            }),
133            Some(Selector {
134                can_be_prefix: true,
135                must_be_fresh: false,
136            }),
137            Some(Selector {
138                can_be_prefix: false,
139                must_be_fresh: true,
140            }),
141            Some(Selector {
142                can_be_prefix: true,
143                must_be_fresh: true,
144            }),
145            None,
146        ];
147
148        let full_hash = hashes.full_hash();
149        for sel in selector_probes {
150            let token = PitToken::from_name_hash(full_hash, sel.as_ref(), None);
151            if let Some((_, entry)) = self.pit.remove(&token) {
152                let faces: SmallVec<[FaceId; 4]> = entry.in_record_faces().map(FaceId).collect();
153                trace!(face=%ctx.face_id, name=%data.name, out_faces=?faces, "pit-match: satisfied");
154                ctx.out_faces = faces;
155                return Action::Continue(ctx);
156            }
157        }
158
159        // CanBePrefix: the Data name may be longer than the Interest name.
160        // Walk progressively shorter prefixes using pre-computed prefix hashes
161        // instead of allocating temporary Name objects and re-hashing.
162        let can_be_prefix_probes: &[Option<Selector>] = &[
163            Some(Selector {
164                can_be_prefix: true,
165                must_be_fresh: false,
166            }),
167            Some(Selector {
168                can_be_prefix: true,
169                must_be_fresh: true,
170            }),
171            None,
172        ];
173        let n_comps = hashes.len();
174        for prefix_len in (1..n_comps).rev() {
175            let prefix_hash = hashes.prefix_hash(prefix_len);
176            for sel in can_be_prefix_probes {
177                let token = PitToken::from_name_hash(prefix_hash, sel.as_ref(), None);
178                if let Some((_, entry)) = self.pit.remove(&token) {
179                    let faces: SmallVec<[FaceId; 4]> =
180                        entry.in_record_faces().map(FaceId).collect();
181                    trace!(face=%ctx.face_id, name=%data.name, prefix_len,
182                           out_faces=?faces, "pit-match: satisfied (can-be-prefix)");
183                    ctx.out_faces = faces;
184                    return Action::Continue(ctx);
185                }
186            }
187        }
188
189        trace!(face=%ctx.face_id, name=%data.name, "pit-match: unsolicited Data");
190        Action::Drop(DropReason::Other)
191    }
192}
193
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the result is `0`.
/// The nanosecond count is narrowed to `u64` with an `as` cast.
fn now_ns() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_nanos() as u64)
}