ndn_engine/
discovery_context.rs

//! Engine-side implementation of `DiscoveryContext`.
//!
//! `EngineDiscoveryContext` is the bridge between discovery protocols and the
//! engine's internal tables.  It holds a `Weak<EngineInner>` to break the
//! reference cycle (EngineInner → Arc<EngineDiscoveryContext> → Weak<EngineInner>).
6
7use std::sync::{Arc, Weak};
8use std::time::Instant;
9
10use bytes::Bytes;
11use dashmap::DashMap;
12use ndn_discovery::{
13    DiscoveryContext, NeighborTable, NeighborTableView, NeighborUpdate, ProtocolId,
14};
15use ndn_packet::Name;
16use ndn_transport::{ErasedFace, FaceId, FacePersistency};
17use tokio::sync::mpsc;
18use tokio_util::sync::CancellationToken;
19use tracing::warn;
20
21use crate::engine::{DEFAULT_SEND_QUEUE_CAP, EngineInner, FaceState};
22
23/// Side-table tracking which FIB entries were installed by each discovery
24/// protocol, so they can be bulk-removed via `remove_fib_entries_by_owner`.
25type OwnedRoutes = DashMap<ProtocolId, Vec<(Name, FaceId)>>;
26
27/// Engine implementation of `DiscoveryContext`.
28///
29/// Constructed once at engine build time and stored in `EngineInner`.  Every
30/// task that may call `on_face_up` / `on_face_down` / `on_inbound` receives an
31/// `Arc<EngineDiscoveryContext>`.
32pub struct EngineDiscoveryContext {
33    /// Weak reference to the engine's shared inner state.  We use `Weak` here
34    /// so that `EngineInner → Arc<EngineDiscoveryContext> → Weak<EngineInner>`
35    /// doesn't form a strong reference cycle.
36    pub(crate) inner: Weak<EngineInner>,
37    /// Direct strong reference to the neighbor table.  This is the same
38    /// `Arc<NeighborTable>` stored in `EngineInner::neighbors`, duplicated here
39    /// so `neighbors() -> &dyn NeighborTableView` can return a reference valid
40    /// for the lifetime of `&self` without needing to upgrade the `Weak`.
41    neighbors: Arc<NeighborTable>,
42    /// Cancellation token for faces dynamically added by discovery protocols.
43    pub(crate) cancel: CancellationToken,
44    /// Tracks `(prefix, nexthop_face_id)` pairs for each owning protocol.
45    owned_routes: Arc<OwnedRoutes>,
46}
47
48impl EngineDiscoveryContext {
49    pub(crate) fn new(
50        inner: Weak<EngineInner>,
51        neighbors: Arc<NeighborTable>,
52        cancel: CancellationToken,
53    ) -> Arc<Self> {
54        Arc::new(Self {
55            inner,
56            neighbors,
57            cancel,
58            owned_routes: Arc::new(DashMap::new()),
59        })
60    }
61}
62
63impl DiscoveryContext for EngineDiscoveryContext {
64    // ── Face management ──────────────────────────────────────────────────────
65
66    fn alloc_face_id(&self) -> FaceId {
67        let inner = match self.inner.upgrade() {
68            Some(i) => i,
69            None => {
70                warn!("DiscoveryContext::alloc_face_id called after engine shutdown");
71                return FaceId(0);
72            }
73        };
74        inner.face_table.alloc_id()
75    }
76
77    fn add_face(&self, face: Arc<dyn ErasedFace>) -> FaceId {
78        let inner = match self.inner.upgrade() {
79            Some(i) => i,
80            None => {
81                warn!("DiscoveryContext::add_face called after engine shutdown");
82                return FaceId(0);
83            }
84        };
85
86        let face_id = face.id();
87        let kind = face.kind();
88        let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
89        let cancel = self.cancel.child_token();
90
91        #[cfg(feature = "face-net")]
92        let state = if kind == ndn_transport::FaceKind::Udp {
93            FaceState::new_reliable(
94                cancel.clone(),
95                FacePersistency::OnDemand,
96                send_tx,
97                ndn_faces::net::DEFAULT_UDP_MTU,
98            )
99        } else {
100            FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx)
101        };
102        #[cfg(not(feature = "face-net"))]
103        let state = FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx);
104        inner.face_states.insert(face_id, state);
105        inner.face_table.insert_arc(Arc::clone(&face));
106
107        let discovery = Arc::clone(&inner.discovery);
108        let discovery_ctx = inner
109            .discovery_ctx
110            .get()
111            .expect("EngineDiscoveryContext not yet initialized")
112            .clone();
113
114        // Spawn outbound send task.
115        {
116            let d = Arc::clone(&discovery);
117            let ctx = Arc::clone(&discovery_ctx);
118            tokio::spawn(crate::engine::run_face_sender(
119                Arc::clone(&face),
120                send_rx,
121                FacePersistency::OnDemand,
122                crate::dispatcher::FaceRunnerCtx {
123                    face_id,
124                    cancel: cancel.clone(),
125                    face_table: Arc::clone(&inner.face_table),
126                    fib: Arc::clone(&inner.fib),
127                    rib: Arc::clone(&inner.rib),
128                    face_states: Arc::clone(&inner.face_states),
129                    discovery: d,
130                    discovery_ctx: ctx,
131                },
132            ));
133        }
134
135        // Spawn inbound recv task.
136        let pipeline_tx = match inner.pipeline_tx.get() {
137            Some(tx) => tx.clone(),
138            None => {
139                warn!("DiscoveryContext::add_face: pipeline_tx not yet initialized");
140                return FaceId(0);
141            }
142        };
143        tokio::spawn(crate::dispatcher::run_face_reader(
144            face,
145            pipeline_tx,
146            Arc::clone(&inner.pit),
147            crate::dispatcher::FaceRunnerCtx {
148                face_id,
149                cancel,
150                face_table: Arc::clone(&inner.face_table),
151                fib: Arc::clone(&inner.fib),
152                rib: Arc::clone(&inner.rib),
153                face_states: Arc::clone(&inner.face_states),
154                discovery,
155                discovery_ctx,
156            },
157        ));
158
159        face_id
160    }
161
162    fn remove_face(&self, face_id: FaceId) {
163        let inner = match self.inner.upgrade() {
164            Some(i) => i,
165            None => return,
166        };
167        if let Some((_, state)) = inner.face_states.remove(&face_id) {
168            state.cancel.cancel();
169        }
170        inner.rib.handle_face_down(face_id, &inner.fib);
171        inner.fib.remove_face(face_id);
172        inner.face_table.remove(face_id);
173    }
174
175    // ── FIB management ───────────────────────────────────────────────────────
176
177    fn add_fib_entry(&self, prefix: &Name, nexthop: FaceId, cost: u32, owner: ProtocolId) {
178        let inner = match self.inner.upgrade() {
179            Some(i) => i,
180            None => return,
181        };
182        inner.fib.add_nexthop(prefix, nexthop, cost);
183        self.owned_routes
184            .entry(owner)
185            .or_default()
186            .push((prefix.clone(), nexthop));
187    }
188
189    fn remove_fib_entry(&self, prefix: &Name, nexthop: FaceId, owner: ProtocolId) {
190        let inner = match self.inner.upgrade() {
191            Some(i) => i,
192            None => return,
193        };
194        inner.fib.remove_nexthop(prefix, nexthop);
195        if let Some(mut routes) = self.owned_routes.get_mut(&owner) {
196            routes.retain(|(n, f)| !(n == prefix && *f == nexthop));
197        }
198    }
199
200    fn remove_fib_entries_by_owner(&self, owner: ProtocolId) {
201        let inner = match self.inner.upgrade() {
202            Some(i) => i,
203            None => return,
204        };
205        if let Some((_, routes)) = self.owned_routes.remove(&owner) {
206            for (prefix, nexthop) in routes {
207                inner.fib.remove_nexthop(&prefix, nexthop);
208            }
209        }
210    }
211
212    // ── Neighbor table ───────────────────────────────────────────────────────
213
214    fn neighbors(&self) -> Arc<dyn NeighborTableView> {
215        Arc::clone(&self.neighbors) as Arc<dyn NeighborTableView>
216    }
217
    /// Forwards `update` to the neighbor table's `apply`.
    ///
    /// The table is held by strong reference, so this does not depend on the
    /// engine still being alive.
    fn update_neighbor(&self, update: NeighborUpdate) {
        self.neighbors.apply(update);
    }
221
222    // ── Packet I/O ───────────────────────────────────────────────────────────
223
224    fn send_on(&self, face_id: FaceId, pkt: Bytes) {
225        let inner = match self.inner.upgrade() {
226            Some(i) => i,
227            None => return,
228        };
229        if let Some(state) = inner.face_states.get(&face_id) {
230            let _ = state.send_tx.try_send(pkt);
231        }
232    }
233
234    // ── Time ─────────────────────────────────────────────────────────────────
235
    /// Returns the current instant as reported by `Instant::now()`.
    // NOTE(review): presumably exposed on the context so other
    // `DiscoveryContext` implementations can supply their own clock — confirm.
    fn now(&self) -> Instant {
        Instant::now()
    }
239}