ndn_engine/
engine.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::{Arc, OnceLock};
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use dashmap::DashMap;
6use ndn_discovery::{DiscoveryProtocol, NeighborTable};
7use tokio::sync::mpsc;
8use tokio::task::JoinSet;
9use tokio_util::sync::CancellationToken;
10
11use ndn_packet::Interest;
12use ndn_packet::lp::encode_lp_packet;
13use ndn_security::{SecurityManager, Validator};
14use ndn_store::{ErasedContentStore, Pit, PitToken, StrategyTable};
15use ndn_strategy::MeasurementsTable;
16use ndn_transport::{Face, FaceId, FacePersistency, FaceTable};
17
18use crate::discovery_context::EngineDiscoveryContext;
19
20use crate::stages::ErasedStrategy;
21
22use crate::Fib;
23use crate::dispatcher::InboundPacket;
24use crate::rib::Rib;
25use crate::routing::RoutingManager;
26
/// Default outbound send queue capacity per face, in queue slots.
///
/// This is the bound passed to `mpsc::channel` when a face's send queue is
/// created.  It must be large enough to absorb bursts from parallel pipeline
/// tasks that all dispatch to the same face near-simultaneously.  When full,
/// outbound packets are dropped (equivalent to a congestion drop at the
/// output queue — consistent with NFD's `GenericLinkService` model).
///
/// With NDNLPv2 fragmentation, a single Data packet may expand to ~6
/// fragments, each occupying one queue slot.  2048 slots ≈ ~340 Data
/// packets — enough headroom for sustained bursts over high-throughput
/// links without silent drops.
pub const DEFAULT_SEND_QUEUE_CAP: usize = 2048;
39
/// Per-face packet and byte counters.
///
/// All fields are `AtomicU64`, so the pipeline can update them through a
/// shared reference without holding any lock.  `Default` starts every counter
/// at zero; `Debug` prints the value each counter holds at formatting time.
#[derive(Debug, Default)]
pub struct FaceCounters {
    pub in_interests: AtomicU64,
    pub in_data: AtomicU64,
    pub out_interests: AtomicU64,
    pub out_data: AtomicU64,
    pub in_bytes: AtomicU64,
    pub out_bytes: AtomicU64,
}
51
/// Per-face lifecycle state stored alongside the cancellation token.
///
/// One value is kept per registered face in the engine's `face_states` map.
pub struct FaceState {
    /// Cancellation token for the face's tasks.
    pub cancel: CancellationToken,
    /// Persistence level the face was registered with.
    pub persistency: FacePersistency,
    /// Last packet activity (nanoseconds since Unix epoch).
    /// Updated on recv and send; used for idle-timeout of on-demand faces.
    pub last_activity: AtomicU64,
    /// Per-face traffic counters (incremented by pipeline stages).
    pub counters: FaceCounters,
    /// Outbound send queue.
    ///
    /// The pipeline pushes packets here via `try_send` (non-blocking) and a
    /// dedicated per-face send task drains the queue, calling
    /// `face.send_bytes()` sequentially.  This decouples pipeline processing
    /// from I/O, preserves per-face ordering (critical for TCP framing), and
    /// provides bounded backpressure.
    pub send_tx: mpsc::Sender<bytes::Bytes>,
    /// NDNLPv2 per-hop reliability state (unicast UDP faces only).
    ///
    /// The field only exists when the crate is built with the `face-net`
    /// feature; it is `None` for faces created through [`FaceState::new`].
    #[cfg(feature = "face-net")]
    pub reliability: Option<std::sync::Mutex<ndn_faces::net::reliability::LpReliability>>,
    /// NDNLPv2 link mode: auto-detected at the face I/O layer when the remote
    /// peer sends LP-wrapped packets (type 0x64).  Once set, the face sender
    /// wraps all outgoing bare NDN packets in a minimal LpPacket so that
    /// NDNLPv2-native peers (e.g. NDNts) can parse them.
    ///
    /// This mirrors NFD's `GenericLinkService` behavior: LP encoding is a
    /// per-link property determined by what the peer sends, not by face kind.
    pub uses_lp: AtomicBool,
}
81
82impl FaceState {
83    pub fn new(
84        cancel: CancellationToken,
85        persistency: FacePersistency,
86        send_tx: mpsc::Sender<bytes::Bytes>,
87    ) -> Self {
88        let now = SystemTime::now()
89            .duration_since(UNIX_EPOCH)
90            .unwrap_or_default()
91            .as_nanos() as u64;
92        Self {
93            cancel,
94            persistency,
95            last_activity: AtomicU64::new(now),
96            counters: FaceCounters::default(),
97            send_tx,
98            #[cfg(feature = "face-net")]
99            reliability: None,
100            uses_lp: AtomicBool::new(false),
101        }
102    }
103
104    /// Create a FaceState with NDNLPv2 reliability enabled.
105    #[cfg(feature = "face-net")]
106    pub fn new_reliable(
107        cancel: CancellationToken,
108        persistency: FacePersistency,
109        send_tx: mpsc::Sender<bytes::Bytes>,
110        mtu: usize,
111    ) -> Self {
112        let now = SystemTime::now()
113            .duration_since(UNIX_EPOCH)
114            .unwrap_or_default()
115            .as_nanos() as u64;
116        Self {
117            cancel,
118            persistency,
119            last_activity: AtomicU64::new(now),
120            counters: FaceCounters::default(),
121            send_tx,
122            reliability: Some(std::sync::Mutex::new(
123                ndn_faces::net::reliability::LpReliability::new(mtu),
124            )),
125            uses_lp: AtomicBool::new(false),
126        }
127    }
128
129    pub fn touch(&self) {
130        let now = SystemTime::now()
131            .duration_since(UNIX_EPOCH)
132            .unwrap_or_default()
133            .as_nanos() as u64;
134        self.last_activity.store(now, Ordering::Relaxed);
135    }
136}
137
/// Shared tables owned by the engine, accessible to all tasks via `Arc`.
///
/// `ForwarderEngine` wraps a single `Arc<EngineInner>`; its accessor methods
/// hand out clones of the `Arc` fields below.
pub struct EngineInner {
    pub fib: Arc<Fib>,
    pub rib: Arc<Rib>,
    pub routing: Arc<RoutingManager>,
    pub pit: Arc<Pit>,
    pub cs: Arc<dyn ErasedContentStore>,
    pub face_table: Arc<FaceTable>,
    pub measurements: Arc<MeasurementsTable>,
    pub strategy_table: Arc<StrategyTable<dyn ErasedStrategy>>,
    /// Security manager for signing/verification (optional — `None` disables
    /// security policy enforcement).
    pub security: Option<Arc<SecurityManager>>,
    /// Active validator — shared with `ValidationStage` and the management API.
    ///
    /// The schema inside the validator is behind a `RwLock`, allowing runtime
    /// modification via `/localhost/nfd/security/schema-*` commands.
    pub validator: Option<Arc<Validator>>,
    /// Pipeline inbound channel — used to spawn readers for dynamically-added
    /// faces (those registered after `build()` completes) and to inject
    /// packets via `ForwarderEngine::inject_packet`.
    ///
    /// Stored in `OnceLock` because the sender is obtained from
    /// `PacketDispatcher::spawn()` which runs after `Arc<EngineInner>` is
    /// created (needed for the discovery context back-reference).
    pub(crate) pipeline_tx: OnceLock<mpsc::Sender<InboundPacket>>,
    /// Per-face state: cancellation token, persistency level, and last activity.
    pub(crate) face_states: Arc<DashMap<FaceId, FaceState>>,
    /// Active discovery protocol (default: `NoDiscovery`).
    pub discovery: Arc<dyn DiscoveryProtocol>,
    /// Engine-owned neighbor table shared with discovery protocols.
    pub neighbors: Arc<NeighborTable>,
    /// Discovery context.  Set once after `Arc<EngineInner>` is created to
    /// break the reference cycle (EngineInner → Arc<ctx> → Weak<EngineInner>).
    pub(crate) discovery_ctx: OnceLock<Arc<EngineDiscoveryContext>>,
}
173
/// Handle to a running forwarding engine.
///
/// Cloning the handle gives another reference to the same running engine:
/// the only field is an `Arc<EngineInner>`, so a clone shares every table.
#[derive(Clone)]
pub struct ForwarderEngine {
    pub(crate) inner: Arc<EngineInner>,
}
181
182impl ForwarderEngine {
183    pub fn fib(&self) -> Arc<Fib> {
184        Arc::clone(&self.inner.fib)
185    }
186
187    pub fn rib(&self) -> Arc<Rib> {
188        Arc::clone(&self.inner.rib)
189    }
190
191    pub fn routing(&self) -> Arc<RoutingManager> {
192        Arc::clone(&self.inner.routing)
193    }
194
195    pub fn faces(&self) -> Arc<FaceTable> {
196        Arc::clone(&self.inner.face_table)
197    }
198
199    pub fn pit(&self) -> Arc<Pit> {
200        Arc::clone(&self.inner.pit)
201    }
202
203    pub fn cs(&self) -> Arc<dyn ErasedContentStore> {
204        Arc::clone(&self.inner.cs)
205    }
206
207    pub fn security(&self) -> Option<Arc<SecurityManager>> {
208        self.inner.security.as_ref().map(Arc::clone)
209    }
210
211    /// The active validator, if any.
212    ///
213    /// The returned `Arc<Validator>` is the same instance used by the pipeline.
214    /// Its trust schema can be modified at runtime via
215    /// [`Validator::add_schema_rule`], [`Validator::remove_schema_rule`], and
216    /// [`Validator::set_schema`].
217    pub fn validator(&self) -> Option<Arc<Validator>> {
218        self.inner.validator.as_ref().map(Arc::clone)
219    }
220
221    pub fn strategy_table(&self) -> Arc<StrategyTable<dyn ErasedStrategy>> {
222        Arc::clone(&self.inner.strategy_table)
223    }
224
225    pub fn neighbors(&self) -> Arc<NeighborTable> {
226        Arc::clone(&self.inner.neighbors)
227    }
228
229    pub fn measurements(&self) -> Arc<MeasurementsTable> {
230        Arc::clone(&self.inner.measurements)
231    }
232
233    pub fn discovery(&self) -> Arc<dyn DiscoveryProtocol> {
234        Arc::clone(&self.inner.discovery)
235    }
236
237    /// The discovery context for this engine.
238    ///
239    /// Panics if called before `build()` completes (OnceLock not yet set).
240    pub fn discovery_ctx(&self) -> Arc<EngineDiscoveryContext> {
241        self.inner
242            .discovery_ctx
243            .get()
244            .expect("discovery_ctx not initialized")
245            .clone()
246    }
247
248    /// Look up the source face that originally sent an Interest.
249    pub fn source_face_id(&self, interest: &Interest) -> Option<FaceId> {
250        let token = PitToken::from_interest_full(
251            &interest.name,
252            Some(interest.selectors()),
253            interest.forwarding_hint(),
254        );
255        self.inner
256            .pit
257            .with_entry(&token, |entry| {
258                entry.in_records.first().map(|r| FaceId(r.face_id))
259            })
260            .flatten()
261    }
262
263    /// Register a face and immediately start its packet-reader task.
264    ///
265    /// Persistence defaults to `OnDemand`. Use `add_face_with_persistency` for
266    /// management-created or permanent faces.
267    pub fn add_face<F: Face + 'static>(&self, face: F, cancel: CancellationToken) {
268        self.add_face_with_persistency(face, cancel, FacePersistency::OnDemand);
269    }
270
271    /// Register a face with an explicit persistence level.
272    ///
273    /// Spawns both a recv-reader task (pushes inbound packets to the pipeline
274    /// channel) and a send-writer task (drains the per-face outbound queue
275    /// and calls `face.send()`).
276    pub fn add_face_with_persistency<F: Face + 'static>(
277        &self,
278        face: F,
279        cancel: CancellationToken,
280        persistency: FacePersistency,
281    ) {
282        let face_id = face.id();
283        let kind = face.kind();
284        let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
285        #[cfg(feature = "face-net")]
286        let state = if kind == ndn_transport::FaceKind::Udp {
287            FaceState::new_reliable(
288                cancel.clone(),
289                persistency,
290                send_tx,
291                ndn_faces::net::DEFAULT_UDP_MTU,
292            )
293        } else {
294            FaceState::new(cancel.clone(), persistency, send_tx)
295        };
296        #[cfg(not(feature = "face-net"))]
297        let state = FaceState::new(cancel.clone(), persistency, send_tx);
298        self.inner.face_states.insert(face_id, state);
299        self.inner.face_table.insert(face);
300        let erased = self
301            .inner
302            .face_table
303            .get(face_id)
304            .expect("face was just inserted");
305
306        let discovery = Arc::clone(&self.inner.discovery);
307        let discovery_ctx = self.discovery_ctx();
308
309        // Spawn the outbound send task.
310        tokio::spawn(run_face_sender(
311            Arc::clone(&erased),
312            send_rx,
313            persistency,
314            crate::dispatcher::FaceRunnerCtx {
315                face_id,
316                cancel: cancel.clone(),
317                face_table: Arc::clone(&self.inner.face_table),
318                fib: Arc::clone(&self.inner.fib),
319                rib: Arc::clone(&self.inner.rib),
320                face_states: Arc::clone(&self.inner.face_states),
321                discovery: Arc::clone(&discovery),
322                discovery_ctx: Arc::clone(&discovery_ctx),
323            },
324        ));
325
326        // Spawn the inbound recv task.
327        tokio::spawn(crate::dispatcher::run_face_reader(
328            erased,
329            self.inner
330                .pipeline_tx
331                .get()
332                .expect("pipeline_tx initialized")
333                .clone(),
334            Arc::clone(&self.inner.pit),
335            crate::dispatcher::FaceRunnerCtx {
336                face_id,
337                cancel,
338                face_table: Arc::clone(&self.inner.face_table),
339                fib: Arc::clone(&self.inner.fib),
340                rib: Arc::clone(&self.inner.rib),
341                face_states: Arc::clone(&self.inner.face_states),
342                discovery: Arc::clone(&discovery),
343                discovery_ctx,
344            },
345        ));
346
347        // Notify discovery that a new face is up.
348        let ctx = self.discovery_ctx();
349        discovery.on_face_up(face_id, &*ctx);
350    }
351
352    /// Register a send-only face (no recv loop spawned).
353    ///
354    /// Use this for faces created by a listener that handles inbound packets
355    /// itself via `inject_packet`.  The face is added to the face table so
356    /// the dispatcher can send Data/Nack to it, but no `run_face_reader`
357    /// task is spawned.  A send-writer task is spawned to drain the outbound
358    /// queue.
359    pub fn add_face_send_only<F: Face + 'static>(&self, face: F, cancel: CancellationToken) {
360        let face_id = face.id();
361        let kind = face.kind();
362        let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
363        #[cfg(feature = "face-net")]
364        let state = if kind == ndn_transport::FaceKind::Udp {
365            FaceState::new_reliable(
366                cancel.clone(),
367                FacePersistency::OnDemand,
368                send_tx,
369                ndn_faces::net::DEFAULT_UDP_MTU,
370            )
371        } else {
372            FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx)
373        };
374        #[cfg(not(feature = "face-net"))]
375        let state = FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx);
376        self.inner.face_states.insert(face_id, state);
377        self.inner.face_table.insert(face);
378
379        let erased = self
380            .inner
381            .face_table
382            .get(face_id)
383            .expect("face was just inserted");
384        let discovery = Arc::clone(&self.inner.discovery);
385        let discovery_ctx = self.discovery_ctx();
386        tokio::spawn(run_face_sender(
387            erased,
388            send_rx,
389            FacePersistency::OnDemand,
390            crate::dispatcher::FaceRunnerCtx {
391                face_id,
392                cancel,
393                face_table: Arc::clone(&self.inner.face_table),
394                fib: Arc::clone(&self.inner.fib),
395                rib: Arc::clone(&self.inner.rib),
396                face_states: Arc::clone(&self.inner.face_states),
397                discovery: Arc::clone(&discovery),
398                discovery_ctx: Arc::clone(&discovery_ctx),
399            },
400        ));
401
402        // Notify discovery (send-only faces are still reachable peers).
403        discovery.on_face_up(face_id, &*discovery_ctx);
404    }
405
406    /// Inject a raw packet into the pipeline as if it arrived from `face_id`.
407    ///
408    /// Processes the reliability layer (Ack extraction / piggybacked Ack
409    /// processing) before enqueuing, matching the same path as `run_face_reader`.
410    /// `meta` carries the link-layer source address when available (use
411    /// `InboundMeta::udp(src)` for UDP listeners, `InboundMeta::none()` when
412    /// the source is implicit in the face).
413    ///
414    /// `discovery.on_inbound()` is called later inside `process_packet`, after
415    /// LP-unwrap and fragment reassembly, at the single call site in the pipeline.
416    ///
417    /// Returns `Err(())` if the pipeline channel is closed.
418    pub async fn inject_packet(
419        &self,
420        raw: bytes::Bytes,
421        face_id: FaceId,
422        arrival: u64,
423        meta: ndn_discovery::InboundMeta,
424    ) -> Result<(), ()> {
425        // Feed inbound packet to the reliability layer (same as run_face_reader).
426        // This extracts TxSeq for Ack and processes piggybacked Acks from the
427        // remote end.  Only applies when the face has reliability enabled.
428        if let Some(states) = self.inner.face_states.get(&face_id)
429            && let Some(rel) = states.reliability.as_ref()
430        {
431            rel.lock().unwrap().on_receive(&raw);
432        }
433
434        let tx = match self.inner.pipeline_tx.get() {
435            Some(tx) => tx,
436            None => return Err(()),
437        };
438        tx.send(InboundPacket {
439            raw,
440            face_id,
441            arrival,
442            meta,
443        })
444        .await
445        .map_err(|_| ())
446    }
447
448    /// Get the cancellation token for a face, if one exists.
449    pub fn face_token(&self, face_id: FaceId) -> Option<CancellationToken> {
450        self.inner
451            .face_states
452            .get(&face_id)
453            .map(|r| r.cancel.clone())
454    }
455
456    /// Access the face states map (for idle timeout sweeps).
457    pub fn face_states(&self) -> Arc<DashMap<FaceId, FaceState>> {
458        Arc::clone(&self.inner.face_states)
459    }
460}
461
/// Handle to gracefully shut down the engine.
///
/// Holds the engine's cancellation token together with the set of tasks that
/// `shutdown` waits for.
pub struct ShutdownHandle {
    /// Token cancelled by `shutdown`.
    pub(crate) cancel: CancellationToken,
    /// Tasks joined by `shutdown` after cancellation.
    pub(crate) tasks: JoinSet<()>,
}
467
468impl ShutdownHandle {
469    /// Get a clone of the cancellation token for this engine.
470    pub fn cancel_token(&self) -> CancellationToken {
471        self.cancel.clone()
472    }
473
474    /// Cancel all engine tasks and wait for them to finish.
475    pub async fn shutdown(mut self) {
476        self.cancel.cancel();
477        while let Some(result) = self.tasks.join_next().await {
478            if let Err(e) = result {
479                tracing::warn!("engine task panicked during shutdown: {e}");
480            }
481        }
482    }
483}
484
485/// Per-face outbound send task.
486///
487/// Drains the face's outbound channel and calls `face.send_bytes()` for each
488/// packet, preserving per-face ordering (critical for TCP TLV framing).
489///
490/// For reliability-enabled faces (unicast UDP), outgoing packets are processed
491/// through `LpReliability::on_send()` which fragments, assigns TxSequences,
492/// piggybacks Acks, and buffers for retransmit. A 50ms tick drives the
493/// retransmit timer and flushes pending Acks.
494///
495/// On send error:
496/// - **Permanent**: log and continue (the face retries on the next packet).
497/// - **Persistent/OnDemand**: stop the send loop.
498///
499/// On cancellation or channel close: exits cleanly.
500pub(crate) async fn run_face_sender(
501    face: Arc<dyn ndn_transport::ErasedFace>,
502    mut rx: mpsc::Receiver<bytes::Bytes>,
503    persistency: FacePersistency,
504    ctx: crate::dispatcher::FaceRunnerCtx,
505) {
506    let crate::dispatcher::FaceRunnerCtx {
507        face_id,
508        cancel,
509        face_table,
510        fib,
511        rib,
512        face_states,
513        discovery,
514        discovery_ctx,
515    } = ctx;
516    // Check if reliability is enabled by looking at the face state.
517    let has_reliability = face_states
518        .get(&face_id)
519        .map(|s| s.reliability.is_some())
520        .unwrap_or(false);
521
522    let mut retx_tick = tokio::time::interval(std::time::Duration::from_millis(50));
523    retx_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
524
525    // Helper closure for send errors.
526    let handle_send_error = |e: ndn_transport::FaceError| -> bool {
527        match persistency {
528            FacePersistency::Permanent => {
529                tracing::warn!(face=%face_id, error=%e, "send error on permanent face, continuing");
530                false // don't break
531            }
532            _ => {
533                tracing::warn!(face=%face_id, error=%e, "send error, closing face");
534                if persistency == FacePersistency::OnDemand {
535                    discovery.on_face_down(face_id, &*discovery_ctx);
536                    if let Some((_, state)) = face_states.remove(&face_id) {
537                        state.cancel.cancel();
538                    }
539                    rib.handle_face_down(face_id, &fib);
540                    fib.remove_face(face_id);
541                    face_table.remove(face_id);
542                }
543                true // break
544            }
545        }
546    };
547
548    loop {
549        tokio::select! {
550            _ = cancel.cancelled() => break,
551            pkt = rx.recv() => {
552                let pkt = match pkt {
553                    Some(p) => p,
554                    None => break,
555                };
556
557                if has_reliability {
558                    // Reliability-enabled path: fragment + assign TxSeq + piggyback Acks.
559                    let wires = {
560                        let state = face_states.get(&face_id);
561                        match state.as_ref().and_then(|s| s.reliability.as_ref()) {
562                            Some(rel) => rel.lock().unwrap().on_send(&pkt),
563                            None => vec![pkt],
564                        }
565                    };
566                    for wire in wires {
567                        if let Err(e) = face.send_bytes(wire).await
568                            && handle_send_error(e)
569                        {
570                            return;
571                        }
572                    }
573                } else {
574                    // Non-reliability path.
575                    // If the peer uses NDNLPv2 (auto-detected by run_face_reader when
576                    // the first LP-wrapped packet arrives), wrap outgoing packets in a
577                    // minimal LpPacket so the peer can parse them.  encode_lp_packet is
578                    // idempotent: LP-already-wrapped Nacks pass through unchanged.
579                    let wire = if face_states
580                        .get(&face_id)
581                        .map(|s| s.uses_lp.load(Ordering::Relaxed))
582                        .unwrap_or(false)
583                    {
584                        encode_lp_packet(&pkt)
585                    } else {
586                        pkt
587                    };
588                    if let Err(e) = face.send_bytes(wire).await
589                        && handle_send_error(e)
590                    {
591                        return;
592                    }
593                }
594            },
595            _ = retx_tick.tick(), if has_reliability => {
596                let (retx, ack_pkt) = {
597                    let state = face_states.get(&face_id);
598                    match state.as_ref().and_then(|s| s.reliability.as_ref()) {
599                        Some(rel) => {
600                            let mut rel = rel.lock().unwrap();
601                            let retx = rel.check_retransmit();
602                            let ack_pkt = rel.flush_acks();
603                            (retx, ack_pkt)
604                        }
605                        None => (vec![], None),
606                    }
607                };
608                for wire in retx {
609                    if let Err(e) = face.send_bytes(wire).await
610                        && handle_send_error(e)
611                    {
612                        return;
613                    }
614                }
615                if let Some(wire) = ack_pkt {
616                    let _ = face.send_bytes(wire).await;
617                }
618            }
619        }
620    }
621}