ndn_app/consumer.rs
1use std::path::Path;
2use std::time::Duration;
3
4use bytes::Bytes;
5
6use ndn_faces::local::InProcHandle;
7use ndn_ipc::ForwarderClient;
8use ndn_packet::encode::InterestBuilder;
9use ndn_packet::lp::{LpPacket, is_lp_packet};
10use ndn_packet::{Data, Name};
11use ndn_security::{SafeData, ValidationResult, Validator};
12
13use crate::AppError;
14use crate::connection::NdnConnection;
15
/// Default Interest lifetime carried on the wire: 4 seconds.
pub const DEFAULT_INTEREST_LIFETIME: Duration = Duration::from_secs(4);

/// Default local timeout applied while waiting for a response: 4.5 seconds.
///
/// This is a local safety net that is independent of the Interest lifetime
/// encoded in the packet. It is deliberately a little longer than
/// [`DEFAULT_INTEREST_LIFETIME`] so that forwarding and processing delays do
/// not make the local wait expire first.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(4_500);
25
26/// High-level NDN consumer — fetches Data by name.
27pub struct Consumer {
28 conn: NdnConnection,
29}
30
31impl Consumer {
32 /// Connect to an external router via its face socket.
33 pub async fn connect(socket: impl AsRef<Path>) -> Result<Self, AppError> {
34 let client = ForwarderClient::connect(socket)
35 .await
36 .map_err(AppError::Connection)?;
37 Ok(Self {
38 conn: NdnConnection::External(client),
39 })
40 }
41
42 /// Create from an in-process InProcHandle (embedded engine).
43 pub fn from_handle(handle: InProcHandle) -> Self {
44 Self {
45 conn: NdnConnection::Embedded(handle),
46 }
47 }
48
49 /// Express an Interest by name and return the decoded Data.
50 ///
51 /// Uses [`DEFAULT_INTEREST_LIFETIME`] for the wire Interest and
52 /// [`DEFAULT_TIMEOUT`] for the local wait. To set hop limit,
53 /// application parameters, or forwarding hints, use
54 /// [`fetch_with`](Self::fetch_with).
55 pub async fn fetch(&mut self, name: impl Into<Name>) -> Result<Data, AppError> {
56 let wire = InterestBuilder::new(name)
57 .lifetime(DEFAULT_INTEREST_LIFETIME)
58 .build();
59 self.fetch_wire(wire, DEFAULT_TIMEOUT).await
60 }
61
62 /// Express an Interest built with [`InterestBuilder`] and return the decoded Data.
63 ///
64 /// The local wait timeout is derived from the builder's Interest lifetime
65 /// (+ 500 ms forwarding buffer). This is the right method when you need
66 /// hop limit, forwarding hints, or application parameters:
67 ///
68 /// ```no_run
69 /// # async fn example(mut consumer: ndn_app::Consumer) -> anyhow::Result<()> {
70 /// use ndn_packet::encode::InterestBuilder;
71 ///
72 /// // Hop limit: limit forwarding to 4 hops.
73 /// let data = consumer.fetch_with(
74 /// InterestBuilder::new("/ndn/remote/data").hop_limit(4)
75 /// ).await?;
76 ///
77 /// // Forwarding hint: reach a producer via a delegation prefix.
78 /// let data = consumer.fetch_with(
79 /// InterestBuilder::new("/alice/files/photo.jpg")
80 /// .forwarding_hint(vec!["/campus/ndn-hub".parse()?])
81 /// ).await?;
82 ///
83 /// // Application parameters: parameterised fetch (e.g. RPC / query).
84 /// let data = consumer.fetch_with(
85 /// InterestBuilder::new("/service/query")
86 /// .app_parameters(b"filter=recent&limit=10")
87 /// ).await?;
88 /// # Ok(())
89 /// # }
90 /// ```
91 pub async fn fetch_with(&mut self, builder: InterestBuilder) -> Result<Data, AppError> {
92 let (wire, timeout) = builder.build_with_timeout();
93 self.fetch_wire(wire, timeout).await
94 }
95
96 /// Express a pre-encoded Interest and return the decoded Data.
97 ///
98 /// `timeout` is the local wait duration — set this to at least the
99 /// Interest lifetime encoded in `wire` to avoid timing out before the
100 /// forwarder does.
101 ///
102 /// Returns [`AppError::Nacked`] if the forwarder responds with a Nack
103 /// (e.g. no route to the name prefix).
104 pub async fn fetch_wire(&mut self, wire: Bytes, timeout: Duration) -> Result<Data, AppError> {
105 self.conn.send(wire).await?;
106
107 let reply = tokio::time::timeout(timeout, self.conn.recv())
108 .await
109 .map_err(|_| AppError::Timeout)?
110 .ok_or(AppError::Closed)?;
111
112 // Check for Nack (LpPacket with Nack header).
113 if is_lp_packet(&reply)
114 && let Ok(lp) = LpPacket::decode(reply.clone())
115 {
116 if let Some(reason) = lp.nack {
117 return Err(AppError::Nacked { reason });
118 }
119 // LpPacket without Nack — decode the fragment as Data.
120 if let Some(fragment) = lp.fragment {
121 return Data::decode(fragment).map_err(|e| AppError::Protocol(e.to_string()));
122 }
123 }
124
125 Data::decode(reply).map_err(|e| AppError::Protocol(e.to_string()))
126 }
127
128 /// Fetch and verify against a `Validator`. Returns `SafeData` on success.
129 pub async fn fetch_verified(
130 &mut self,
131 name: impl Into<Name>,
132 validator: &Validator,
133 ) -> Result<SafeData, AppError> {
134 let data = self.fetch(name).await?;
135 match validator.validate(&data).await {
136 ValidationResult::Valid(safe) => Ok(*safe),
137 ValidationResult::Invalid(e) => Err(AppError::Protocol(e.to_string())),
138 ValidationResult::Pending => {
139 Err(AppError::Protocol("certificate chain not resolved".into()))
140 }
141 }
142 }
143
144 /// Convenience: fetch content as raw bytes.
145 pub async fn get(&mut self, name: impl Into<Name>) -> Result<Bytes, AppError> {
146 let data = self.fetch(name).await?;
147 data.content()
148 .map(|b| Bytes::copy_from_slice(b))
149 .ok_or_else(|| AppError::Protocol("Data has no content".into()))
150 }
151
152 /// Fetch multiple names sequentially and collect results.
153 ///
154 /// Each Interest is expressed in order; the result vector preserves the
155 /// input order regardless of which fetches succeed or fail.
156 ///
157 /// # Note
158 ///
159 /// Fetches are sequential because a single [`NdnConnection`] cannot
160 /// correlate concurrent Interests to their responses without PIT tokens.
161 /// For true concurrent fetch, create multiple `Consumer` instances and
162 /// use `tokio::join!`.
163 pub async fn fetch_all(&mut self, names: &[Name]) -> Vec<Result<Data, AppError>> {
164 let mut results = Vec::with_capacity(names.len());
165 for name in names {
166 results.push(self.fetch(name.clone()).await);
167 }
168 results
169 }
170
171 /// Fetch with exponential-backoff retry.
172 ///
173 /// On timeout or connection error, waits `base_delay`, then `2×base_delay`,
174 /// etc., up to `max_attempts` total tries (including the first). Returns the
175 /// last error if all attempts are exhausted.
176 pub async fn fetch_with_retry(
177 &mut self,
178 name: impl Into<Name>,
179 max_attempts: u32,
180 base_delay: std::time::Duration,
181 ) -> Result<Data, AppError> {
182 let name = name.into();
183 let mut delay = base_delay;
184 let attempts = max_attempts.max(1);
185 let mut last_err = AppError::Timeout;
186 for attempt in 0..attempts {
187 match self.fetch(name.clone()).await {
188 Ok(data) => return Ok(data),
189 Err(e) => {
190 last_err = e;
191 if attempt + 1 < attempts {
192 tokio::time::sleep(delay).await;
193 delay *= 2;
194 }
195 }
196 }
197 }
198 Err(last_err)
199 }
200
201 /// Fetch a segmented object produced with [`Producer::publish_large`].
202 ///
203 /// Fetches `/prefix/0`, reads `FinalBlockId` to determine the total segment
204 /// count, then fetches all remaining segments in order. Segments are
205 /// reassembled into a single contiguous buffer.
206 ///
207 /// Segment names are generic NameComponents with ASCII-decimal indices
208 /// (e.g. `/prefix/0`, `/prefix/1`, ...), matching the convention used by
209 /// [`Producer::publish_large`].
210 pub async fn fetch_segmented(&mut self, prefix: impl Into<Name>) -> Result<Bytes, AppError> {
211 let prefix = prefix.into();
212
213 // Fetch segment 0 to discover FinalBlockId.
214 let seg0_name = prefix.clone().append("0");
215 let seg0 = self.fetch(seg0_name).await?;
216
217 let last_seg = parse_final_block_id_seg(&seg0).unwrap_or(0);
218
219 let seg0_content = seg0
220 .content()
221 .map(|b| Bytes::copy_from_slice(b))
222 .unwrap_or_default();
223
224 if last_seg == 0 {
225 return Ok(seg0_content);
226 }
227
228 // Fetch remaining segments sequentially.
229 let mut chunks: Vec<Bytes> = Vec::with_capacity(last_seg + 1);
230 chunks.push(seg0_content);
231 for i in 1..=last_seg {
232 let name = prefix.clone().append(i.to_string());
233 let data = self.fetch(name).await?;
234 chunks.push(
235 data.content()
236 .map(|b| Bytes::copy_from_slice(b))
237 .unwrap_or_default(),
238 );
239 }
240
241 let total: usize = chunks.iter().map(|c| c.len()).sum();
242 let mut out = bytes::BytesMut::with_capacity(total);
243 for chunk in chunks {
244 out.extend_from_slice(&chunk);
245 }
246 Ok(out.freeze())
247 }
248
249 /// Fetch and verify against a `Validator`. Returns `SafeData` on success.
250 ///
251 /// This is a convenience wrapper around [`fetch`](Self::fetch) +
252 /// [`Validator::validate_chain`](ndn_security::Validator).
253 pub async fn get_verified(
254 &mut self,
255 name: impl Into<Name>,
256 validator: &ndn_security::Validator,
257 ) -> Result<ndn_security::SafeData, AppError> {
258 self.fetch_verified(name, validator).await
259 }
260}
261
262/// Parse the segment index from a Data packet's FinalBlockId field.
263///
264/// FinalBlockId encodes a NameComponent TLV. For the convention used by
265/// [`Producer::publish_large`], the value is an ASCII-decimal string
266/// inside a generic NameComponent (TLV type 0x08).
267/// Returns `None` if the field is absent or cannot be parsed.
268fn parse_final_block_id_seg(data: &Data) -> Option<usize> {
269 let meta = data.meta_info()?;
270 let fbi = meta.final_block_id.as_ref()?;
271
272 // FinalBlockId bytes = one NameComponent TLV: type(1-3B) + len(1-3B) + value.
273 // Skip the type byte(s) and length byte(s) to reach the value bytes.
274 // For short components (< 253 bytes), both type and length fit in one byte each.
275 // type 0x08 (< 0xFD) = 1 byte; length < 253 = 1 byte.
276 if fbi.len() < 2 {
277 return None;
278 }
279 // First byte is the TLV type; second is the length (for short components).
280 let len = fbi[1] as usize;
281 let value_start = 2;
282 if fbi.len() < value_start + len {
283 return None;
284 }
285 let value = &fbi[value_start..value_start + len];
286 std::str::from_utf8(value).ok()?.parse::<usize>().ok()
287}