ndn_engine/
builder.rs

1use std::sync::{Arc, OnceLock};
2
3use anyhow::Result;
4use ndn_discovery::{DiscoveryProtocol, NeighborTable, NoDiscovery};
5use tokio::task::JoinSet;
6use tokio_util::sync::CancellationToken;
7
8use ndn_packet::Name;
9use ndn_security::{
10    CertCache, CertFetcher, SchemaRule, SecurityManager, SecurityProfile, TrustSchema, Validator,
11};
12use ndn_store::{
13    CsAdmissionPolicy, CsObserver, ErasedContentStore, LruCs, ObservableCs, Pit, StrategyTable,
14};
15use ndn_strategy::{BestRouteStrategy, MeasurementsTable};
16use ndn_transport::{Face, FaceTable};
17
18use crate::{
19    Fib, ForwarderEngine,
20    discovery_context::EngineDiscoveryContext,
21    dispatcher::PacketDispatcher,
22    engine::{EngineInner, ShutdownHandle},
23    enricher::ContextEnricher,
24    rib::Rib,
25    routing::{RoutingManager, RoutingProtocol},
26    stages::{
27        CsInsertStage, CsLookupStage, ErasedStrategy, PitCheckStage, PitMatchStage, StrategyStage,
28        TlvDecodeStage, ValidationStage,
29    },
30};
31
32/// Configuration for the forwarding engine.
33pub struct EngineConfig {
34    /// Capacity of the inter-task channel (backpressure bound).
35    pub pipeline_channel_cap: usize,
36    /// Content store byte capacity. Zero disables caching.
37    pub cs_capacity_bytes: usize,
38    /// Number of parallel pipeline processing threads.
39    ///
40    /// - `0` (default): auto-detect from available CPU parallelism.
41    /// - `1`: single-threaded — all pipeline processing runs inline in the
42    ///   pipeline runner task (lowest latency, no task spawn overhead).
43    /// - `N > 1`: spawn per-packet tokio tasks so up to N pipeline passes
44    ///   run in parallel across cores (highest throughput with fragmented
45    ///   UDP traffic).
46    pub pipeline_threads: usize,
47}
48
49impl Default for EngineConfig {
50    fn default() -> Self {
51        Self {
52            pipeline_channel_cap: 4096,
53            cs_capacity_bytes: 64 * 1024 * 1024, // 64 MB
54            pipeline_threads: 0,
55        }
56    }
57}
58
59/// Constructs and wires a `ForwarderEngine`.
60pub struct EngineBuilder {
61    config: EngineConfig,
62    face_table: Arc<FaceTable>,
63    faces: Vec<Box<dyn FnOnce(Arc<FaceTable>) + Send>>,
64    strategy: Option<Arc<dyn ErasedStrategy>>,
65    security: Option<Arc<SecurityManager>>,
66    enrichers: Vec<Arc<dyn ContextEnricher>>,
67    cs: Option<Arc<dyn ErasedContentStore>>,
68    admission: Option<Arc<dyn CsAdmissionPolicy>>,
69    cs_observer: Option<Arc<dyn CsObserver>>,
70    security_profile: SecurityProfile,
71    discovery: Option<Arc<dyn DiscoveryProtocol>>,
72    routing_protocols: Vec<Arc<dyn RoutingProtocol>>,
73    /// Static trust schema rules applied at startup in addition to those
74    /// implied by the security profile.
75    schema_rules: Vec<SchemaRule>,
76}
77
78impl EngineBuilder {
79    pub fn new(config: EngineConfig) -> Self {
80        Self {
81            config,
82            face_table: Arc::new(FaceTable::new()),
83            faces: Vec::new(),
84            strategy: None,
85            security: None,
86            enrichers: Vec::new(),
87            cs: None,
88            admission: None,
89            cs_observer: None,
90            security_profile: SecurityProfile::Default,
91            discovery: None,
92            routing_protocols: Vec::new(),
93            schema_rules: Vec::new(),
94        }
95    }
96
97    /// Pre-allocate a `FaceId` from the engine's face table.
98    ///
99    /// This allows callers to know the ID that will be assigned to a face
100    /// *before* calling `build()`, so the ID can be passed to discovery
101    /// protocols or other components at construction time.  The actual face
102    /// object should be added via `builder.face(…)` or
103    /// `engine.add_face_with_persistency(…)` after `build()`.
104    pub fn alloc_face_id(&self) -> ndn_transport::FaceId {
105        self.face_table.alloc_id()
106    }
107
108    /// Register a face to be added at startup.
109    pub fn face<F: Face>(mut self, face: F) -> Self {
110        self.faces.push(Box::new(move |table| {
111            table.insert(face);
112        }));
113        self
114    }
115
116    /// Override the forwarding strategy (default: `BestRouteStrategy`).
117    pub fn strategy<S: ErasedStrategy>(mut self, s: S) -> Self {
118        self.strategy = Some(Arc::new(s));
119        self
120    }
121
122    /// Set the security manager for signing and verification.
123    ///
124    /// When set, the engine exposes the manager via `ForwarderEngine::security()`
125    /// so pipeline stages and the management layer can access it.
126    pub fn security(mut self, mgr: SecurityManager) -> Self {
127        self.security = Some(Arc::new(mgr));
128        self
129    }
130
131    /// Override the content store implementation (default: `LruCs`).
132    pub fn content_store(mut self, cs: Arc<dyn ErasedContentStore>) -> Self {
133        self.cs = Some(cs);
134        self
135    }
136
137    /// Override the CS admission policy (default: `DefaultAdmissionPolicy`).
138    pub fn admission_policy(mut self, policy: Arc<dyn CsAdmissionPolicy>) -> Self {
139        self.admission = Some(policy);
140        self
141    }
142
143    /// Register a CS observer for hit/miss/insert/eviction events.
144    ///
145    /// When set, the CS is wrapped in [`ObservableCs`] which adds atomic
146    /// counters and calls the observer on every operation.
147    pub fn cs_observer(mut self, obs: Arc<dyn CsObserver>) -> Self {
148        self.cs_observer = Some(obs);
149        self
150    }
151
152    /// Set the security profile (default: `SecurityProfile::Default`).
153    ///
154    /// - `Default`: auto-wires validator + cert fetcher from SecurityManager
155    /// - `AcceptSigned`: verify signatures but skip chain walking
156    /// - `Disabled`: no validation (benchmarking only)
157    /// - `Custom(v)`: use a caller-provided validator
158    pub fn security_profile(mut self, p: SecurityProfile) -> Self {
159        self.security_profile = p;
160        self
161    }
162
163    /// Add a static trust schema rule loaded at startup.
164    ///
165    /// Rules are added to the validator's schema after the profile's default
166    /// rules are applied. Call this once per rule, or apply many rules by
167    /// iterating over a config list:
168    ///
169    /// ```rust,ignore
170    /// for rule in &config.security.rules {
171    ///     if let Ok(r) = SchemaRule::parse(&format!("{} => {}", rule.data, rule.key)) {
172    ///         builder = builder.schema_rule(r);
173    ///     }
174    /// }
175    /// ```
176    pub fn schema_rule(mut self, rule: SchemaRule) -> Self {
177        self.schema_rules.push(rule);
178        self
179    }
180
181    /// Convenience: set a custom validator directly.
182    pub fn validator(mut self, v: Arc<Validator>) -> Self {
183        self.security_profile = SecurityProfile::Custom(v);
184        self
185    }
186
187    /// Set the discovery protocol (default: [`NoDiscovery`]).
188    ///
189    /// Use [`CompositeDiscovery`] to run multiple protocols simultaneously.
190    ///
191    /// [`CompositeDiscovery`]: ndn_discovery::CompositeDiscovery
192    pub fn discovery<D: DiscoveryProtocol>(mut self, d: D) -> Self {
193        self.discovery = Some(Arc::new(d));
194        self
195    }
196
197    /// Set a pre-boxed discovery protocol.
198    pub fn discovery_arc(mut self, d: Arc<dyn DiscoveryProtocol>) -> Self {
199        self.discovery = Some(d);
200        self
201    }
202
203    /// Register a routing protocol to start when the engine is built.
204    ///
205    /// Multiple protocols can be registered; each must use a distinct `origin`
206    /// value. They run as independent Tokio tasks and all write routes into the
207    /// shared RIB. Use [`ForwarderEngine::routing`] after `build()` to enable
208    /// or disable protocols dynamically at runtime.
209    pub fn routing_protocol<P: RoutingProtocol>(mut self, proto: P) -> Self {
210        self.routing_protocols.push(Arc::new(proto));
211        self
212    }
213
214    /// Register a cross-layer context enricher.
215    ///
216    /// Enrichers are called before every strategy invocation to populate
217    /// `StrategyContext::extensions` with data from external sources
218    /// (radio metrics, flow stats, location, etc.).
219    pub fn context_enricher(mut self, e: Arc<dyn ContextEnricher>) -> Self {
220        self.enrichers.push(e);
221        self
222    }
223
224    /// Build the engine, spawn all tasks, and return handles.
225    pub async fn build(self) -> Result<(ForwarderEngine, ShutdownHandle)> {
226        let fib = Arc::new(Fib::new());
227        let rib = Arc::new(Rib::new());
228        let pit = Arc::new(Pit::new());
229        let base_cs: Arc<dyn ErasedContentStore> = self
230            .cs
231            .unwrap_or_else(|| Arc::new(LruCs::new(self.config.cs_capacity_bytes)));
232        let cs: Arc<dyn ErasedContentStore> = if let Some(obs) = self.cs_observer {
233            Arc::new(ObservableCs::new(base_cs, Some(obs)))
234        } else {
235            base_cs
236        };
237        let face_table = self.face_table;
238        let measurements = Arc::new(MeasurementsTable::new());
239
240        // Register pre-configured faces.
241        for add_face in self.faces {
242            add_face(Arc::clone(&face_table));
243        }
244
245        let cancel = CancellationToken::new();
246        let mut tasks = JoinSet::new();
247
248        // PIT expiry task.
249        {
250            let pit_clone = Arc::clone(&pit);
251            let cancel_clone = cancel.clone();
252            tasks.spawn(async move {
253                crate::expiry::run_expiry_task(pit_clone, cancel_clone).await;
254            });
255        }
256
257        // RIB expiry task — drains expired routes and recomputes affected FIB entries.
258        {
259            let rib_clone = Arc::clone(&rib);
260            let fib_clone = Arc::clone(&fib);
261            let cancel_clone = cancel.clone();
262            tasks.spawn(async move {
263                crate::expiry::run_rib_expiry_task(rib_clone, fib_clone, cancel_clone).await;
264            });
265        }
266
267        // Build strategy table with the default strategy at root.
268        let default_strategy: Arc<dyn ErasedStrategy> = self
269            .strategy
270            .unwrap_or_else(|| Arc::new(BestRouteStrategy::new()));
271        let strategy_table = Arc::new(StrategyTable::<dyn ErasedStrategy>::new());
272        strategy_table.insert(&Name::root(), Arc::clone(&default_strategy));
273
274        let face_states = Arc::new(dashmap::DashMap::new());
275
276        // Resolve security profile into validator + cert fetcher.
277        let (validator, cert_fetcher) =
278            resolve_security_profile(self.security_profile, &self.security);
279
280        // Apply static schema rules from config (e.g. [[security.rule]] entries).
281        if let Some(v) = &validator {
282            for rule in self.schema_rules {
283                v.add_schema_rule(rule);
284            }
285        }
286
287        // Keep a clone of the validator Arc for the engine (management API access).
288        let engine_validator = validator.clone();
289
290        // Discovery protocol (default: no-op).
291        let discovery: Arc<dyn DiscoveryProtocol> =
292            self.discovery.unwrap_or_else(|| Arc::new(NoDiscovery));
293        let neighbors = NeighborTable::new();
294
295        // Routing manager — owns the RIB→FIB pipeline and running protocols.
296        let routing = Arc::new(RoutingManager::new(
297            Arc::clone(&rib),
298            Arc::clone(&fib),
299            Arc::clone(&face_table),
300            Arc::clone(&neighbors),
301            cancel.clone(),
302        ));
303
304        // Build EngineInner first. `pipeline_tx` is a `OnceLock` so we can
305        // set it after `PacketDispatcher::spawn()` returns the sender, after
306        // the Arc<EngineInner> is already created (needed for the discovery
307        // context Weak back-reference).
308        let inner = Arc::new(EngineInner {
309            fib: Arc::clone(&fib),
310            rib: Arc::clone(&rib),
311            routing: Arc::clone(&routing),
312            pit: Arc::clone(&pit),
313            cs: Arc::clone(&cs),
314            face_table: Arc::clone(&face_table),
315            measurements: Arc::clone(&measurements),
316            strategy_table: Arc::clone(&strategy_table),
317            security: self.security,
318            validator: engine_validator,
319            pipeline_tx: OnceLock::new(),
320            face_states: Arc::clone(&face_states),
321            discovery: Arc::clone(&discovery),
322            neighbors: Arc::clone(&neighbors),
323            discovery_ctx: OnceLock::new(),
324        });
325
326        // Create the discovery context with a Weak back-reference to break
327        // the reference cycle (EngineInner → Arc<ctx> → Weak<EngineInner>).
328        let discovery_ctx = EngineDiscoveryContext::new(
329            Arc::downgrade(&inner),
330            Arc::clone(&neighbors),
331            cancel.child_token(),
332        );
333        let _ = inner.discovery_ctx.set(Arc::clone(&discovery_ctx));
334
335        let dispatcher = PacketDispatcher {
336            face_table: Arc::clone(&face_table),
337            face_states: Arc::clone(&face_states),
338            rib: Arc::clone(&rib),
339            decode: TlvDecodeStage {
340                face_table: Arc::clone(&face_table),
341                reassembly: dashmap::DashMap::new(),
342            },
343            cs_lookup: CsLookupStage {
344                cs: Arc::clone(&cs),
345            },
346            pit_check: PitCheckStage {
347                pit: Arc::clone(&pit),
348            },
349            strategy: StrategyStage {
350                strategy_table: Arc::clone(&strategy_table),
351                default_strategy: Arc::clone(&default_strategy),
352                fib: Arc::clone(&fib),
353                measurements: Arc::clone(&measurements),
354                pit: Arc::clone(&pit),
355                face_table: Arc::clone(&face_table),
356                enrichers: self.enrichers,
357            },
358            pit_match: PitMatchStage {
359                pit: Arc::clone(&pit),
360            },
361            validation: ValidationStage::new(
362                validator,
363                cert_fetcher,
364                crate::stages::validation::PendingQueueConfig::default(),
365            ),
366            cs_insert: CsInsertStage {
367                cs: Arc::clone(&cs),
368                admission: self
369                    .admission
370                    .unwrap_or_else(|| Arc::new(ndn_store::DefaultAdmissionPolicy)),
371            },
372            channel_cap: self.config.pipeline_channel_cap,
373            pipeline_threads: resolve_pipeline_threads(self.config.pipeline_threads),
374            discovery: Arc::clone(&discovery),
375            discovery_ctx: Arc::clone(&discovery_ctx),
376        };
377
378        let pipeline_tx = dispatcher.spawn(cancel.clone(), &mut tasks);
379
380        // Now that we have the pipeline sender, store it in EngineInner.
381        let _ = inner.pipeline_tx.set(pipeline_tx);
382
383        // Idle face sweep task.
384        {
385            let face_states_clone = Arc::clone(&face_states);
386            let face_table_clone = Arc::clone(&face_table);
387            let fib_clone = Arc::clone(&fib);
388            let rib_clone = Arc::clone(&rib);
389            let cancel_clone = cancel.clone();
390            let d = Arc::clone(&discovery);
391            let ctx = Arc::clone(&discovery_ctx);
392            tasks.spawn(async move {
393                crate::expiry::run_idle_face_task(
394                    face_states_clone,
395                    face_table_clone,
396                    fib_clone,
397                    rib_clone,
398                    cancel_clone,
399                    d,
400                    ctx,
401                )
402                .await;
403            });
404        }
405
406        // Discovery tick task — interval from protocol's tick_interval().
407        {
408            let d = Arc::clone(&discovery);
409            let ctx = Arc::clone(&discovery_ctx);
410            let cancel_clone = cancel.clone();
411            let tick_dur = discovery.tick_interval();
412            tasks.spawn(async move {
413                let mut interval = tokio::time::interval(tick_dur);
414                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
415                loop {
416                    tokio::select! {
417                        _ = cancel_clone.cancelled() => break,
418                        _ = interval.tick() => {
419                            d.on_tick(std::time::Instant::now(), &*ctx);
420                        }
421                    }
422                }
423            });
424        }
425
426        // Notify discovery about faces registered before build().
427        for face_id in face_table.face_ids() {
428            discovery.on_face_up(face_id, &*discovery_ctx);
429        }
430
431        // Start any routing protocols registered before build().
432        for proto in self.routing_protocols {
433            routing.enable(proto);
434        }
435
436        let engine = ForwarderEngine { inner };
437        let handle = ShutdownHandle { cancel, tasks };
438        Ok((engine, handle))
439    }
440}
441
442/// Resolve a `SecurityProfile` into a concrete validator and optional cert fetcher.
443fn resolve_security_profile(
444    profile: SecurityProfile,
445    security: &Option<Arc<SecurityManager>>,
446) -> (Option<Arc<Validator>>, Option<Arc<CertFetcher>>) {
447    use std::time::Duration;
448
449    match profile {
450        SecurityProfile::Disabled => (None, None),
451
452        SecurityProfile::Custom(v) => (Some(v), None),
453
454        SecurityProfile::AcceptSigned => {
455            let schema = TrustSchema::accept_all();
456            let validator = if let Some(mgr) = security {
457                let cert_cache = Arc::new(CertCache::new());
458                // Pre-populate from manager's cache.
459                // Trust anchors make AcceptSigned succeed for cached certs.
460                let anchors = Arc::new(dashmap::DashMap::new());
461                for name in mgr.trust_anchor_names() {
462                    if let Some(cert) = mgr.trust_anchor(&name) {
463                        cert_cache.insert(cert.clone());
464                        anchors.insert(name, cert);
465                    }
466                }
467                Arc::new(Validator::with_chain(schema, cert_cache, anchors, None, 1))
468            } else {
469                // No manager — accept_all schema with empty cache.
470                Arc::new(Validator::new(schema))
471            };
472            (Some(validator), None)
473        }
474
475        SecurityProfile::Default => {
476            let Some(mgr) = security else {
477                // No SecurityManager configured — fall back to AcceptSigned.
478                // Every Data packet must carry a cryptographically valid signature
479                // (DigestSha256 or stronger), but no trust anchor or namespace
480                // hierarchy is enforced.
481                //
482                // Configure a [security] block with trust anchors for full
483                // hierarchical validation.
484                tracing::info!(
485                    "No SecurityManager configured; using AcceptSigned validation \
486                     (DigestSha256 or stronger required, hierarchy not enforced). \
487                     Configure a [security] block with trust anchors for full \
488                     hierarchical validation."
489                );
490                let validator = Arc::new(Validator::new(TrustSchema::accept_all()));
491                return (Some(validator), None);
492            };
493
494            let schema = TrustSchema::hierarchical();
495            let cert_cache = Arc::new(CertCache::new());
496            let anchors = Arc::new(dashmap::DashMap::new());
497
498            // Load trust anchors from the manager.
499            for name in mgr.trust_anchor_names() {
500                if let Some(cert) = mgr.trust_anchor(&name) {
501                    cert_cache.insert(cert.clone());
502                    anchors.insert(name, cert);
503                }
504            }
505
506            // Build a CertFetcher. The FetchFn is a no-op placeholder for now;
507            // the router wires a real one via AppFace after engine construction.
508            // Certs pre-loaded from trust anchors will satisfy most chain walks
509            // without fetching.
510            let fetcher = Arc::new(CertFetcher::new(
511                Arc::clone(&cert_cache),
512                Arc::new(|_name| Box::pin(async { None })),
513                Duration::from_secs(4),
514            ));
515
516            let validator = Arc::new(Validator::with_chain(
517                schema,
518                Arc::clone(&cert_cache),
519                anchors,
520                Some(Arc::clone(&fetcher)),
521                5,
522            ));
523
524            (Some(validator), Some(fetcher))
525        }
526    }
527}
528
529/// Resolve `pipeline_threads` config: 0 → auto-detect, otherwise clamp to ≥ 1.
530fn resolve_pipeline_threads(configured: usize) -> usize {
531    if configured == 0 {
532        std::thread::available_parallelism()
533            .map(|n| n.get())
534            .unwrap_or(1)
535    } else {
536        configured
537    }
538}
539
540#[cfg(test)]
541mod tests {
542    use super::*;
543    use std::time::Duration;
544
545    #[tokio::test]
546    async fn build_returns_usable_engine() {
547        let (engine, handle) = EngineBuilder::new(EngineConfig::default())
548            .build()
549            .await
550            .unwrap();
551        let _ = engine.fib();
552        let _ = engine.pit();
553        let _ = engine.faces();
554        let _ = engine.cs();
555        handle.shutdown().await;
556    }
557
558    #[tokio::test]
559    async fn engine_clone_shares_same_tables() {
560        let (engine, handle) = EngineBuilder::new(EngineConfig::default())
561            .build()
562            .await
563            .unwrap();
564        let clone = engine.clone();
565        assert!(Arc::ptr_eq(&engine.fib(), &clone.fib()));
566        handle.shutdown().await;
567    }
568
569    #[tokio::test]
570    async fn shutdown_completes_promptly() {
571        let (_engine, handle) = EngineBuilder::new(EngineConfig::default())
572            .build()
573            .await
574            .unwrap();
575        tokio::time::timeout(Duration::from_millis(500), handle.shutdown())
576            .await
577            .expect("shutdown did not complete within 500 ms");
578    }
579}