ndn_app/
consumer.rs

1use std::path::Path;
2use std::time::Duration;
3
4use bytes::Bytes;
5
6use ndn_faces::local::InProcHandle;
7use ndn_ipc::ForwarderClient;
8use ndn_packet::encode::InterestBuilder;
9use ndn_packet::lp::{LpPacket, is_lp_packet};
10use ndn_packet::{Data, Name};
11use ndn_security::{SafeData, ValidationResult, Validator};
12
13use crate::AppError;
14use crate::connection::NdnConnection;
15
/// Lifetime placed in outgoing Interests when the caller does not choose one
/// (4 seconds).
pub const DEFAULT_INTEREST_LIFETIME: Duration = Duration::from_secs(4);

/// How long the consumer waits locally for a reply by default (4.5 seconds).
///
/// This bound is enforced on the consumer side and is separate from the
/// Interest lifetime carried on the wire. It is a little longer than
/// [`DEFAULT_INTEREST_LIFETIME`] to leave room for forwarding and processing
/// delays.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(4_500);
25
/// High-level NDN consumer — fetches Data by name.
///
/// Owns a single [`NdnConnection`]. Each fetch sends one Interest on that
/// connection and then waits for the next packet received on it.
pub struct Consumer {
    /// Connection to the forwarder: the `External` variant when created via
    /// `connect`, the `Embedded` variant when created via `from_handle`.
    conn: NdnConnection,
}
30
31impl Consumer {
32    /// Connect to an external router via its face socket.
33    pub async fn connect(socket: impl AsRef<Path>) -> Result<Self, AppError> {
34        let client = ForwarderClient::connect(socket)
35            .await
36            .map_err(AppError::Connection)?;
37        Ok(Self {
38            conn: NdnConnection::External(client),
39        })
40    }
41
42    /// Create from an in-process InProcHandle (embedded engine).
43    pub fn from_handle(handle: InProcHandle) -> Self {
44        Self {
45            conn: NdnConnection::Embedded(handle),
46        }
47    }
48
49    /// Express an Interest by name and return the decoded Data.
50    ///
51    /// Uses [`DEFAULT_INTEREST_LIFETIME`] for the wire Interest and
52    /// [`DEFAULT_TIMEOUT`] for the local wait. To set hop limit,
53    /// application parameters, or forwarding hints, use
54    /// [`fetch_with`](Self::fetch_with).
55    pub async fn fetch(&mut self, name: impl Into<Name>) -> Result<Data, AppError> {
56        let wire = InterestBuilder::new(name)
57            .lifetime(DEFAULT_INTEREST_LIFETIME)
58            .build();
59        self.fetch_wire(wire, DEFAULT_TIMEOUT).await
60    }
61
62    /// Express an Interest built with [`InterestBuilder`] and return the decoded Data.
63    ///
64    /// The local wait timeout is derived from the builder's Interest lifetime
65    /// (+ 500 ms forwarding buffer). This is the right method when you need
66    /// hop limit, forwarding hints, or application parameters:
67    ///
68    /// ```no_run
69    /// # async fn example(mut consumer: ndn_app::Consumer) -> anyhow::Result<()> {
70    /// use ndn_packet::encode::InterestBuilder;
71    ///
72    /// // Hop limit: limit forwarding to 4 hops.
73    /// let data = consumer.fetch_with(
74    ///     InterestBuilder::new("/ndn/remote/data").hop_limit(4)
75    /// ).await?;
76    ///
77    /// // Forwarding hint: reach a producer via a delegation prefix.
78    /// let data = consumer.fetch_with(
79    ///     InterestBuilder::new("/alice/files/photo.jpg")
80    ///         .forwarding_hint(vec!["/campus/ndn-hub".parse()?])
81    /// ).await?;
82    ///
83    /// // Application parameters: parameterised fetch (e.g. RPC / query).
84    /// let data = consumer.fetch_with(
85    ///     InterestBuilder::new("/service/query")
86    ///         .app_parameters(b"filter=recent&limit=10")
87    /// ).await?;
88    /// # Ok(())
89    /// # }
90    /// ```
91    pub async fn fetch_with(&mut self, builder: InterestBuilder) -> Result<Data, AppError> {
92        let (wire, timeout) = builder.build_with_timeout();
93        self.fetch_wire(wire, timeout).await
94    }
95
96    /// Express a pre-encoded Interest and return the decoded Data.
97    ///
98    /// `timeout` is the local wait duration — set this to at least the
99    /// Interest lifetime encoded in `wire` to avoid timing out before the
100    /// forwarder does.
101    ///
102    /// Returns [`AppError::Nacked`] if the forwarder responds with a Nack
103    /// (e.g. no route to the name prefix).
104    pub async fn fetch_wire(&mut self, wire: Bytes, timeout: Duration) -> Result<Data, AppError> {
105        self.conn.send(wire).await?;
106
107        let reply = tokio::time::timeout(timeout, self.conn.recv())
108            .await
109            .map_err(|_| AppError::Timeout)?
110            .ok_or(AppError::Closed)?;
111
112        // Check for Nack (LpPacket with Nack header).
113        if is_lp_packet(&reply)
114            && let Ok(lp) = LpPacket::decode(reply.clone())
115        {
116            if let Some(reason) = lp.nack {
117                return Err(AppError::Nacked { reason });
118            }
119            // LpPacket without Nack — decode the fragment as Data.
120            if let Some(fragment) = lp.fragment {
121                return Data::decode(fragment).map_err(|e| AppError::Protocol(e.to_string()));
122            }
123        }
124
125        Data::decode(reply).map_err(|e| AppError::Protocol(e.to_string()))
126    }
127
128    /// Fetch and verify against a `Validator`. Returns `SafeData` on success.
129    pub async fn fetch_verified(
130        &mut self,
131        name: impl Into<Name>,
132        validator: &Validator,
133    ) -> Result<SafeData, AppError> {
134        let data = self.fetch(name).await?;
135        match validator.validate(&data).await {
136            ValidationResult::Valid(safe) => Ok(*safe),
137            ValidationResult::Invalid(e) => Err(AppError::Protocol(e.to_string())),
138            ValidationResult::Pending => {
139                Err(AppError::Protocol("certificate chain not resolved".into()))
140            }
141        }
142    }
143
144    /// Convenience: fetch content as raw bytes.
145    pub async fn get(&mut self, name: impl Into<Name>) -> Result<Bytes, AppError> {
146        let data = self.fetch(name).await?;
147        data.content()
148            .map(|b| Bytes::copy_from_slice(b))
149            .ok_or_else(|| AppError::Protocol("Data has no content".into()))
150    }
151
152    /// Fetch multiple names sequentially and collect results.
153    ///
154    /// Each Interest is expressed in order; the result vector preserves the
155    /// input order regardless of which fetches succeed or fail.
156    ///
157    /// # Note
158    ///
159    /// Fetches are sequential because a single [`NdnConnection`] cannot
160    /// correlate concurrent Interests to their responses without PIT tokens.
161    /// For true concurrent fetch, create multiple `Consumer` instances and
162    /// use `tokio::join!`.
163    pub async fn fetch_all(&mut self, names: &[Name]) -> Vec<Result<Data, AppError>> {
164        let mut results = Vec::with_capacity(names.len());
165        for name in names {
166            results.push(self.fetch(name.clone()).await);
167        }
168        results
169    }
170
171    /// Fetch with exponential-backoff retry.
172    ///
173    /// On timeout or connection error, waits `base_delay`, then `2×base_delay`,
174    /// etc., up to `max_attempts` total tries (including the first). Returns the
175    /// last error if all attempts are exhausted.
176    pub async fn fetch_with_retry(
177        &mut self,
178        name: impl Into<Name>,
179        max_attempts: u32,
180        base_delay: std::time::Duration,
181    ) -> Result<Data, AppError> {
182        let name = name.into();
183        let mut delay = base_delay;
184        let attempts = max_attempts.max(1);
185        let mut last_err = AppError::Timeout;
186        for attempt in 0..attempts {
187            match self.fetch(name.clone()).await {
188                Ok(data) => return Ok(data),
189                Err(e) => {
190                    last_err = e;
191                    if attempt + 1 < attempts {
192                        tokio::time::sleep(delay).await;
193                        delay *= 2;
194                    }
195                }
196            }
197        }
198        Err(last_err)
199    }
200
201    /// Fetch a segmented object produced with [`Producer::publish_large`].
202    ///
203    /// Fetches `/prefix/0`, reads `FinalBlockId` to determine the total segment
204    /// count, then fetches all remaining segments in order. Segments are
205    /// reassembled into a single contiguous buffer.
206    ///
207    /// Segment names are generic NameComponents with ASCII-decimal indices
208    /// (e.g. `/prefix/0`, `/prefix/1`, ...), matching the convention used by
209    /// [`Producer::publish_large`].
210    pub async fn fetch_segmented(&mut self, prefix: impl Into<Name>) -> Result<Bytes, AppError> {
211        let prefix = prefix.into();
212
213        // Fetch segment 0 to discover FinalBlockId.
214        let seg0_name = prefix.clone().append("0");
215        let seg0 = self.fetch(seg0_name).await?;
216
217        let last_seg = parse_final_block_id_seg(&seg0).unwrap_or(0);
218
219        let seg0_content = seg0
220            .content()
221            .map(|b| Bytes::copy_from_slice(b))
222            .unwrap_or_default();
223
224        if last_seg == 0 {
225            return Ok(seg0_content);
226        }
227
228        // Fetch remaining segments sequentially.
229        let mut chunks: Vec<Bytes> = Vec::with_capacity(last_seg + 1);
230        chunks.push(seg0_content);
231        for i in 1..=last_seg {
232            let name = prefix.clone().append(i.to_string());
233            let data = self.fetch(name).await?;
234            chunks.push(
235                data.content()
236                    .map(|b| Bytes::copy_from_slice(b))
237                    .unwrap_or_default(),
238            );
239        }
240
241        let total: usize = chunks.iter().map(|c| c.len()).sum();
242        let mut out = bytes::BytesMut::with_capacity(total);
243        for chunk in chunks {
244            out.extend_from_slice(&chunk);
245        }
246        Ok(out.freeze())
247    }
248
249    /// Fetch and verify against a `Validator`. Returns `SafeData` on success.
250    ///
251    /// This is a convenience wrapper around [`fetch`](Self::fetch) +
252    /// [`Validator::validate_chain`](ndn_security::Validator).
253    pub async fn get_verified(
254        &mut self,
255        name: impl Into<Name>,
256        validator: &ndn_security::Validator,
257    ) -> Result<ndn_security::SafeData, AppError> {
258        self.fetch_verified(name, validator).await
259    }
260}
261
262/// Parse the segment index from a Data packet's FinalBlockId field.
263///
264/// FinalBlockId encodes a NameComponent TLV. For the convention used by
265/// [`Producer::publish_large`], the value is an ASCII-decimal string
266/// inside a generic NameComponent (TLV type 0x08).
267/// Returns `None` if the field is absent or cannot be parsed.
268fn parse_final_block_id_seg(data: &Data) -> Option<usize> {
269    let meta = data.meta_info()?;
270    let fbi = meta.final_block_id.as_ref()?;
271
272    // FinalBlockId bytes = one NameComponent TLV: type(1-3B) + len(1-3B) + value.
273    // Skip the type byte(s) and length byte(s) to reach the value bytes.
274    // For short components (< 253 bytes), both type and length fit in one byte each.
275    // type 0x08 (< 0xFD) = 1 byte; length < 253 = 1 byte.
276    if fbi.len() < 2 {
277        return None;
278    }
279    // First byte is the TLV type; second is the length (for short components).
280    let len = fbi[1] as usize;
281    let value_start = 2;
282    if fbi.len() < value_start + len {
283        return None;
284    }
285    let value = &fbi[value_start..value_start + len];
286    std::str::from_utf8(value).ok()?.parse::<usize>().ok()
287}