ndn_app/
producer.rs

1use std::future::Future;
2use std::path::Path;
3use std::sync::Arc;
4
5use bytes::Bytes;
6
7use ndn_faces::local::InProcHandle;
8use ndn_ipc::{ChunkedProducer, ForwarderClient, NDN_DEFAULT_SEGMENT_SIZE};
9use ndn_packet::encode::DataBuilder;
10use ndn_packet::{Interest, Name};
11
12use crate::AppError;
13use crate::connection::NdnConnection;
14use crate::responder::Responder;
15
16/// High-level NDN producer — serves Data in response to Interests.
17pub struct Producer {
18    conn: Arc<NdnConnection>,
19    prefix: Name,
20}
21
22impl Producer {
23    /// Connect to an external router and register a prefix.
24    pub async fn connect(
25        socket: impl AsRef<Path>,
26        prefix: impl Into<Name>,
27    ) -> Result<Self, AppError> {
28        let prefix = prefix.into();
29        let client = ForwarderClient::connect(socket)
30            .await
31            .map_err(AppError::Connection)?;
32        client
33            .register_prefix(&prefix)
34            .await
35            .map_err(AppError::Connection)?;
36        Ok(Self {
37            conn: Arc::new(NdnConnection::External(client)),
38            prefix,
39        })
40    }
41
42    /// Create from an in-process InProcHandle (embedded engine).
43    pub fn from_handle(handle: InProcHandle, prefix: Name) -> Self {
44        Self {
45            conn: Arc::new(NdnConnection::Embedded(handle)),
46            prefix,
47        }
48    }
49
50    /// Run the producer loop with an async handler.
51    ///
52    /// The handler receives each `(Interest, Responder)` pair and must call
53    /// one of [`Responder::respond`], [`Responder::respond_bytes`], or
54    /// [`Responder::nack`] to reply. Dropping the `Responder` without replying
55    /// silently discards the Interest.
56    ///
57    /// # Example
58    ///
59    /// ```rust,no_run
60    /// # async fn example(mut producer: ndn_app::Producer) -> Result<(), ndn_app::AppError> {
61    /// use ndn_packet::encode::DataBuilder;
62    ///
63    /// producer.serve(|interest, responder| async move {
64    ///     let data = DataBuilder::new((*interest.name).clone(), b"42").build();
65    ///     responder.respond_bytes(data).await.ok();
66    /// }).await
67    /// # }
68    /// ```
69    pub async fn serve<F, Fut>(&self, handler: F) -> Result<(), AppError>
70    where
71        F: Fn(Interest, Responder) -> Fut + Send + Sync,
72        Fut: Future<Output = ()> + Send,
73    {
74        loop {
75            let raw = match self.conn.recv().await {
76                Some(b) => b,
77                None => break,
78            };
79
80            let interest = match Interest::decode(raw.clone()) {
81                Ok(i) => i,
82                Err(_) => continue,
83            };
84
85            let responder = Responder::new(Arc::clone(&self.conn), raw);
86            handler(interest, responder).await;
87        }
88        Ok(())
89    }
90
91    /// The registered prefix.
92    pub fn prefix(&self) -> &Name {
93        &self.prefix
94    }
95
96    /// Publish a large payload as a segmented NDN object.
97    ///
98    /// Splits `content` into chunks of at most `chunk_size` bytes (default:
99    /// [`NDN_DEFAULT_SEGMENT_SIZE`] = 8 KiB), then serves each chunk as a
100    /// separate Data packet at `/<prefix>/<n>` where `n` is the ASCII-decimal
101    /// segment index. The last segment carries the `FinalBlockId` field so
102    /// consumers can determine when reassembly is complete.
103    ///
104    /// Use [`Consumer::fetch_segmented`](crate::Consumer::fetch_segmented) on
105    /// the receiving side to fetch and reassemble the payload.
106    ///
107    /// # Note
108    ///
109    /// This method serves one round of Interests — enough for one consumer to
110    /// fetch all segments sequentially. For persistent serving (multiple
111    /// consumers), embed the [`ChunkedProducer`] in a custom [`serve`](Self::serve) loop.
112    pub async fn publish_large(
113        &self,
114        prefix: &Name,
115        content: Bytes,
116        chunk_size: usize,
117    ) -> Result<(), AppError> {
118        let seg_size = if chunk_size == 0 {
119            NDN_DEFAULT_SEGMENT_SIZE
120        } else {
121            chunk_size
122        };
123        let chunked = ChunkedProducer::new(prefix.clone(), content, seg_size);
124        let last_seg = chunked.segment_count().saturating_sub(1);
125
126        for seg_idx in 0..=last_seg {
127            let payload = chunked.segment(seg_idx).cloned().unwrap_or_default();
128            let seg_name = prefix.clone().append(seg_idx.to_string());
129
130            // Wait for an Interest, then reply with this segment.
131            let _raw = self.conn.recv().await.ok_or(AppError::Closed)?;
132
133            let data = if seg_idx == last_seg {
134                DataBuilder::new(seg_name, &payload)
135                    .final_block_id_seg(last_seg)
136                    .build()
137            } else {
138                DataBuilder::new(seg_name, &payload).build()
139            };
140            self.conn.send(data).await?;
141        }
142        Ok(())
143    }
144}