1use std::sync::{Arc, Weak};
8use std::time::Instant;
9
10use bytes::Bytes;
11use dashmap::DashMap;
12use ndn_discovery::{
13 DiscoveryContext, NeighborTable, NeighborTableView, NeighborUpdate, ProtocolId,
14};
15use ndn_packet::Name;
16use ndn_transport::{ErasedFace, FaceId, FacePersistency};
17use tokio::sync::mpsc;
18use tokio_util::sync::CancellationToken;
19use tracing::warn;
20
21use crate::engine::{DEFAULT_SEND_QUEUE_CAP, EngineInner, FaceState};
22
23type OwnedRoutes = DashMap<ProtocolId, Vec<(Name, FaceId)>>;
26
27pub struct EngineDiscoveryContext {
33 pub(crate) inner: Weak<EngineInner>,
37 neighbors: Arc<NeighborTable>,
42 pub(crate) cancel: CancellationToken,
44 owned_routes: Arc<OwnedRoutes>,
46}
47
48impl EngineDiscoveryContext {
49 pub(crate) fn new(
50 inner: Weak<EngineInner>,
51 neighbors: Arc<NeighborTable>,
52 cancel: CancellationToken,
53 ) -> Arc<Self> {
54 Arc::new(Self {
55 inner,
56 neighbors,
57 cancel,
58 owned_routes: Arc::new(DashMap::new()),
59 })
60 }
61}
62
63impl DiscoveryContext for EngineDiscoveryContext {
64 fn alloc_face_id(&self) -> FaceId {
67 let inner = match self.inner.upgrade() {
68 Some(i) => i,
69 None => {
70 warn!("DiscoveryContext::alloc_face_id called after engine shutdown");
71 return FaceId(0);
72 }
73 };
74 inner.face_table.alloc_id()
75 }
76
77 fn add_face(&self, face: Arc<dyn ErasedFace>) -> FaceId {
78 let inner = match self.inner.upgrade() {
79 Some(i) => i,
80 None => {
81 warn!("DiscoveryContext::add_face called after engine shutdown");
82 return FaceId(0);
83 }
84 };
85
86 let face_id = face.id();
87 let kind = face.kind();
88 let (send_tx, send_rx) = mpsc::channel(DEFAULT_SEND_QUEUE_CAP);
89 let cancel = self.cancel.child_token();
90
91 #[cfg(feature = "face-net")]
92 let state = if kind == ndn_transport::FaceKind::Udp {
93 FaceState::new_reliable(
94 cancel.clone(),
95 FacePersistency::OnDemand,
96 send_tx,
97 ndn_faces::net::DEFAULT_UDP_MTU,
98 )
99 } else {
100 FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx)
101 };
102 #[cfg(not(feature = "face-net"))]
103 let state = FaceState::new(cancel.clone(), FacePersistency::OnDemand, send_tx);
104 inner.face_states.insert(face_id, state);
105 inner.face_table.insert_arc(Arc::clone(&face));
106
107 let discovery = Arc::clone(&inner.discovery);
108 let discovery_ctx = inner
109 .discovery_ctx
110 .get()
111 .expect("EngineDiscoveryContext not yet initialized")
112 .clone();
113
114 {
116 let d = Arc::clone(&discovery);
117 let ctx = Arc::clone(&discovery_ctx);
118 tokio::spawn(crate::engine::run_face_sender(
119 Arc::clone(&face),
120 send_rx,
121 FacePersistency::OnDemand,
122 crate::dispatcher::FaceRunnerCtx {
123 face_id,
124 cancel: cancel.clone(),
125 face_table: Arc::clone(&inner.face_table),
126 fib: Arc::clone(&inner.fib),
127 rib: Arc::clone(&inner.rib),
128 face_states: Arc::clone(&inner.face_states),
129 discovery: d,
130 discovery_ctx: ctx,
131 },
132 ));
133 }
134
135 let pipeline_tx = match inner.pipeline_tx.get() {
137 Some(tx) => tx.clone(),
138 None => {
139 warn!("DiscoveryContext::add_face: pipeline_tx not yet initialized");
140 return FaceId(0);
141 }
142 };
143 tokio::spawn(crate::dispatcher::run_face_reader(
144 face,
145 pipeline_tx,
146 Arc::clone(&inner.pit),
147 crate::dispatcher::FaceRunnerCtx {
148 face_id,
149 cancel,
150 face_table: Arc::clone(&inner.face_table),
151 fib: Arc::clone(&inner.fib),
152 rib: Arc::clone(&inner.rib),
153 face_states: Arc::clone(&inner.face_states),
154 discovery,
155 discovery_ctx,
156 },
157 ));
158
159 face_id
160 }
161
162 fn remove_face(&self, face_id: FaceId) {
163 let inner = match self.inner.upgrade() {
164 Some(i) => i,
165 None => return,
166 };
167 if let Some((_, state)) = inner.face_states.remove(&face_id) {
168 state.cancel.cancel();
169 }
170 inner.rib.handle_face_down(face_id, &inner.fib);
171 inner.fib.remove_face(face_id);
172 inner.face_table.remove(face_id);
173 }
174
175 fn add_fib_entry(&self, prefix: &Name, nexthop: FaceId, cost: u32, owner: ProtocolId) {
178 let inner = match self.inner.upgrade() {
179 Some(i) => i,
180 None => return,
181 };
182 inner.fib.add_nexthop(prefix, nexthop, cost);
183 self.owned_routes
184 .entry(owner)
185 .or_default()
186 .push((prefix.clone(), nexthop));
187 }
188
189 fn remove_fib_entry(&self, prefix: &Name, nexthop: FaceId, owner: ProtocolId) {
190 let inner = match self.inner.upgrade() {
191 Some(i) => i,
192 None => return,
193 };
194 inner.fib.remove_nexthop(prefix, nexthop);
195 if let Some(mut routes) = self.owned_routes.get_mut(&owner) {
196 routes.retain(|(n, f)| !(n == prefix && *f == nexthop));
197 }
198 }
199
200 fn remove_fib_entries_by_owner(&self, owner: ProtocolId) {
201 let inner = match self.inner.upgrade() {
202 Some(i) => i,
203 None => return,
204 };
205 if let Some((_, routes)) = self.owned_routes.remove(&owner) {
206 for (prefix, nexthop) in routes {
207 inner.fib.remove_nexthop(&prefix, nexthop);
208 }
209 }
210 }
211
212 fn neighbors(&self) -> Arc<dyn NeighborTableView> {
215 Arc::clone(&self.neighbors) as Arc<dyn NeighborTableView>
216 }
217
218 fn update_neighbor(&self, update: NeighborUpdate) {
219 self.neighbors.apply(update);
220 }
221
222 fn send_on(&self, face_id: FaceId, pkt: Bytes) {
225 let inner = match self.inner.upgrade() {
226 Some(i) => i,
227 None => return,
228 };
229 if let Some(state) = inner.face_states.get(&face_id) {
230 let _ = state.send_tx.try_send(pkt);
231 }
232 }
233
234 fn now(&self) -> Instant {
237 Instant::now()
238 }
239}