ndn_app/
queryable.rs

1//! Queryable — register a prefix and respond to incoming queries (Interests).
2//!
3//! Like [`Producer`](crate::Producer) but returns a stream of [`Query`] objects
4//! that the application responds to explicitly, matching Zenoh's queryable pattern.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! # async fn example() -> Result<(), ndn_app::AppError> {
10//! use ndn_app::Queryable;
11//! use ndn_packet::encode::DataBuilder;
12//!
//! let q = Queryable::connect("/run/nfd/nfd.sock", "/sensors/temp").await?;
14//!
15//! while let Some(query) = q.recv().await {
16//!     let data = DataBuilder::new((*query.interest.name).clone(), b"22.5").build();
17//!     query.reply(data).await.ok();
18//! }
19//! # Ok(())
20//! # }
21//! ```
22
23use std::path::Path;
24use std::sync::Arc;
25
26use bytes::Bytes;
27
28use ndn_faces::local::InProcHandle;
29use ndn_ipc::ForwarderClient;
30use ndn_packet::{Interest, Name};
31
32use crate::AppError;
33use crate::connection::NdnConnection;
34
/// A query received by a [`Queryable`] — the application replies via [`Query::reply`].
///
/// Each value pairs one decoded Interest with a handle to the connection it
/// arrived on, so the reply can be sent without going back through the
/// [`Queryable`] that produced it.
pub struct Query {
    /// The incoming Interest.
    pub interest: Interest,
    /// Shared connection handle that [`Query::reply`] sends the reply through.
    conn: Arc<NdnConnection>,
}
42
impl Query {
    /// Send a Data reply for this query.
    ///
    /// `data` is passed unchanged to the underlying connection's `send`
    /// (presumably it must already be a wire-encoded Data packet — confirm
    /// against `NdnConnection::send`).
    ///
    /// # Errors
    ///
    /// Returns whatever error the connection's `send` reports.
    pub async fn reply(&self, data: Bytes) -> Result<(), AppError> {
        self.conn.send(data).await
    }
}
49
/// A queryable endpoint — receives Interests and lets the application reply.
pub struct Queryable {
    /// Connection that incoming packets are read from; a clone of this handle
    /// is placed in every [`Query`] returned by [`Queryable::recv`].
    conn: Arc<NdnConnection>,
    /// Prefix supplied at construction; exposed through [`Queryable::prefix`].
    prefix: Name,
}
55
56impl Queryable {
57    /// Connect to an external router and register a prefix.
58    pub async fn connect(
59        socket: impl AsRef<Path>,
60        prefix: impl Into<Name>,
61    ) -> Result<Self, AppError> {
62        let prefix = prefix.into();
63        let client = ForwarderClient::connect(socket)
64            .await
65            .map_err(AppError::Connection)?;
66        client
67            .register_prefix(&prefix)
68            .await
69            .map_err(AppError::Connection)?;
70        Ok(Self {
71            conn: Arc::new(NdnConnection::External(client)),
72            prefix,
73        })
74    }
75
76    /// Create from an in-process InProcHandle (embedded engine).
77    pub fn from_handle(handle: InProcHandle, prefix: Name) -> Self {
78        Self {
79            conn: Arc::new(NdnConnection::Embedded(handle)),
80            prefix,
81        }
82    }
83
84    /// The registered prefix.
85    pub fn prefix(&self) -> &Name {
86        &self.prefix
87    }
88
89    /// Receive the next query. Returns `None` when the connection closes.
90    ///
91    /// Each returned [`Query`] carries a sender so the application can reply
92    /// asynchronously — even from a different task.
93    pub async fn recv(&self) -> Option<Query> {
94        loop {
95            let raw = self.conn.recv().await?;
96            let interest = match Interest::decode(raw) {
97                Ok(i) => i,
98                Err(_) => continue,
99            };
100            return Some(Query {
101                interest,
102                conn: Arc::clone(&self.conn),
103            });
104        }
105    }
106
107    /// Run a query handler loop.
108    ///
109    /// The handler receives each Interest and returns `Some(wire_data)` to
110    /// respond or `None` to silently drop.
111    pub async fn serve<F, Fut>(&self, handler: F) -> Result<(), AppError>
112    where
113        F: Fn(Interest) -> Fut + Send + Sync,
114        Fut: std::future::Future<Output = Option<Bytes>> + Send,
115    {
116        loop {
117            let raw = match self.conn.recv().await {
118                Some(b) => b,
119                None => break,
120            };
121            let interest = match Interest::decode(raw) {
122                Ok(i) => i,
123                Err(_) => continue,
124            };
125            if let Some(data) = handler(interest).await {
126                self.conn.send(data).await?;
127            }
128        }
129        Ok(())
130    }
131}