ndn_discovery/service_discovery/
records.rs

1//! Record storage, publication, lifecycle management, and helper utilities.
2
3use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use ndn_packet::Name;
6use ndn_transport::FaceId;
7use tracing::{debug, info};
8
9use crate::config::DiscoveryScope;
10use crate::prefix_announce::ServiceRecord;
11
12use super::ServiceDiscoveryProtocol;
13
14/// Entry in the local service record table.
15pub(crate) struct RecordEntry {
16    pub(super) record: ServiceRecord,
17    /// Timestamp used for the Data version component.
18    pub(super) published_at_ms: u64,
19    /// When this record expires (auto-withdrawn).  `None` = no TTL expiry.
20    pub(super) expires_at: Option<Instant>,
21    /// The local face that "owns" this record — typically the face registered
22    /// in the FIB for the announced prefix (i.e. the app's data face).
23    ///
24    /// When this face goes down the record is automatically withdrawn so that
25    /// stale records do not accumulate after an app exits.  `None` means no
26    /// face-based cleanup (permanent or TTL-only).
27    pub(super) owner_face: Option<FaceId>,
28}
29
30/// Rate-limit tracker per producer name.
31pub(crate) struct ProducerRateLimit {
32    /// Count of registrations in the current window.
33    pub(super) count: u32,
34    /// Start of the current window.
35    pub(super) window_start: Instant,
36}
37
38impl ServiceDiscoveryProtocol {
39    /// Publish a service record.
40    ///
41    /// Records are stored locally and served in response to browse Interests.
42    /// Call this whenever the set of served prefixes changes.
43    pub fn publish(&self, record: ServiceRecord) {
44        let ts = current_timestamp_ms();
45        let mut records = self.local_records.lock().unwrap();
46        // Replace existing record for the same (prefix, node) pair.
47        let existing = records.iter().position(|e| {
48            e.record.announced_prefix == record.announced_prefix
49                && e.record.node_name == record.node_name
50        });
51        info!(
52            prefix = %record.announced_prefix,
53            node   = %record.node_name,
54            freshness_ms = record.freshness_ms,
55            "service record published",
56        );
57        let entry = RecordEntry {
58            record,
59            published_at_ms: ts,
60            expires_at: None,
61            owner_face: None,
62        };
63        if let Some(idx) = existing {
64            records[idx] = entry;
65        } else {
66            records.push(entry);
67        }
68    }
69
70    /// Publish a service record with a finite TTL.
71    ///
72    /// The record is automatically withdrawn after `ttl_ms` milliseconds.
73    /// Use this for explicitly time-limited runtime announcements where the
74    /// caller manages their own renewal (e.g. a short-lived reservation).
75    ///
76    /// For app-lifetime tracking prefer [`publish_with_owner`]; for permanent
77    /// config-based records use [`publish`].
78    pub fn publish_with_ttl(&self, record: ServiceRecord, ttl_ms: u64) {
79        let ts = current_timestamp_ms();
80        let expires_at = Instant::now() + Duration::from_millis(ttl_ms);
81        let mut records = self.local_records.lock().unwrap();
82        let existing = records.iter().position(|e| {
83            e.record.announced_prefix == record.announced_prefix
84                && e.record.node_name == record.node_name
85        });
86        info!(
87            prefix       = %record.announced_prefix,
88            node         = %record.node_name,
89            freshness_ms = record.freshness_ms,
90            ttl_ms,
91            "service record published (TTL)",
92        );
93        let entry = RecordEntry {
94            record,
95            published_at_ms: ts,
96            expires_at: Some(expires_at),
97            owner_face: None,
98        };
99        if let Some(idx) = existing {
100            records[idx] = entry;
101        } else {
102            records.push(entry);
103        }
104    }
105
106    /// Publish a service record owned by a specific local face.
107    ///
108    /// The record is automatically withdrawn when `owner_face` goes down.
109    /// Use this for runtime announcements (e.g. `ndn-ctl service announce`)
110    /// so that the record disappears when the app's data face closes, without
111    /// requiring the app to explicitly call withdraw.
112    ///
113    /// Config-based records (from `served_prefixes`) should use the permanent
114    /// [`publish`](Self::publish) instead.
115    pub fn publish_with_owner(&self, record: ServiceRecord, owner_face: FaceId) {
116        let ts = current_timestamp_ms();
117        let mut records = self.local_records.lock().unwrap();
118        let existing = records.iter().position(|e| {
119            e.record.announced_prefix == record.announced_prefix
120                && e.record.node_name == record.node_name
121        });
122        info!(
123            prefix       = %record.announced_prefix,
124            node         = %record.node_name,
125            freshness_ms = record.freshness_ms,
126            owner_face   = ?owner_face,
127            "service record published (owned by face)",
128        );
129        let entry = RecordEntry {
130            record,
131            published_at_ms: ts,
132            expires_at: None,
133            owner_face: Some(owner_face),
134        };
135        if let Some(idx) = existing {
136            records[idx] = entry;
137        } else {
138            records.push(entry);
139        }
140    }
141
142    /// Withdraw a service record.
143    pub fn withdraw(&self, announced_prefix: &Name) {
144        let mut records = self.local_records.lock().unwrap();
145        let before = records.len();
146        records.retain(|e| &e.record.announced_prefix != announced_prefix);
147        if records.len() < before {
148            info!(prefix = %announced_prefix, "service record withdrawn");
149        } else {
150            debug!(prefix = %announced_prefix, "service record withdraw: prefix not found (no-op)");
151        }
152    }
153
154    /// Return a snapshot of locally published service records.
155    pub fn local_records(&self) -> Vec<ServiceRecord> {
156        self.local_records
157            .lock()
158            .unwrap()
159            .iter()
160            .map(|e| e.record.clone())
161            .collect()
162    }
163
164    /// Return a snapshot of all known service records — both local and
165    /// records received from remote peers.
166    ///
167    /// Deduplicated: if the same `(announced_prefix, node_name)` pair appears
168    /// in both tables, the local version takes precedence.
169    pub fn all_records(&self) -> Vec<ServiceRecord> {
170        let local = self.local_records.lock().unwrap();
171        let peers = self.peer_records.lock().unwrap();
172
173        let mut out: Vec<ServiceRecord> = local.iter().map(|e| e.record.clone()).collect();
174        for pr in peers.iter() {
175            let already = out
176                .iter()
177                .any(|r| r.announced_prefix == pr.announced_prefix && r.node_name == pr.node_name);
178            if !already {
179                out.push(pr.clone());
180            }
181        }
182        out
183    }
184
185    // ── Helpers ───────────────────────────────────────────────────────────────
186
187    pub(super) fn is_in_scope(&self, _prefix: &Name) -> bool {
188        match self.config.auto_populate_scope {
189            DiscoveryScope::LinkLocal => {
190                // Accept anything under /ndn/local/ or /ndn/site/ for backwards compat
191                true
192            }
193            DiscoveryScope::Site => true,
194            DiscoveryScope::Global => true,
195        }
196    }
197
198    pub(super) fn check_rate_limit(&self, producer: &Name, now: Instant) -> bool {
199        let key = producer.to_string();
200        let window = self.config.max_registrations_window;
201        let limit = self.config.max_registrations_per_producer;
202
203        let mut limits = self.rate_limits.lock().unwrap();
204        let entry = limits.entry(key).or_insert_with(|| ProducerRateLimit {
205            count: 0,
206            window_start: now,
207        });
208
209        if now.duration_since(entry.window_start) >= window {
210            // New window.
211            entry.count = 1;
212            entry.window_start = now;
213            true
214        } else if entry.count < limit {
215            entry.count += 1;
216            true
217        } else {
218            false
219        }
220    }
221}
222
223pub(super) fn current_timestamp_ms() -> u64 {
224    SystemTime::now()
225        .duration_since(UNIX_EPOCH)
226        .map(|d| d.as_millis() as u64)
227        .unwrap_or(0)
228}