ndn_app/producer.rs
1use std::future::Future;
2use std::path::Path;
3use std::sync::Arc;
4
5use bytes::Bytes;
6
7use ndn_faces::local::InProcHandle;
8use ndn_ipc::{ChunkedProducer, ForwarderClient, NDN_DEFAULT_SEGMENT_SIZE};
9use ndn_packet::encode::DataBuilder;
10use ndn_packet::{Interest, Name};
11
12use crate::AppError;
13use crate::connection::NdnConnection;
14use crate::responder::Responder;
15
16/// High-level NDN producer — serves Data in response to Interests.
17pub struct Producer {
18 conn: Arc<NdnConnection>,
19 prefix: Name,
20}
21
22impl Producer {
23 /// Connect to an external router and register a prefix.
24 pub async fn connect(
25 socket: impl AsRef<Path>,
26 prefix: impl Into<Name>,
27 ) -> Result<Self, AppError> {
28 let prefix = prefix.into();
29 let client = ForwarderClient::connect(socket)
30 .await
31 .map_err(AppError::Connection)?;
32 client
33 .register_prefix(&prefix)
34 .await
35 .map_err(AppError::Connection)?;
36 Ok(Self {
37 conn: Arc::new(NdnConnection::External(client)),
38 prefix,
39 })
40 }
41
42 /// Create from an in-process InProcHandle (embedded engine).
43 pub fn from_handle(handle: InProcHandle, prefix: Name) -> Self {
44 Self {
45 conn: Arc::new(NdnConnection::Embedded(handle)),
46 prefix,
47 }
48 }
49
50 /// Run the producer loop with an async handler.
51 ///
52 /// The handler receives each `(Interest, Responder)` pair and must call
53 /// one of [`Responder::respond`], [`Responder::respond_bytes`], or
54 /// [`Responder::nack`] to reply. Dropping the `Responder` without replying
55 /// silently discards the Interest.
56 ///
57 /// # Example
58 ///
59 /// ```rust,no_run
60 /// # async fn example(mut producer: ndn_app::Producer) -> Result<(), ndn_app::AppError> {
61 /// use ndn_packet::encode::DataBuilder;
62 ///
63 /// producer.serve(|interest, responder| async move {
64 /// let data = DataBuilder::new((*interest.name).clone(), b"42").build();
65 /// responder.respond_bytes(data).await.ok();
66 /// }).await
67 /// # }
68 /// ```
69 pub async fn serve<F, Fut>(&self, handler: F) -> Result<(), AppError>
70 where
71 F: Fn(Interest, Responder) -> Fut + Send + Sync,
72 Fut: Future<Output = ()> + Send,
73 {
74 loop {
75 let raw = match self.conn.recv().await {
76 Some(b) => b,
77 None => break,
78 };
79
80 let interest = match Interest::decode(raw.clone()) {
81 Ok(i) => i,
82 Err(_) => continue,
83 };
84
85 let responder = Responder::new(Arc::clone(&self.conn), raw);
86 handler(interest, responder).await;
87 }
88 Ok(())
89 }
90
91 /// The registered prefix.
92 pub fn prefix(&self) -> &Name {
93 &self.prefix
94 }
95
96 /// Publish a large payload as a segmented NDN object.
97 ///
98 /// Splits `content` into chunks of at most `chunk_size` bytes (default:
99 /// [`NDN_DEFAULT_SEGMENT_SIZE`] = 8 KiB), then serves each chunk as a
100 /// separate Data packet at `/<prefix>/<n>` where `n` is the ASCII-decimal
101 /// segment index. The last segment carries the `FinalBlockId` field so
102 /// consumers can determine when reassembly is complete.
103 ///
104 /// Use [`Consumer::fetch_segmented`](crate::Consumer::fetch_segmented) on
105 /// the receiving side to fetch and reassemble the payload.
106 ///
107 /// # Note
108 ///
109 /// This method serves one round of Interests — enough for one consumer to
110 /// fetch all segments sequentially. For persistent serving (multiple
111 /// consumers), embed the [`ChunkedProducer`] in a custom [`serve`](Self::serve) loop.
112 pub async fn publish_large(
113 &self,
114 prefix: &Name,
115 content: Bytes,
116 chunk_size: usize,
117 ) -> Result<(), AppError> {
118 let seg_size = if chunk_size == 0 {
119 NDN_DEFAULT_SEGMENT_SIZE
120 } else {
121 chunk_size
122 };
123 let chunked = ChunkedProducer::new(prefix.clone(), content, seg_size);
124 let last_seg = chunked.segment_count().saturating_sub(1);
125
126 for seg_idx in 0..=last_seg {
127 let payload = chunked.segment(seg_idx).cloned().unwrap_or_default();
128 let seg_name = prefix.clone().append(seg_idx.to_string());
129
130 // Wait for an Interest, then reply with this segment.
131 let _raw = self.conn.recv().await.ok_or(AppError::Closed)?;
132
133 let data = if seg_idx == last_seg {
134 DataBuilder::new(seg_name, &payload)
135 .final_block_id_seg(last_seg)
136 .build()
137 } else {
138 DataBuilder::new(seg_name, &payload).build()
139 };
140 self.conn.send(data).await?;
141 }
142 Ok(())
143 }
144}