1use std::sync::{Arc, OnceLock};
2
3use anyhow::Result;
4use ndn_discovery::{DiscoveryProtocol, NeighborTable, NoDiscovery};
5use tokio::task::JoinSet;
6use tokio_util::sync::CancellationToken;
7
8use ndn_packet::Name;
9use ndn_security::{
10 CertCache, CertFetcher, SchemaRule, SecurityManager, SecurityProfile, TrustSchema, Validator,
11};
12use ndn_store::{
13 CsAdmissionPolicy, CsObserver, ErasedContentStore, LruCs, ObservableCs, Pit, StrategyTable,
14};
15use ndn_strategy::{BestRouteStrategy, MeasurementsTable};
16use ndn_transport::{Face, FaceTable};
17
18use crate::{
19 Fib, ForwarderEngine,
20 discovery_context::EngineDiscoveryContext,
21 dispatcher::PacketDispatcher,
22 engine::{EngineInner, ShutdownHandle},
23 enricher::ContextEnricher,
24 rib::Rib,
25 routing::{RoutingManager, RoutingProtocol},
26 stages::{
27 CsInsertStage, CsLookupStage, ErasedStrategy, PitCheckStage, PitMatchStage, StrategyStage,
28 TlvDecodeStage, ValidationStage,
29 },
30};
31
32pub struct EngineConfig {
34 pub pipeline_channel_cap: usize,
36 pub cs_capacity_bytes: usize,
38 pub pipeline_threads: usize,
47}
48
49impl Default for EngineConfig {
50 fn default() -> Self {
51 Self {
52 pipeline_channel_cap: 4096,
53 cs_capacity_bytes: 64 * 1024 * 1024, pipeline_threads: 0,
55 }
56 }
57}
58
59pub struct EngineBuilder {
61 config: EngineConfig,
62 face_table: Arc<FaceTable>,
63 faces: Vec<Box<dyn FnOnce(Arc<FaceTable>) + Send>>,
64 strategy: Option<Arc<dyn ErasedStrategy>>,
65 security: Option<Arc<SecurityManager>>,
66 enrichers: Vec<Arc<dyn ContextEnricher>>,
67 cs: Option<Arc<dyn ErasedContentStore>>,
68 admission: Option<Arc<dyn CsAdmissionPolicy>>,
69 cs_observer: Option<Arc<dyn CsObserver>>,
70 security_profile: SecurityProfile,
71 discovery: Option<Arc<dyn DiscoveryProtocol>>,
72 routing_protocols: Vec<Arc<dyn RoutingProtocol>>,
73 schema_rules: Vec<SchemaRule>,
76}
77
78impl EngineBuilder {
79 pub fn new(config: EngineConfig) -> Self {
80 Self {
81 config,
82 face_table: Arc::new(FaceTable::new()),
83 faces: Vec::new(),
84 strategy: None,
85 security: None,
86 enrichers: Vec::new(),
87 cs: None,
88 admission: None,
89 cs_observer: None,
90 security_profile: SecurityProfile::Default,
91 discovery: None,
92 routing_protocols: Vec::new(),
93 schema_rules: Vec::new(),
94 }
95 }
96
97 pub fn alloc_face_id(&self) -> ndn_transport::FaceId {
105 self.face_table.alloc_id()
106 }
107
108 pub fn face<F: Face>(mut self, face: F) -> Self {
110 self.faces.push(Box::new(move |table| {
111 table.insert(face);
112 }));
113 self
114 }
115
116 pub fn strategy<S: ErasedStrategy>(mut self, s: S) -> Self {
118 self.strategy = Some(Arc::new(s));
119 self
120 }
121
122 pub fn security(mut self, mgr: SecurityManager) -> Self {
127 self.security = Some(Arc::new(mgr));
128 self
129 }
130
131 pub fn content_store(mut self, cs: Arc<dyn ErasedContentStore>) -> Self {
133 self.cs = Some(cs);
134 self
135 }
136
137 pub fn admission_policy(mut self, policy: Arc<dyn CsAdmissionPolicy>) -> Self {
139 self.admission = Some(policy);
140 self
141 }
142
143 pub fn cs_observer(mut self, obs: Arc<dyn CsObserver>) -> Self {
148 self.cs_observer = Some(obs);
149 self
150 }
151
152 pub fn security_profile(mut self, p: SecurityProfile) -> Self {
159 self.security_profile = p;
160 self
161 }
162
163 pub fn schema_rule(mut self, rule: SchemaRule) -> Self {
177 self.schema_rules.push(rule);
178 self
179 }
180
181 pub fn validator(mut self, v: Arc<Validator>) -> Self {
183 self.security_profile = SecurityProfile::Custom(v);
184 self
185 }
186
187 pub fn discovery<D: DiscoveryProtocol>(mut self, d: D) -> Self {
193 self.discovery = Some(Arc::new(d));
194 self
195 }
196
197 pub fn discovery_arc(mut self, d: Arc<dyn DiscoveryProtocol>) -> Self {
199 self.discovery = Some(d);
200 self
201 }
202
203 pub fn routing_protocol<P: RoutingProtocol>(mut self, proto: P) -> Self {
210 self.routing_protocols.push(Arc::new(proto));
211 self
212 }
213
214 pub fn context_enricher(mut self, e: Arc<dyn ContextEnricher>) -> Self {
220 self.enrichers.push(e);
221 self
222 }
223
224 pub async fn build(self) -> Result<(ForwarderEngine, ShutdownHandle)> {
226 let fib = Arc::new(Fib::new());
227 let rib = Arc::new(Rib::new());
228 let pit = Arc::new(Pit::new());
229 let base_cs: Arc<dyn ErasedContentStore> = self
230 .cs
231 .unwrap_or_else(|| Arc::new(LruCs::new(self.config.cs_capacity_bytes)));
232 let cs: Arc<dyn ErasedContentStore> = if let Some(obs) = self.cs_observer {
233 Arc::new(ObservableCs::new(base_cs, Some(obs)))
234 } else {
235 base_cs
236 };
237 let face_table = self.face_table;
238 let measurements = Arc::new(MeasurementsTable::new());
239
240 for add_face in self.faces {
242 add_face(Arc::clone(&face_table));
243 }
244
245 let cancel = CancellationToken::new();
246 let mut tasks = JoinSet::new();
247
248 {
250 let pit_clone = Arc::clone(&pit);
251 let cancel_clone = cancel.clone();
252 tasks.spawn(async move {
253 crate::expiry::run_expiry_task(pit_clone, cancel_clone).await;
254 });
255 }
256
257 {
259 let rib_clone = Arc::clone(&rib);
260 let fib_clone = Arc::clone(&fib);
261 let cancel_clone = cancel.clone();
262 tasks.spawn(async move {
263 crate::expiry::run_rib_expiry_task(rib_clone, fib_clone, cancel_clone).await;
264 });
265 }
266
267 let default_strategy: Arc<dyn ErasedStrategy> = self
269 .strategy
270 .unwrap_or_else(|| Arc::new(BestRouteStrategy::new()));
271 let strategy_table = Arc::new(StrategyTable::<dyn ErasedStrategy>::new());
272 strategy_table.insert(&Name::root(), Arc::clone(&default_strategy));
273
274 let face_states = Arc::new(dashmap::DashMap::new());
275
276 let (validator, cert_fetcher) =
278 resolve_security_profile(self.security_profile, &self.security);
279
280 if let Some(v) = &validator {
282 for rule in self.schema_rules {
283 v.add_schema_rule(rule);
284 }
285 }
286
287 let engine_validator = validator.clone();
289
290 let discovery: Arc<dyn DiscoveryProtocol> =
292 self.discovery.unwrap_or_else(|| Arc::new(NoDiscovery));
293 let neighbors = NeighborTable::new();
294
295 let routing = Arc::new(RoutingManager::new(
297 Arc::clone(&rib),
298 Arc::clone(&fib),
299 Arc::clone(&face_table),
300 Arc::clone(&neighbors),
301 cancel.clone(),
302 ));
303
304 let inner = Arc::new(EngineInner {
309 fib: Arc::clone(&fib),
310 rib: Arc::clone(&rib),
311 routing: Arc::clone(&routing),
312 pit: Arc::clone(&pit),
313 cs: Arc::clone(&cs),
314 face_table: Arc::clone(&face_table),
315 measurements: Arc::clone(&measurements),
316 strategy_table: Arc::clone(&strategy_table),
317 security: self.security,
318 validator: engine_validator,
319 pipeline_tx: OnceLock::new(),
320 face_states: Arc::clone(&face_states),
321 discovery: Arc::clone(&discovery),
322 neighbors: Arc::clone(&neighbors),
323 discovery_ctx: OnceLock::new(),
324 });
325
326 let discovery_ctx = EngineDiscoveryContext::new(
329 Arc::downgrade(&inner),
330 Arc::clone(&neighbors),
331 cancel.child_token(),
332 );
333 let _ = inner.discovery_ctx.set(Arc::clone(&discovery_ctx));
334
335 let dispatcher = PacketDispatcher {
336 face_table: Arc::clone(&face_table),
337 face_states: Arc::clone(&face_states),
338 rib: Arc::clone(&rib),
339 decode: TlvDecodeStage {
340 face_table: Arc::clone(&face_table),
341 reassembly: dashmap::DashMap::new(),
342 },
343 cs_lookup: CsLookupStage {
344 cs: Arc::clone(&cs),
345 },
346 pit_check: PitCheckStage {
347 pit: Arc::clone(&pit),
348 },
349 strategy: StrategyStage {
350 strategy_table: Arc::clone(&strategy_table),
351 default_strategy: Arc::clone(&default_strategy),
352 fib: Arc::clone(&fib),
353 measurements: Arc::clone(&measurements),
354 pit: Arc::clone(&pit),
355 face_table: Arc::clone(&face_table),
356 enrichers: self.enrichers,
357 },
358 pit_match: PitMatchStage {
359 pit: Arc::clone(&pit),
360 },
361 validation: ValidationStage::new(
362 validator,
363 cert_fetcher,
364 crate::stages::validation::PendingQueueConfig::default(),
365 ),
366 cs_insert: CsInsertStage {
367 cs: Arc::clone(&cs),
368 admission: self
369 .admission
370 .unwrap_or_else(|| Arc::new(ndn_store::DefaultAdmissionPolicy)),
371 },
372 channel_cap: self.config.pipeline_channel_cap,
373 pipeline_threads: resolve_pipeline_threads(self.config.pipeline_threads),
374 discovery: Arc::clone(&discovery),
375 discovery_ctx: Arc::clone(&discovery_ctx),
376 };
377
378 let pipeline_tx = dispatcher.spawn(cancel.clone(), &mut tasks);
379
380 let _ = inner.pipeline_tx.set(pipeline_tx);
382
383 {
385 let face_states_clone = Arc::clone(&face_states);
386 let face_table_clone = Arc::clone(&face_table);
387 let fib_clone = Arc::clone(&fib);
388 let rib_clone = Arc::clone(&rib);
389 let cancel_clone = cancel.clone();
390 let d = Arc::clone(&discovery);
391 let ctx = Arc::clone(&discovery_ctx);
392 tasks.spawn(async move {
393 crate::expiry::run_idle_face_task(
394 face_states_clone,
395 face_table_clone,
396 fib_clone,
397 rib_clone,
398 cancel_clone,
399 d,
400 ctx,
401 )
402 .await;
403 });
404 }
405
406 {
408 let d = Arc::clone(&discovery);
409 let ctx = Arc::clone(&discovery_ctx);
410 let cancel_clone = cancel.clone();
411 let tick_dur = discovery.tick_interval();
412 tasks.spawn(async move {
413 let mut interval = tokio::time::interval(tick_dur);
414 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
415 loop {
416 tokio::select! {
417 _ = cancel_clone.cancelled() => break,
418 _ = interval.tick() => {
419 d.on_tick(std::time::Instant::now(), &*ctx);
420 }
421 }
422 }
423 });
424 }
425
426 for face_id in face_table.face_ids() {
428 discovery.on_face_up(face_id, &*discovery_ctx);
429 }
430
431 for proto in self.routing_protocols {
433 routing.enable(proto);
434 }
435
436 let engine = ForwarderEngine { inner };
437 let handle = ShutdownHandle { cancel, tasks };
438 Ok((engine, handle))
439 }
440}
441
442fn resolve_security_profile(
444 profile: SecurityProfile,
445 security: &Option<Arc<SecurityManager>>,
446) -> (Option<Arc<Validator>>, Option<Arc<CertFetcher>>) {
447 use std::time::Duration;
448
449 match profile {
450 SecurityProfile::Disabled => (None, None),
451
452 SecurityProfile::Custom(v) => (Some(v), None),
453
454 SecurityProfile::AcceptSigned => {
455 let schema = TrustSchema::accept_all();
456 let validator = if let Some(mgr) = security {
457 let cert_cache = Arc::new(CertCache::new());
458 let anchors = Arc::new(dashmap::DashMap::new());
461 for name in mgr.trust_anchor_names() {
462 if let Some(cert) = mgr.trust_anchor(&name) {
463 cert_cache.insert(cert.clone());
464 anchors.insert(name, cert);
465 }
466 }
467 Arc::new(Validator::with_chain(schema, cert_cache, anchors, None, 1))
468 } else {
469 Arc::new(Validator::new(schema))
471 };
472 (Some(validator), None)
473 }
474
475 SecurityProfile::Default => {
476 let Some(mgr) = security else {
477 tracing::info!(
485 "No SecurityManager configured; using AcceptSigned validation \
486 (DigestSha256 or stronger required, hierarchy not enforced). \
487 Configure a [security] block with trust anchors for full \
488 hierarchical validation."
489 );
490 let validator = Arc::new(Validator::new(TrustSchema::accept_all()));
491 return (Some(validator), None);
492 };
493
494 let schema = TrustSchema::hierarchical();
495 let cert_cache = Arc::new(CertCache::new());
496 let anchors = Arc::new(dashmap::DashMap::new());
497
498 for name in mgr.trust_anchor_names() {
500 if let Some(cert) = mgr.trust_anchor(&name) {
501 cert_cache.insert(cert.clone());
502 anchors.insert(name, cert);
503 }
504 }
505
506 let fetcher = Arc::new(CertFetcher::new(
511 Arc::clone(&cert_cache),
512 Arc::new(|_name| Box::pin(async { None })),
513 Duration::from_secs(4),
514 ));
515
516 let validator = Arc::new(Validator::with_chain(
517 schema,
518 Arc::clone(&cert_cache),
519 anchors,
520 Some(Arc::clone(&fetcher)),
521 5,
522 ));
523
524 (Some(validator), Some(fetcher))
525 }
526 }
527}
528
529fn resolve_pipeline_threads(configured: usize) -> usize {
531 if configured == 0 {
532 std::thread::available_parallelism()
533 .map(|n| n.get())
534 .unwrap_or(1)
535 } else {
536 configured
537 }
538}
539
540#[cfg(test)]
541mod tests {
542 use super::*;
543 use std::time::Duration;
544
545 #[tokio::test]
546 async fn build_returns_usable_engine() {
547 let (engine, handle) = EngineBuilder::new(EngineConfig::default())
548 .build()
549 .await
550 .unwrap();
551 let _ = engine.fib();
552 let _ = engine.pit();
553 let _ = engine.faces();
554 let _ = engine.cs();
555 handle.shutdown().await;
556 }
557
558 #[tokio::test]
559 async fn engine_clone_shares_same_tables() {
560 let (engine, handle) = EngineBuilder::new(EngineConfig::default())
561 .build()
562 .await
563 .unwrap();
564 let clone = engine.clone();
565 assert!(Arc::ptr_eq(&engine.fib(), &clone.fib()));
566 handle.shutdown().await;
567 }
568
569 #[tokio::test]
570 async fn shutdown_completes_promptly() {
571 let (_engine, handle) = EngineBuilder::new(EngineConfig::default())
572 .build()
573 .await
574 .unwrap();
575 tokio::time::timeout(Duration::from_millis(500), handle.shutdown())
576 .await
577 .expect("shutdown did not complete within 500 ms");
578 }
579}