ndn_engine/dispatcher/
mod.rs1mod inbound;
2mod outbound;
3mod pipeline;
4
5use std::sync::Arc;
6
7use bytes::Bytes;
8use tokio::sync::mpsc;
9use tokio::task::JoinSet;
10use tokio_util::sync::CancellationToken;
11
12use ndn_discovery::{DiscoveryProtocol, InboundMeta};
13use ndn_transport::{FaceId, FaceKind, FacePersistency, FaceTable};
14
15use crate::discovery_context::EngineDiscoveryContext;
16use crate::engine::{self, DEFAULT_SEND_QUEUE_CAP, FaceState};
17use crate::rib::Rib;
18
19use crate::stages::{
20 CsInsertStage, CsLookupStage, PitCheckStage, PitMatchStage, StrategyStage, TlvDecodeStage,
21 ValidationStage,
22};
23
24pub(crate) use inbound::run_face_reader;
25
26pub(crate) struct FaceRunnerCtx {
29 pub(crate) face_id: FaceId,
30 pub(crate) cancel: CancellationToken,
31 pub(crate) face_table: Arc<FaceTable>,
32 pub(crate) fib: Arc<crate::Fib>,
33 pub(crate) rib: Arc<Rib>,
34 pub(crate) face_states: Arc<dashmap::DashMap<FaceId, FaceState>>,
35 pub(crate) discovery: Arc<dyn DiscoveryProtocol>,
36 pub(crate) discovery_ctx: Arc<EngineDiscoveryContext>,
37}
38
39pub(crate) struct InboundPacket {
41 pub(crate) raw: Bytes,
42 pub(crate) face_id: FaceId,
43 pub(crate) arrival: u64,
44 pub(crate) meta: InboundMeta,
48}
49
50pub struct PacketDispatcher {
62 pub face_table: Arc<FaceTable>,
63 pub face_states: Arc<dashmap::DashMap<FaceId, FaceState>>,
64 pub rib: Arc<Rib>,
65 pub decode: TlvDecodeStage,
66 pub cs_lookup: CsLookupStage,
67 pub pit_check: PitCheckStage,
68 pub strategy: StrategyStage,
69 pub pit_match: PitMatchStage,
70 pub validation: ValidationStage,
71 pub cs_insert: CsInsertStage,
72 pub channel_cap: usize,
73 pub pipeline_threads: usize,
75 pub discovery: Arc<dyn DiscoveryProtocol>,
78 pub discovery_ctx: Arc<EngineDiscoveryContext>,
80}
81
82impl PacketDispatcher {
83 pub(crate) fn spawn(
89 self,
90 cancel: CancellationToken,
91 tasks: &mut JoinSet<()>,
92 ) -> mpsc::Sender<InboundPacket> {
93 let (tx, rx) = mpsc::channel::<InboundPacket>(self.channel_cap);
94 let dispatcher = Arc::new(self);
95
96 for face_id in dispatcher.face_table.face_ids() {
98 if let Some(face) = dispatcher.face_table.get(face_id) {
99 if !dispatcher.face_states.contains_key(&face_id) {
101 let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
102 let persistency = FacePersistency::Permanent;
103 #[cfg(feature = "face-net")]
104 let state = if face.kind() == FaceKind::Udp {
105 FaceState::new_reliable(
106 cancel.child_token(),
107 persistency,
108 send_tx,
109 ndn_faces::net::DEFAULT_UDP_MTU,
110 )
111 } else {
112 FaceState::new(cancel.child_token(), persistency, send_tx)
113 };
114 #[cfg(not(feature = "face-net"))]
115 let state = FaceState::new(cancel.child_token(), persistency, send_tx);
116 dispatcher.face_states.insert(face_id, state);
117 let send_face = Arc::clone(&face);
119 let send_cancel = cancel.clone();
120 let fs = Arc::clone(&dispatcher.face_states);
121 let ft = Arc::clone(&dispatcher.face_table);
122 let fib = Arc::clone(&dispatcher.strategy.fib);
123 let rib = Arc::clone(&dispatcher.rib);
124 tasks.spawn(engine::run_face_sender(
125 send_face,
126 send_rx,
127 persistency,
128 FaceRunnerCtx {
129 face_id,
130 cancel: send_cancel,
131 face_table: ft,
132 fib,
133 rib,
134 face_states: fs,
135 discovery: Arc::clone(&dispatcher.discovery),
136 discovery_ctx: Arc::clone(&dispatcher.discovery_ctx),
137 },
138 ));
139 }
140
141 let tx2 = tx.clone();
142 let pit = Arc::clone(&dispatcher.pit_check.pit);
143 let reader_ctx = FaceRunnerCtx {
144 face_id,
145 cancel: cancel.clone(),
146 face_table: Arc::clone(&dispatcher.face_table),
147 fib: Arc::clone(&dispatcher.strategy.fib),
148 rib: Arc::clone(&dispatcher.rib),
149 face_states: Arc::clone(&dispatcher.face_states),
150 discovery: Arc::clone(&dispatcher.discovery),
151 discovery_ctx: Arc::clone(&dispatcher.discovery_ctx),
152 };
153 tasks.spawn(async move {
154 run_face_reader(face, tx2, pit, reader_ctx).await;
155 });
156 }
157 }
158
159 let d = Arc::clone(&dispatcher);
161 let cancel2 = cancel.clone();
162 tasks.spawn(async move {
163 d.run_pipeline(rx, cancel2).await;
164 });
165
166 if dispatcher.validation.validator.is_some() {
168 let d = Arc::clone(&dispatcher);
169 let cancel3 = cancel.clone();
170 tasks.spawn(async move {
171 d.run_validation_drain(cancel3).await;
172 });
173 }
174
175 tx
176 }
177}