1use std::sync::Arc;
2use std::time::Duration;
3
4use dashmap::DashMap;
5use ndn_discovery::DiscoveryProtocol;
6use tokio_util::sync::CancellationToken;
7
8use ndn_store::Pit;
9use ndn_transport::{FaceId, FaceKind, FacePersistency, FaceTable};
10
11use crate::Fib;
12use crate::discovery_context::EngineDiscoveryContext;
13use crate::engine::FaceState;
14use crate::rib::Rib;
15
16pub async fn run_expiry_task(pit: Arc<Pit>, cancel: CancellationToken) {
20 let interval = Duration::from_millis(1);
21 loop {
22 tokio::select! {
23 _ = cancel.cancelled() => break,
24 _ = tokio::time::sleep(interval) => {
25 let now = now_ns();
26 let expired = pit.drain_expired(now);
27 if !expired.is_empty() {
28 tracing::trace!(count = expired.len(), "PIT entries expired");
29 }
30 }
31 }
32 }
33}
34
35pub async fn run_rib_expiry_task(rib: Arc<Rib>, fib: Arc<Fib>, cancel: CancellationToken) {
42 let interval = Duration::from_secs(1);
43 loop {
44 tokio::select! {
45 _ = cancel.cancelled() => break,
46 _ = tokio::time::sleep(interval) => {
47 let affected = rib.drain_expired();
48 if !affected.is_empty() {
49 tracing::debug!(count = affected.len(), "RIB entries expired");
50 for prefix in &affected {
51 rib.apply_to_fib(prefix, &fib);
52 }
53 }
54 }
55 }
56 }
57}
58
59const IDLE_TIMEOUT_NS: u64 = 5 * 60 * 1_000_000_000;
61
62const IDLE_SWEEP_INTERVAL: Duration = Duration::from_secs(30);
64
65pub async fn run_idle_face_task(
70 face_states: Arc<DashMap<FaceId, FaceState>>,
71 face_table: Arc<FaceTable>,
72 fib: Arc<Fib>,
73 rib: Arc<Rib>,
74 cancel: CancellationToken,
75 discovery: Arc<dyn DiscoveryProtocol>,
76 discovery_ctx: Arc<EngineDiscoveryContext>,
77) {
78 loop {
79 tokio::select! {
80 _ = cancel.cancelled() => break,
81 _ = tokio::time::sleep(IDLE_SWEEP_INTERVAL) => {
82 let now = now_ns();
83 let mut expired = Vec::new();
84
85 for entry in face_states.iter() {
86 if entry.persistency != FacePersistency::OnDemand {
87 continue;
88 }
89 let face_id = *entry.key();
99 if let Some(face) = face_table.get(face_id)
100 && matches!(
101 face.kind(),
102 FaceKind::App
103 | FaceKind::Shm
104 | FaceKind::Internal
105 | FaceKind::Unix
106 | FaceKind::Tcp
107 | FaceKind::WebSocket
108 | FaceKind::Management,
109 )
110 {
111 continue;
112 }
113 let last = entry.last_activity.load(std::sync::atomic::Ordering::Relaxed);
114 if now.saturating_sub(last) > IDLE_TIMEOUT_NS {
115 expired.push(face_id);
116 }
117 }
118
119 for face_id in expired {
120 discovery.on_face_down(face_id, &*discovery_ctx);
121 if let Some((_, state)) = face_states.remove(&face_id) {
122 state.cancel.cancel();
123 }
124 rib.handle_face_down(face_id, &fib);
127 fib.remove_face(face_id);
128 face_table.remove(face_id);
129 tracing::debug!(face=%face_id, "idle on-demand face removed");
130 }
131 }
132 }
133 }
134}
135
136fn now_ns() -> u64 {
137 use std::time::{SystemTime, UNIX_EPOCH};
138 SystemTime::now()
139 .duration_since(UNIX_EPOCH)
140 .map(|d| d.as_nanos() as u64)
141 .unwrap_or(0)
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147
148 #[tokio::test]
149 async fn expiry_task_cancels_promptly() {
150 let pit = Arc::new(Pit::new());
151 let cancel = CancellationToken::new();
152 let task = tokio::spawn(run_expiry_task(pit, cancel.clone()));
153 cancel.cancel();
154 tokio::time::timeout(Duration::from_millis(200), task)
155 .await
156 .expect("expiry task did not stop after cancellation")
157 .expect("task panicked");
158 }
159
160 #[tokio::test]
161 async fn expiry_task_runs_without_panic() {
162 let pit = Arc::new(Pit::new());
163 let cancel = CancellationToken::new();
164 let task = tokio::spawn(run_expiry_task(pit, cancel.clone()));
165 tokio::time::sleep(Duration::from_millis(5)).await;
167 cancel.cancel();
168 tokio::time::timeout(Duration::from_millis(200), task)
169 .await
170 .expect("expiry task did not stop after cancellation")
171 .expect("task panicked");
172 }
173}