1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::{Arc, OnceLock};
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use dashmap::DashMap;
6use ndn_discovery::{DiscoveryProtocol, NeighborTable};
7use tokio::sync::mpsc;
8use tokio::task::JoinSet;
9use tokio_util::sync::CancellationToken;
10
11use ndn_packet::Interest;
12use ndn_packet::lp::encode_lp_packet;
13use ndn_security::{SecurityManager, Validator};
14use ndn_store::{ErasedContentStore, Pit, PitToken, StrategyTable};
15use ndn_strategy::MeasurementsTable;
16use ndn_transport::{Face, FaceId, FacePersistency, FaceTable};
17
18use crate::discovery_context::EngineDiscoveryContext;
19
20use crate::stages::ErasedStrategy;
21
22use crate::Fib;
23use crate::dispatcher::InboundPacket;
24use crate::rib::Rib;
25use crate::routing::RoutingManager;
26
/// Default capacity of a face's outbound send queue.
///
/// This value is passed to `mpsc::channel` when
/// `ForwarderEngine::add_face_with_persistency` and
/// `ForwarderEngine::add_face_send_only` create the per-face queue, so it
/// bounds the number of queued `bytes::Bytes` packets (not the number of bytes).
pub const DEFAULT_SEND_QUEUE_CAP: usize = 2048;
39
/// Per-face traffic counters.
///
/// Every counter is an [`AtomicU64`], so it can be updated through a shared
/// reference. `Default` produces a value with all counters at zero.
///
/// NOTE(review): nothing in this file increments these counters; the field
/// names suggest per-direction packet and byte tallies — confirm against the
/// code that updates them.
#[derive(Debug, Default)]
pub struct FaceCounters {
    pub in_interests: AtomicU64,
    pub in_data: AtomicU64,
    pub out_interests: AtomicU64,
    pub out_data: AtomicU64,
    pub in_bytes: AtomicU64,
    pub out_bytes: AtomicU64,
}
51
/// Engine-side bookkeeping for one registered face.
///
/// Instances live in the engine's `face_states` map, keyed by `FaceId`.
pub struct FaceState {
    /// Cancellation token for this face. `ForwarderEngine::face_token` hands
    /// out clones, and `run_face_sender` cancels it when it tears down an
    /// on-demand face after a send error.
    pub cancel: CancellationToken,
    /// Persistency level supplied when the face was registered.
    pub persistency: FacePersistency,
    /// Wall-clock nanoseconds since the Unix epoch; set at construction and
    /// refreshed by `FaceState::touch` with a relaxed store.
    pub last_activity: AtomicU64,
    /// Per-face traffic counters, all zero at construction.
    pub counters: FaceCounters,
    /// Sending half of the face's bounded outbound queue; the receiving half
    /// is drained by `run_face_sender`.
    pub send_tx: mpsc::Sender<bytes::Bytes>,
    /// Link-reliability state; this field exists only when the `face-net`
    /// feature is enabled. It is `Some` for states built with
    /// `FaceState::new_reliable` and `None` for states built with
    /// `FaceState::new`.
    #[cfg(feature = "face-net")]
    pub reliability: Option<std::sync::Mutex<ndn_faces::net::reliability::LpReliability>>,
    /// When `true`, `run_face_sender` passes outgoing packets through
    /// `encode_lp_packet` on faces that have no reliability state.
    /// Initialised to `false`; no code in this file sets it.
    pub uses_lp: AtomicBool,
}
81
82impl FaceState {
83 pub fn new(
84 cancel: CancellationToken,
85 persistency: FacePersistency,
86 send_tx: mpsc::Sender<bytes::Bytes>,
87 ) -> Self {
88 let now = SystemTime::now()
89 .duration_since(UNIX_EPOCH)
90 .unwrap_or_default()
91 .as_nanos() as u64;
92 Self {
93 cancel,
94 persistency,
95 last_activity: AtomicU64::new(now),
96 counters: FaceCounters::default(),
97 send_tx,
98 #[cfg(feature = "face-net")]
99 reliability: None,
100 uses_lp: AtomicBool::new(false),
101 }
102 }
103
104 #[cfg(feature = "face-net")]
106 pub fn new_reliable(
107 cancel: CancellationToken,
108 persistency: FacePersistency,
109 send_tx: mpsc::Sender<bytes::Bytes>,
110 mtu: usize,
111 ) -> Self {
112 let now = SystemTime::now()
113 .duration_since(UNIX_EPOCH)
114 .unwrap_or_default()
115 .as_nanos() as u64;
116 Self {
117 cancel,
118 persistency,
119 last_activity: AtomicU64::new(now),
120 counters: FaceCounters::default(),
121 send_tx,
122 reliability: Some(std::sync::Mutex::new(
123 ndn_faces::net::reliability::LpReliability::new(mtu),
124 )),
125 uses_lp: AtomicBool::new(false),
126 }
127 }
128
129 pub fn touch(&self) {
130 let now = SystemTime::now()
131 .duration_since(UNIX_EPOCH)
132 .unwrap_or_default()
133 .as_nanos() as u64;
134 self.last_activity.store(now, Ordering::Relaxed);
135 }
136}
137
/// Shared state behind a `ForwarderEngine`.
///
/// `ForwarderEngine` holds this struct in an `Arc`, and most fields are
/// themselves `Arc`s so the accessors can hand out cheap clones.
pub struct EngineInner {
    /// FIB handle; also given to each face task, which removes a face's
    /// entries when an on-demand face is torn down.
    pub fib: Arc<Fib>,
    /// RIB handle; face tasks call `handle_face_down` on it during teardown.
    pub rib: Arc<Rib>,
    /// Routing manager handle, exposed through `ForwarderEngine::routing`.
    pub routing: Arc<RoutingManager>,
    /// PIT handle; read by `ForwarderEngine::source_face_id` and passed to
    /// each face reader task.
    pub pit: Arc<Pit>,
    /// Type-erased content store handle.
    pub cs: Arc<dyn ErasedContentStore>,
    /// Table of registered faces; faces are inserted by the `add_face*` methods.
    pub face_table: Arc<FaceTable>,
    /// Measurements table handle, exposed through `ForwarderEngine::measurements`.
    pub measurements: Arc<MeasurementsTable>,
    /// Strategy table handle over type-erased strategies.
    pub strategy_table: Arc<StrategyTable<dyn ErasedStrategy>>,
    /// Optional security manager; `None` when the engine has none configured.
    pub security: Option<Arc<SecurityManager>>,
    /// Optional validator; `None` when the engine has none configured.
    pub validator: Option<Arc<Validator>>,
    /// Sender into the packet pipeline, set at most once. While it is unset,
    /// `inject_packet` returns `Err(())` and `add_face_with_persistency` panics.
    pub(crate) pipeline_tx: OnceLock<mpsc::Sender<InboundPacket>>,
    /// Per-face engine state, keyed by face id.
    pub(crate) face_states: Arc<DashMap<FaceId, FaceState>>,
    /// Discovery protocol; notified via `on_face_up` / `on_face_down`.
    pub discovery: Arc<dyn DiscoveryProtocol>,
    /// Neighbor table handle, exposed through `ForwarderEngine::neighbors`.
    pub neighbors: Arc<NeighborTable>,
    /// Discovery context, set at most once. `ForwarderEngine::discovery_ctx`
    /// panics while it is unset.
    pub(crate) discovery_ctx: OnceLock<Arc<EngineDiscoveryContext>>,
}
173
/// Handle to the forwarder engine.
///
/// Cloning copies only the inner `Arc`, so every clone refers to the same
/// `EngineInner`.
#[derive(Clone)]
pub struct ForwarderEngine {
    /// Shared engine state.
    pub(crate) inner: Arc<EngineInner>,
}
181
182impl ForwarderEngine {
183 pub fn fib(&self) -> Arc<Fib> {
184 Arc::clone(&self.inner.fib)
185 }
186
187 pub fn rib(&self) -> Arc<Rib> {
188 Arc::clone(&self.inner.rib)
189 }
190
191 pub fn routing(&self) -> Arc<RoutingManager> {
192 Arc::clone(&self.inner.routing)
193 }
194
195 pub fn faces(&self) -> Arc<FaceTable> {
196 Arc::clone(&self.inner.face_table)
197 }
198
199 pub fn pit(&self) -> Arc<Pit> {
200 Arc::clone(&self.inner.pit)
201 }
202
203 pub fn cs(&self) -> Arc<dyn ErasedContentStore> {
204 Arc::clone(&self.inner.cs)
205 }
206
207 pub fn security(&self) -> Option<Arc<SecurityManager>> {
208 self.inner.security.as_ref().map(Arc::clone)
209 }
210
211 pub fn validator(&self) -> Option<Arc<Validator>> {
218 self.inner.validator.as_ref().map(Arc::clone)
219 }
220
221 pub fn strategy_table(&self) -> Arc<StrategyTable<dyn ErasedStrategy>> {
222 Arc::clone(&self.inner.strategy_table)
223 }
224
225 pub fn neighbors(&self) -> Arc<NeighborTable> {
226 Arc::clone(&self.inner.neighbors)
227 }
228
229 pub fn measurements(&self) -> Arc<MeasurementsTable> {
230 Arc::clone(&self.inner.measurements)
231 }
232
233 pub fn discovery(&self) -> Arc<dyn DiscoveryProtocol> {
234 Arc::clone(&self.inner.discovery)
235 }
236
237 pub fn discovery_ctx(&self) -> Arc<EngineDiscoveryContext> {
241 self.inner
242 .discovery_ctx
243 .get()
244 .expect("discovery_ctx not initialized")
245 .clone()
246 }
247
248 pub fn source_face_id(&self, interest: &Interest) -> Option<FaceId> {
250 let token = PitToken::from_interest_full(
251 &interest.name,
252 Some(interest.selectors()),
253 interest.forwarding_hint(),
254 );
255 self.inner
256 .pit
257 .with_entry(&token, |entry| {
258 entry.in_records.first().map(|r| FaceId(r.face_id))
259 })
260 .flatten()
261 }
262
263 pub fn add_face<F: Face + 'static>(&self, face: F, cancel: CancellationToken) {
268 self.add_face_with_persistency(face, cancel, FacePersistency::OnDemand);
269 }
270
271 pub fn add_face_with_persistency<F: Face + 'static>(
277 &self,
278 face: F,
279 cancel: CancellationToken,
280 persistency: FacePersistency,
281 ) {
282 let face_id = face.id();
283 let kind = face.kind();
284 let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
285 #[cfg(feature = "face-net")]
286 let state = if kind == ndn_transport::FaceKind::Udp {
287 FaceState::new_reliable(
288 cancel.clone(),
289 persistency,
290 send_tx,
291 ndn_faces::net::DEFAULT_UDP_MTU,
292 )
293 } else {
294 FaceState::new(cancel.clone(), persistency, send_tx)
295 };
296 #[cfg(not(feature = "face-net"))]
297 let state = FaceState::new(cancel.clone(), persistency, send_tx);
298 self.inner.face_states.insert(face_id, state);
299 self.inner.face_table.insert(face);
300 let erased = self
301 .inner
302 .face_table
303 .get(face_id)
304 .expect("face was just inserted");
305
306 let discovery = Arc::clone(&self.inner.discovery);
307 let discovery_ctx = self.discovery_ctx();
308
309 tokio::spawn(run_face_sender(
311 Arc::clone(&erased),
312 send_rx,
313 persistency,
314 crate::dispatcher::FaceRunnerCtx {
315 face_id,
316 cancel: cancel.clone(),
317 face_table: Arc::clone(&self.inner.face_table),
318 fib: Arc::clone(&self.inner.fib),
319 rib: Arc::clone(&self.inner.rib),
320 face_states: Arc::clone(&self.inner.face_states),
321 discovery: Arc::clone(&discovery),
322 discovery_ctx: Arc::clone(&discovery_ctx),
323 },
324 ));
325
326 tokio::spawn(crate::dispatcher::run_face_reader(
328 erased,
329 self.inner
330 .pipeline_tx
331 .get()
332 .expect("pipeline_tx initialized")
333 .clone(),
334 Arc::clone(&self.inner.pit),
335 crate::dispatcher::FaceRunnerCtx {
336 face_id,
337 cancel,
338 face_table: Arc::clone(&self.inner.face_table),
339 fib: Arc::clone(&self.inner.fib),
340 rib: Arc::clone(&self.inner.rib),
341 face_states: Arc::clone(&self.inner.face_states),
342 discovery: Arc::clone(&discovery),
343 discovery_ctx,
344 },
345 ));
346
347 let ctx = self.discovery_ctx();
349 discovery.on_face_up(face_id, &*ctx);
350 }
351
352 pub fn add_face_send_only<F: Face + 'static>(&self, face: F, cancel: CancellationToken) {
360 let face_id = face.id();
361 let kind = face.kind();
362 let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
363 #[cfg(feature = "face-net")]
364 let state = if kind == ndn_transport::FaceKind::Udp {
365 FaceState::new_reliable(
366 cancel.clone(),
367 FacePersistency::OnDemand,
368 send_tx,
369 ndn_faces::net::DEFAULT_UDP_MTU,
370 )
371 } else {
372 FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx)
373 };
374 #[cfg(not(feature = "face-net"))]
375 let state = FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx);
376 self.inner.face_states.insert(face_id, state);
377 self.inner.face_table.insert(face);
378
379 let erased = self
380 .inner
381 .face_table
382 .get(face_id)
383 .expect("face was just inserted");
384 let discovery = Arc::clone(&self.inner.discovery);
385 let discovery_ctx = self.discovery_ctx();
386 tokio::spawn(run_face_sender(
387 erased,
388 send_rx,
389 FacePersistency::OnDemand,
390 crate::dispatcher::FaceRunnerCtx {
391 face_id,
392 cancel,
393 face_table: Arc::clone(&self.inner.face_table),
394 fib: Arc::clone(&self.inner.fib),
395 rib: Arc::clone(&self.inner.rib),
396 face_states: Arc::clone(&self.inner.face_states),
397 discovery: Arc::clone(&discovery),
398 discovery_ctx: Arc::clone(&discovery_ctx),
399 },
400 ));
401
402 discovery.on_face_up(face_id, &*discovery_ctx);
404 }
405
406 pub async fn inject_packet(
419 &self,
420 raw: bytes::Bytes,
421 face_id: FaceId,
422 arrival: u64,
423 meta: ndn_discovery::InboundMeta,
424 ) -> Result<(), ()> {
425 if let Some(states) = self.inner.face_states.get(&face_id)
429 && let Some(rel) = states.reliability.as_ref()
430 {
431 rel.lock().unwrap().on_receive(&raw);
432 }
433
434 let tx = match self.inner.pipeline_tx.get() {
435 Some(tx) => tx,
436 None => return Err(()),
437 };
438 tx.send(InboundPacket {
439 raw,
440 face_id,
441 arrival,
442 meta,
443 })
444 .await
445 .map_err(|_| ())
446 }
447
448 pub fn face_token(&self, face_id: FaceId) -> Option<CancellationToken> {
450 self.inner
451 .face_states
452 .get(&face_id)
453 .map(|r| r.cancel.clone())
454 }
455
456 pub fn face_states(&self) -> Arc<DashMap<FaceId, FaceState>> {
458 Arc::clone(&self.inner.face_states)
459 }
460}
461
/// Handle used to stop the engine and wait for its tasks.
pub struct ShutdownHandle {
    /// Token cancelled by `ShutdownHandle::shutdown`; clones are available via
    /// `ShutdownHandle::cancel_token`.
    pub(crate) cancel: CancellationToken,
    /// Tasks that `ShutdownHandle::shutdown` waits for.
    pub(crate) tasks: JoinSet<()>,
}
467
468impl ShutdownHandle {
469 pub fn cancel_token(&self) -> CancellationToken {
471 self.cancel.clone()
472 }
473
474 pub async fn shutdown(mut self) {
476 self.cancel.cancel();
477 while let Some(result) = self.tasks.join_next().await {
478 if let Err(e) = result {
479 tracing::warn!("engine task panicked during shutdown: {e}");
480 }
481 }
482 }
483}
484
485pub(crate) async fn run_face_sender(
501 face: Arc<dyn ndn_transport::ErasedFace>,
502 mut rx: mpsc::Receiver<bytes::Bytes>,
503 persistency: FacePersistency,
504 ctx: crate::dispatcher::FaceRunnerCtx,
505) {
506 let crate::dispatcher::FaceRunnerCtx {
507 face_id,
508 cancel,
509 face_table,
510 fib,
511 rib,
512 face_states,
513 discovery,
514 discovery_ctx,
515 } = ctx;
516 let has_reliability = face_states
518 .get(&face_id)
519 .map(|s| s.reliability.is_some())
520 .unwrap_or(false);
521
522 let mut retx_tick = tokio::time::interval(std::time::Duration::from_millis(50));
523 retx_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
524
525 let handle_send_error = |e: ndn_transport::FaceError| -> bool {
527 match persistency {
528 FacePersistency::Permanent => {
529 tracing::warn!(face=%face_id, error=%e, "send error on permanent face, continuing");
530 false }
532 _ => {
533 tracing::warn!(face=%face_id, error=%e, "send error, closing face");
534 if persistency == FacePersistency::OnDemand {
535 discovery.on_face_down(face_id, &*discovery_ctx);
536 if let Some((_, state)) = face_states.remove(&face_id) {
537 state.cancel.cancel();
538 }
539 rib.handle_face_down(face_id, &fib);
540 fib.remove_face(face_id);
541 face_table.remove(face_id);
542 }
543 true }
545 }
546 };
547
548 loop {
549 tokio::select! {
550 _ = cancel.cancelled() => break,
551 pkt = rx.recv() => {
552 let pkt = match pkt {
553 Some(p) => p,
554 None => break,
555 };
556
557 if has_reliability {
558 let wires = {
560 let state = face_states.get(&face_id);
561 match state.as_ref().and_then(|s| s.reliability.as_ref()) {
562 Some(rel) => rel.lock().unwrap().on_send(&pkt),
563 None => vec![pkt],
564 }
565 };
566 for wire in wires {
567 if let Err(e) = face.send_bytes(wire).await
568 && handle_send_error(e)
569 {
570 return;
571 }
572 }
573 } else {
574 let wire = if face_states
580 .get(&face_id)
581 .map(|s| s.uses_lp.load(Ordering::Relaxed))
582 .unwrap_or(false)
583 {
584 encode_lp_packet(&pkt)
585 } else {
586 pkt
587 };
588 if let Err(e) = face.send_bytes(wire).await
589 && handle_send_error(e)
590 {
591 return;
592 }
593 }
594 },
595 _ = retx_tick.tick(), if has_reliability => {
596 let (retx, ack_pkt) = {
597 let state = face_states.get(&face_id);
598 match state.as_ref().and_then(|s| s.reliability.as_ref()) {
599 Some(rel) => {
600 let mut rel = rel.lock().unwrap();
601 let retx = rel.check_retransmit();
602 let ack_pkt = rel.flush_acks();
603 (retx, ack_pkt)
604 }
605 None => (vec![], None),
606 }
607 };
608 for wire in retx {
609 if let Err(e) = face.send_bytes(wire).await
610 && handle_send_error(e)
611 {
612 return;
613 }
614 }
615 if let Some(wire) = ack_pkt {
616 let _ = face.send_bytes(wire).await;
617 }
618 }
619 }
620 }
621}