ndn_engine/
expiry.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use dashmap::DashMap;
5use ndn_discovery::DiscoveryProtocol;
6use tokio_util::sync::CancellationToken;
7
8use ndn_store::Pit;
9use ndn_transport::{FaceId, FaceKind, FacePersistency, FaceTable};
10
11use crate::Fib;
12use crate::discovery_context::EngineDiscoveryContext;
13use crate::engine::FaceState;
14use crate::rib::Rib;
15
16/// Background task that drains expired PIT entries every millisecond.
17///
18/// Runs until the cancellation token is cancelled.
19pub async fn run_expiry_task(pit: Arc<Pit>, cancel: CancellationToken) {
20    let interval = Duration::from_millis(1);
21    loop {
22        tokio::select! {
23            _ = cancel.cancelled() => break,
24            _ = tokio::time::sleep(interval) => {
25                let now = now_ns();
26                let expired = pit.drain_expired(now);
27                if !expired.is_empty() {
28                    tracing::trace!(count = expired.len(), "PIT entries expired");
29                }
30            }
31        }
32    }
33}
34
35/// Background task that drains expired RIB entries and recomputes affected FIB
36/// entries.
37///
38/// Runs every second. Route lifetimes are registered by routing protocols and
39/// apps via `expiration_period` in RIB register commands; this task purges them
40/// when they lapse and keeps the FIB converged.
41pub async fn run_rib_expiry_task(rib: Arc<Rib>, fib: Arc<Fib>, cancel: CancellationToken) {
42    let interval = Duration::from_secs(1);
43    loop {
44        tokio::select! {
45            _ = cancel.cancelled() => break,
46            _ = tokio::time::sleep(interval) => {
47                let affected = rib.drain_expired();
48                if !affected.is_empty() {
49                    tracing::debug!(count = affected.len(), "RIB entries expired");
50                    for prefix in &affected {
51                        rib.apply_to_fib(prefix, &fib);
52                    }
53                }
54            }
55        }
56    }
57}
58
/// How long an on-demand face may stay inactive before the idle sweep removes
/// it: 300 seconds (5 minutes), expressed in nanoseconds.
const IDLE_TIMEOUT_NS: u64 = 300 * 1_000_000_000;

/// Pause between two consecutive idle-face sweeps (30 seconds).
const IDLE_SWEEP_INTERVAL: Duration = Duration::from_millis(30_000);
64
65/// Background task that removes on-demand faces that have been idle for too
66/// long (no packets sent or received within `IDLE_TIMEOUT_NS`).
67///
68/// Runs every 30 seconds until the cancellation token is cancelled.
69pub async fn run_idle_face_task(
70    face_states: Arc<DashMap<FaceId, FaceState>>,
71    face_table: Arc<FaceTable>,
72    fib: Arc<Fib>,
73    rib: Arc<Rib>,
74    cancel: CancellationToken,
75    discovery: Arc<dyn DiscoveryProtocol>,
76    discovery_ctx: Arc<EngineDiscoveryContext>,
77) {
78    loop {
79        tokio::select! {
80            _ = cancel.cancelled() => break,
81            _ = tokio::time::sleep(IDLE_SWEEP_INTERVAL) => {
82                let now = now_ns();
83                let mut expired = Vec::new();
84
85                for entry in face_states.iter() {
86                    if entry.persistency != FacePersistency::OnDemand {
87                        continue;
88                    }
89                    // Local faces (App, SHM, Internal) use cancel-token lifecycle,
90                    // not idle timeout.  Their last_activity is never updated in
91                    // run_face_reader, so they would be falsely reaped here.
92                    //
93                    // Connection-oriented faces (Unix, Tcp, WebSocket, Management)
94                    // are also excluded: when the remote end closes the socket the
95                    // face reader exits and the OnDemand cleanup path in inbound.rs
96                    // removes the face immediately.  Idle timeout is only meaningful
97                    // for connectionless (UDP) faces where no disconnect signal exists.
98                    let face_id = *entry.key();
99                    if let Some(face) = face_table.get(face_id)
100                        && matches!(
101                            face.kind(),
102                            FaceKind::App
103                                | FaceKind::Shm
104                                | FaceKind::Internal
105                                | FaceKind::Unix
106                                | FaceKind::Tcp
107                                | FaceKind::WebSocket
108                                | FaceKind::Management,
109                        )
110                    {
111                        continue;
112                    }
113                    let last = entry.last_activity.load(std::sync::atomic::Ordering::Relaxed);
114                    if now.saturating_sub(last) > IDLE_TIMEOUT_NS {
115                        expired.push(face_id);
116                    }
117                }
118
119                for face_id in expired {
120                    discovery.on_face_down(face_id, &*discovery_ctx);
121                    if let Some((_, state)) = face_states.remove(&face_id) {
122                        state.cancel.cancel();
123                    }
124                    // Flush RIB routes before FIB so apply_to_fib can recompute
125                    // from remaining routes. fib.remove_face handles discovery routes.
126                    rib.handle_face_down(face_id, &fib);
127                    fib.remove_face(face_id);
128                    face_table.remove(face_id);
129                    tracing::debug!(face=%face_id, "idle on-demand face removed");
130                }
131            }
132        }
133    }
134}
135
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. If the
/// nanosecond count does not fit in a `u64`, the result saturates at
/// `u64::MAX` instead of silently wrapping around.
fn now_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_nanos` yields a `u128`; a checked conversion clamps on overflow
        // where a plain `as u64` cast would discard the high bits.
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Longest time the task may take to exit once its token is cancelled.
    const SHUTDOWN_DEADLINE: Duration = Duration::from_millis(200);

    /// Spawns `run_expiry_task` on a freshly constructed `Pit` and returns the
    /// token that stops it together with its join handle.
    fn spawn_expiry_task() -> (CancellationToken, tokio::task::JoinHandle<()>) {
        let pit = Arc::new(Pit::new());
        let cancel = CancellationToken::new();
        let handle = tokio::spawn(run_expiry_task(pit, cancel.clone()));
        (cancel, handle)
    }

    /// Cancels the task and asserts that it finishes within the deadline
    /// without panicking.
    async fn cancel_and_join(cancel: CancellationToken, handle: tokio::task::JoinHandle<()>) {
        cancel.cancel();
        tokio::time::timeout(SHUTDOWN_DEADLINE, handle)
            .await
            .expect("expiry task did not stop after cancellation")
            .expect("task panicked");
    }

    #[tokio::test]
    async fn expiry_task_cancels_promptly() {
        let (cancel, handle) = spawn_expiry_task();
        cancel_and_join(cancel, handle).await;
    }

    #[tokio::test]
    async fn expiry_task_runs_without_panic() {
        let (cancel, handle) = spawn_expiry_task();
        // Let a few ticks pass to ensure the loop body executes at least once.
        tokio::time::sleep(Duration::from_millis(5)).await;
        cancel_and_join(cancel, handle).await;
    }
}