ndn_engine/
routing.rs

1use std::sync::Arc;
2
3use dashmap::DashMap;
4use ndn_discovery::NeighborTable;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use ndn_transport::FaceTable;
9
10use crate::{Fib, Rib};
11
12/// Engine handles passed to a routing protocol when it starts.
13///
14/// Bundles the shared tables a routing protocol needs to install routes and
15/// query forwarding state. Passed by value to [`RoutingProtocol::start`].
16pub struct RoutingHandle {
17    /// Routing Information Base — the protocol writes routes here via
18    /// `rib.add()` / `rib.remove()` and calls `rib.apply_to_fib()` to push
19    /// updates into the FIB.
20    pub rib: Arc<Rib>,
21    /// Forwarding Information Base — needed for `rib.apply_to_fib()` calls.
22    pub fib: Arc<Fib>,
23    /// Face table — read to enumerate active faces (e.g. to broadcast updates).
24    pub faces: Arc<FaceTable>,
25    /// Neighbor table — read-only view of discovered peers and their faces.
26    pub neighbors: Arc<NeighborTable>,
27}
28
29/// A routing protocol that manages routes in the RIB.
30///
31/// Implementations run as Tokio background tasks. Each protocol registers
32/// routes under a distinct [`origin`] value, which the RIB uses to namespace
33/// them. Multiple protocols run concurrently; the RIB computes the best
34/// nexthops across all origins when building FIB entries.
35///
36/// # Object safety
37///
38/// The trait is object-safe. [`start`] takes `&self` — implementations clone
39/// their internal state (typically held in an `Arc<Inner>`) to move into the
40/// spawned task.
41///
42/// # Example
43///
44/// ```rust,ignore
45/// struct MyProtocol { inner: Arc<MyState> }
46///
47/// impl RoutingProtocol for MyProtocol {
48///     fn origin(&self) -> u64 { ndn_config::control_parameters::origin::STATIC }
49///
50///     fn start(&self, handle: RoutingHandle, cancel: CancellationToken) -> JoinHandle<()> {
51///         let inner = Arc::clone(&self.inner);
52///         tokio::spawn(async move {
53///             inner.run(handle, cancel).await;
54///         })
55///     }
56/// }
57/// ```
58///
59/// [`origin`]: RoutingProtocol::origin
60/// [`start`]: RoutingProtocol::start
61pub trait RoutingProtocol: Send + Sync + 'static {
62    /// Route origin value this protocol registers under.
63    ///
64    /// Each running instance must use a unique value. Standard values are in
65    /// `ndn_config::control_parameters::origin` (e.g. `NLSR = 128`).
66    /// Custom protocols should use values in the range 64–127.
67    fn origin(&self) -> u64;
68
69    /// Start the protocol as a Tokio background task.
70    ///
71    /// Should run until `cancel` is cancelled, then return. The implementation
72    /// calls `handle.rib.add()` to register routes and
73    /// `handle.rib.apply_to_fib()` to push changes into the FIB.
74    ///
75    /// Routes are automatically flushed from the RIB when the protocol is
76    /// stopped via [`RoutingManager::disable`].
77    fn start(&self, handle: RoutingHandle, cancel: CancellationToken) -> JoinHandle<()>;
78}
79
80struct ProtocolHandle {
81    cancel: CancellationToken,
82    /// The task handle. Dropped (not awaited) when the protocol is disabled,
83    /// which keeps the task running briefly until it reaches a cancellation
84    /// checkpoint.
85    _task: JoinHandle<()>,
86}
87
88/// Manages a set of concurrently-running routing protocols.
89///
90/// # How it works
91///
92/// Each protocol is assigned a unique [`origin`] value. When enabled, the
93/// manager starts the protocol as an independent Tokio task with a child
94/// cancellation token. When disabled, the child token is cancelled (stopping
95/// the task at its next `cancel.cancelled().await`) and all routes the
96/// protocol registered are flushed from the RIB; the FIB is recomputed for
97/// affected prefixes from any remaining protocols' routes.
98///
99/// # Multiple protocols
100///
101/// Running two protocols simultaneously is the normal case: for example, DVR
102/// (origin 128) discovering routes for `/ndn/edu/…` and autoconf (origin 66)
103/// advertising link-local routes. Both write to the RIB under their origin;
104/// the RIB selects the lowest-cost nexthop per face when building FIB entries.
105///
106/// [`origin`]: RoutingProtocol::origin
107pub struct RoutingManager {
108    rib: Arc<Rib>,
109    fib: Arc<Fib>,
110    faces: Arc<FaceTable>,
111    neighbors: Arc<NeighborTable>,
112    handles: DashMap<u64, ProtocolHandle>,
113    /// Parent cancellation token from the engine. Child tokens created from
114    /// this are cancelled automatically when the engine shuts down.
115    engine_cancel: CancellationToken,
116}
117
118impl RoutingManager {
119    pub fn new(
120        rib: Arc<Rib>,
121        fib: Arc<Fib>,
122        faces: Arc<FaceTable>,
123        neighbors: Arc<NeighborTable>,
124        engine_cancel: CancellationToken,
125    ) -> Self {
126        Self {
127            rib,
128            fib,
129            faces,
130            neighbors,
131            handles: DashMap::new(),
132            engine_cancel,
133        }
134    }
135
136    /// Start a routing protocol.
137    ///
138    /// Creates a child cancellation token from the engine token so the protocol
139    /// is automatically stopped when the engine shuts down. If a protocol with
140    /// the same origin is already running, it is stopped and its routes flushed
141    /// before the new one starts.
142    pub fn enable(&self, proto: Arc<dyn RoutingProtocol>) {
143        let origin = proto.origin();
144        if self.handles.contains_key(&origin) {
145            self.stop_and_flush(origin);
146        }
147        let cancel = self.engine_cancel.child_token();
148        let handle = RoutingHandle {
149            rib: Arc::clone(&self.rib),
150            fib: Arc::clone(&self.fib),
151            faces: Arc::clone(&self.faces),
152            neighbors: Arc::clone(&self.neighbors),
153        };
154        let task = proto.start(handle, cancel.clone());
155        self.handles.insert(
156            origin,
157            ProtocolHandle {
158                cancel,
159                _task: task,
160            },
161        );
162        tracing::info!(origin, "routing protocol enabled");
163    }
164
165    /// Stop a routing protocol and flush all routes it registered.
166    ///
167    /// Cancels the protocol's task (the task exits at its next cancellation
168    /// checkpoint) and removes all routes registered under `origin` from the
169    /// RIB. FIB entries for affected prefixes are recomputed from any remaining
170    /// protocols' routes.
171    ///
172    /// Returns `true` if a protocol with that origin was running.
173    pub fn disable(&self, origin: u64) -> bool {
174        if self.handles.contains_key(&origin) {
175            self.stop_and_flush(origin);
176            tracing::info!(origin, "routing protocol disabled");
177            true
178        } else {
179            false
180        }
181    }
182
183    /// Returns the origin values of all currently-running protocols.
184    pub fn running_origins(&self) -> Vec<u64> {
185        self.handles.iter().map(|e| *e.key()).collect()
186    }
187
188    /// Returns the number of currently-running protocols.
189    pub fn running_count(&self) -> usize {
190        self.handles.len()
191    }
192
193    fn stop_and_flush(&self, origin: u64) {
194        if let Some((_, handle)) = self.handles.remove(&origin) {
195            handle.cancel.cancel();
196            // `handle._task` is dropped here. The task continues briefly until
197            // it observes the cancellation, then exits. We proceed immediately
198            // to flush routes so the FIB is updated without delay.
199        }
200        let affected = self.rib.flush_origin(origin);
201        let n = affected.len();
202        for prefix in &affected {
203            self.rib.apply_to_fib(prefix, &self.fib);
204        }
205        if n > 0 {
206            tracing::debug!(origin, prefixes = n, "RIB flushed for origin");
207        }
208    }
209}
210
211impl Drop for RoutingManager {
212    fn drop(&mut self) {
213        for entry in self.handles.iter() {
214            entry.value().cancel.cancel();
215        }
216    }
217}