ndn_discovery/service_discovery/
records.rs1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5use ndn_packet::Name;
6use ndn_transport::FaceId;
7use tracing::{debug, info};
8
9use crate::config::DiscoveryScope;
10use crate::prefix_announce::ServiceRecord;
11
12use super::ServiceDiscoveryProtocol;
13
14pub(crate) struct RecordEntry {
16 pub(super) record: ServiceRecord,
17 pub(super) published_at_ms: u64,
19 pub(super) expires_at: Option<Instant>,
21 pub(super) owner_face: Option<FaceId>,
28}
29
30pub(crate) struct ProducerRateLimit {
32 pub(super) count: u32,
34 pub(super) window_start: Instant,
36}
37
38impl ServiceDiscoveryProtocol {
39 pub fn publish(&self, record: ServiceRecord) {
44 let ts = current_timestamp_ms();
45 let mut records = self.local_records.lock().unwrap();
46 let existing = records.iter().position(|e| {
48 e.record.announced_prefix == record.announced_prefix
49 && e.record.node_name == record.node_name
50 });
51 info!(
52 prefix = %record.announced_prefix,
53 node = %record.node_name,
54 freshness_ms = record.freshness_ms,
55 "service record published",
56 );
57 let entry = RecordEntry {
58 record,
59 published_at_ms: ts,
60 expires_at: None,
61 owner_face: None,
62 };
63 if let Some(idx) = existing {
64 records[idx] = entry;
65 } else {
66 records.push(entry);
67 }
68 }
69
70 pub fn publish_with_ttl(&self, record: ServiceRecord, ttl_ms: u64) {
79 let ts = current_timestamp_ms();
80 let expires_at = Instant::now() + Duration::from_millis(ttl_ms);
81 let mut records = self.local_records.lock().unwrap();
82 let existing = records.iter().position(|e| {
83 e.record.announced_prefix == record.announced_prefix
84 && e.record.node_name == record.node_name
85 });
86 info!(
87 prefix = %record.announced_prefix,
88 node = %record.node_name,
89 freshness_ms = record.freshness_ms,
90 ttl_ms,
91 "service record published (TTL)",
92 );
93 let entry = RecordEntry {
94 record,
95 published_at_ms: ts,
96 expires_at: Some(expires_at),
97 owner_face: None,
98 };
99 if let Some(idx) = existing {
100 records[idx] = entry;
101 } else {
102 records.push(entry);
103 }
104 }
105
106 pub fn publish_with_owner(&self, record: ServiceRecord, owner_face: FaceId) {
116 let ts = current_timestamp_ms();
117 let mut records = self.local_records.lock().unwrap();
118 let existing = records.iter().position(|e| {
119 e.record.announced_prefix == record.announced_prefix
120 && e.record.node_name == record.node_name
121 });
122 info!(
123 prefix = %record.announced_prefix,
124 node = %record.node_name,
125 freshness_ms = record.freshness_ms,
126 owner_face = ?owner_face,
127 "service record published (owned by face)",
128 );
129 let entry = RecordEntry {
130 record,
131 published_at_ms: ts,
132 expires_at: None,
133 owner_face: Some(owner_face),
134 };
135 if let Some(idx) = existing {
136 records[idx] = entry;
137 } else {
138 records.push(entry);
139 }
140 }
141
142 pub fn withdraw(&self, announced_prefix: &Name) {
144 let mut records = self.local_records.lock().unwrap();
145 let before = records.len();
146 records.retain(|e| &e.record.announced_prefix != announced_prefix);
147 if records.len() < before {
148 info!(prefix = %announced_prefix, "service record withdrawn");
149 } else {
150 debug!(prefix = %announced_prefix, "service record withdraw: prefix not found (no-op)");
151 }
152 }
153
154 pub fn local_records(&self) -> Vec<ServiceRecord> {
156 self.local_records
157 .lock()
158 .unwrap()
159 .iter()
160 .map(|e| e.record.clone())
161 .collect()
162 }
163
164 pub fn all_records(&self) -> Vec<ServiceRecord> {
170 let local = self.local_records.lock().unwrap();
171 let peers = self.peer_records.lock().unwrap();
172
173 let mut out: Vec<ServiceRecord> = local.iter().map(|e| e.record.clone()).collect();
174 for pr in peers.iter() {
175 let already = out
176 .iter()
177 .any(|r| r.announced_prefix == pr.announced_prefix && r.node_name == pr.node_name);
178 if !already {
179 out.push(pr.clone());
180 }
181 }
182 out
183 }
184
185 pub(super) fn is_in_scope(&self, _prefix: &Name) -> bool {
188 match self.config.auto_populate_scope {
189 DiscoveryScope::LinkLocal => {
190 true
192 }
193 DiscoveryScope::Site => true,
194 DiscoveryScope::Global => true,
195 }
196 }
197
198 pub(super) fn check_rate_limit(&self, producer: &Name, now: Instant) -> bool {
199 let key = producer.to_string();
200 let window = self.config.max_registrations_window;
201 let limit = self.config.max_registrations_per_producer;
202
203 let mut limits = self.rate_limits.lock().unwrap();
204 let entry = limits.entry(key).or_insert_with(|| ProducerRateLimit {
205 count: 0,
206 window_start: now,
207 });
208
209 if now.duration_since(entry.window_start) >= window {
210 entry.count = 1;
212 entry.window_start = now;
213 true
214 } else if entry.count < limit {
215 entry.count += 1;
216 true
217 } else {
218 false
219 }
220 }
221}
222
223pub(super) fn current_timestamp_ms() -> u64 {
224 SystemTime::now()
225 .duration_since(UNIX_EPOCH)
226 .map(|d| d.as_millis() as u64)
227 .unwrap_or(0)
228}