// ndn_engine/dispatcher/mod.rs

1mod inbound;
2mod outbound;
3mod pipeline;
4
5use std::sync::Arc;
6
7use bytes::Bytes;
8use tokio::sync::mpsc;
9use tokio::task::JoinSet;
10use tokio_util::sync::CancellationToken;
11
12use ndn_discovery::{DiscoveryProtocol, InboundMeta};
13use ndn_transport::{FaceId, FaceKind, FacePersistency, FaceTable};
14
15use crate::discovery_context::EngineDiscoveryContext;
16use crate::engine::{self, DEFAULT_SEND_QUEUE_CAP, FaceState};
17use crate::rib::Rib;
18
19use crate::stages::{
20    CsInsertStage, CsLookupStage, PitCheckStage, PitMatchStage, StrategyStage, TlvDecodeStage,
21    ValidationStage,
22};
23
24pub(crate) use inbound::run_face_reader;
25
26/// Shared context passed to face reader/sender tasks to avoid exceeding the
27/// function argument limit while keeping all fields explicit.
28pub(crate) struct FaceRunnerCtx {
29    pub(crate) face_id: FaceId,
30    pub(crate) cancel: CancellationToken,
31    pub(crate) face_table: Arc<FaceTable>,
32    pub(crate) fib: Arc<crate::Fib>,
33    pub(crate) rib: Arc<Rib>,
34    pub(crate) face_states: Arc<dashmap::DashMap<FaceId, FaceState>>,
35    pub(crate) discovery: Arc<dyn DiscoveryProtocol>,
36    pub(crate) discovery_ctx: Arc<EngineDiscoveryContext>,
37}
38
39/// A raw packet arriving from a face, bundled with the face it came from.
40pub(crate) struct InboundPacket {
41    pub(crate) raw: Bytes,
42    pub(crate) face_id: FaceId,
43    pub(crate) arrival: u64,
44    /// Link-layer source metadata (source IP:port for UDP, source MAC for Ethernet).
45    /// Used by discovery protocols to create unicast reply faces.
46    /// `None` when the injection path does not have source information.
47    pub(crate) meta: InboundMeta,
48}
49
50/// The packet dispatcher.
51///
52/// Spawns one Tokio task per face that reads packets from that face and sends
53/// them to a shared `mpsc` channel.  A single pipeline runner drains the
54/// channel, performs the fast-path fragment sieve, and spawns per-packet tasks
55/// for full pipeline processing across multiple cores.
56///
57/// The fragment sieve stays single-threaded (cheap DashMap entry, ~2 µs) while
58/// the expensive pipeline stages (decode, CS, PIT, strategy) run in parallel.
59/// All shared tables (PIT, FIB, CS, face table) are concurrent-safe, so
60/// parallel pipeline tasks are correct without additional synchronisation.
61pub struct PacketDispatcher {
62    pub face_table: Arc<FaceTable>,
63    pub face_states: Arc<dashmap::DashMap<FaceId, FaceState>>,
64    pub rib: Arc<Rib>,
65    pub decode: TlvDecodeStage,
66    pub cs_lookup: CsLookupStage,
67    pub pit_check: PitCheckStage,
68    pub strategy: StrategyStage,
69    pub pit_match: PitMatchStage,
70    pub validation: ValidationStage,
71    pub cs_insert: CsInsertStage,
72    pub channel_cap: usize,
73    /// Resolved pipeline thread count (always ≥ 1).
74    pub pipeline_threads: usize,
75    /// Active discovery protocol — receives `on_inbound` calls before packets
76    /// enter the NDN forwarding pipeline.
77    pub discovery: Arc<dyn DiscoveryProtocol>,
78    /// Engine discovery context — passed to protocol hooks.
79    pub discovery_ctx: Arc<EngineDiscoveryContext>,
80}
81
82impl PacketDispatcher {
83    /// Spawn face-reader tasks for all currently registered faces, plus the
84    /// pipeline runner.
85    ///
86    /// Returns the pipeline channel sender so the engine can spawn reader tasks
87    /// for faces added dynamically after `build()`.
88    pub(crate) fn spawn(
89        self,
90        cancel: CancellationToken,
91        tasks: &mut JoinSet<()>,
92    ) -> mpsc::Sender<InboundPacket> {
93        let (tx, rx) = mpsc::channel::<InboundPacket>(self.channel_cap);
94        let dispatcher = Arc::new(self);
95
96        // Spawn reader + sender tasks for each pre-registered face.
97        for face_id in dispatcher.face_table.face_ids() {
98            if let Some(face) = dispatcher.face_table.get(face_id) {
99                // Create FaceState with a send queue if not already present.
100                if !dispatcher.face_states.contains_key(&face_id) {
101                    let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
102                    let persistency = FacePersistency::Permanent;
103                    #[cfg(feature = "face-net")]
104                    let state = if face.kind() == FaceKind::Udp {
105                        FaceState::new_reliable(
106                            cancel.child_token(),
107                            persistency,
108                            send_tx,
109                            ndn_faces::net::DEFAULT_UDP_MTU,
110                        )
111                    } else {
112                        FaceState::new(cancel.child_token(), persistency, send_tx)
113                    };
114                    #[cfg(not(feature = "face-net"))]
115                    let state = FaceState::new(cancel.child_token(), persistency, send_tx);
116                    dispatcher.face_states.insert(face_id, state);
117                    // Spawn per-face send task.
118                    let send_face = Arc::clone(&face);
119                    let send_cancel = cancel.clone();
120                    let fs = Arc::clone(&dispatcher.face_states);
121                    let ft = Arc::clone(&dispatcher.face_table);
122                    let fib = Arc::clone(&dispatcher.strategy.fib);
123                    let rib = Arc::clone(&dispatcher.rib);
124                    tasks.spawn(engine::run_face_sender(
125                        send_face,
126                        send_rx,
127                        persistency,
128                        FaceRunnerCtx {
129                            face_id,
130                            cancel: send_cancel,
131                            face_table: ft,
132                            fib,
133                            rib,
134                            face_states: fs,
135                            discovery: Arc::clone(&dispatcher.discovery),
136                            discovery_ctx: Arc::clone(&dispatcher.discovery_ctx),
137                        },
138                    ));
139                }
140
141                let tx2 = tx.clone();
142                let pit = Arc::clone(&dispatcher.pit_check.pit);
143                let reader_ctx = FaceRunnerCtx {
144                    face_id,
145                    cancel: cancel.clone(),
146                    face_table: Arc::clone(&dispatcher.face_table),
147                    fib: Arc::clone(&dispatcher.strategy.fib),
148                    rib: Arc::clone(&dispatcher.rib),
149                    face_states: Arc::clone(&dispatcher.face_states),
150                    discovery: Arc::clone(&dispatcher.discovery),
151                    discovery_ctx: Arc::clone(&dispatcher.discovery_ctx),
152                };
153                tasks.spawn(async move {
154                    run_face_reader(face, tx2, pit, reader_ctx).await;
155                });
156            }
157        }
158
159        // Pipeline runner.
160        let d = Arc::clone(&dispatcher);
161        let cancel2 = cancel.clone();
162        tasks.spawn(async move {
163            d.run_pipeline(rx, cancel2).await;
164        });
165
166        // Validation pending queue drain task.
167        if dispatcher.validation.validator.is_some() {
168            let d = Arc::clone(&dispatcher);
169            let cancel3 = cancel.clone();
170            tasks.spawn(async move {
171                d.run_validation_drain(cancel3).await;
172            });
173        }
174
175        tx
176    }
177}