ndn_app/
app_face.rs

1use std::sync::Arc;
2
3use tokio::sync::{mpsc, oneshot};
4
5use ndn_packet::{Data, Interest, Name};
6use ndn_transport::FaceId;
7
8use crate::AppError;
9
10/// An in-process face connecting application code to the forwarding engine.
11///
12/// `express()` sends an Interest and waits for the matching Data.
13/// `produce()` registers a handler for a name prefix.
14///
15/// Internally uses `tokio::sync::mpsc` channels — zero-copy `Arc<>` passing
16/// for same-process use.
17pub struct AppSink {
18    face_id: FaceId,
19    /// Channel to send outbound Interests to the pipeline runner.
20    tx: mpsc::Sender<OutboundRequest>,
21}
22
23pub enum OutboundRequest {
24    Interest {
25        interest: Box<Interest>,
26        reply: oneshot::Sender<Result<Data, AppError>>,
27    },
28    RegisterPrefix {
29        prefix: Arc<Name>,
30        handler: Box<dyn Fn(Interest) + Send + Sync + 'static>,
31    },
32}
33
34impl AppSink {
35    /// Create a new `AppSink` and the matching request receiver.
36    ///
37    /// The caller (typically the engine) holds the `Receiver` and dispatches
38    /// `OutboundRequest` messages as they arrive.
39    pub fn new(face_id: FaceId, capacity: usize) -> (AppSink, mpsc::Receiver<OutboundRequest>) {
40        let (tx, rx) = mpsc::channel(capacity);
41        (AppSink { face_id, tx }, rx)
42    }
43
44    pub fn face_id(&self) -> FaceId {
45        self.face_id
46    }
47
48    /// Express an Interest and wait for the matching Data.
49    pub async fn express(&self, interest: Interest) -> Result<Data, AppError> {
50        let (tx, rx) = oneshot::channel();
51        self.tx
52            .send(OutboundRequest::Interest {
53                interest: Box::new(interest),
54                reply: tx,
55            })
56            .await
57            .map_err(|_| AppError::Closed)?;
58        rx.await.map_err(|_| AppError::Closed)?
59    }
60
61    /// Register a handler for Interests matching `prefix`.
62    pub async fn register_prefix<F>(&self, prefix: Name, handler: F) -> Result<(), AppError>
63    where
64        F: Fn(Interest) + Send + Sync + 'static,
65    {
66        self.tx
67            .send(OutboundRequest::RegisterPrefix {
68                prefix: Arc::new(prefix),
69                handler: Box::new(handler),
70            })
71            .await
72            .map_err(|_| AppError::Closed)?;
73        Ok(())
74    }
75}
76
77#[cfg(test)]
78mod tests {
79    use super::*;
80    use bytes::Bytes;
81    use ndn_packet::NameComponent;
82
83    fn make_interest(comp: &'static str) -> Interest {
84        let name =
85            Name::from_components([NameComponent::generic(Bytes::from_static(comp.as_bytes()))]);
86        Interest::new(name)
87    }
88
89    fn make_data() -> Data {
90        use ndn_tlv::TlvWriter;
91        let nc = {
92            let mut w = TlvWriter::new();
93            w.write_tlv(0x08, b"test");
94            w.finish()
95        };
96        let name = {
97            let mut w = TlvWriter::new();
98            w.write_tlv(0x07, &nc);
99            w.finish()
100        };
101        let pkt = {
102            let mut w = TlvWriter::new();
103            w.write_tlv(0x06, &name);
104            w.finish()
105        };
106        Data::decode(pkt).unwrap()
107    }
108
109    #[test]
110    fn face_id_accessor() {
111        let (face, _rx) = AppSink::new(FaceId(42), 8);
112        assert_eq!(face.face_id(), FaceId(42));
113    }
114
115    #[tokio::test]
116    async fn express_sends_interest_to_receiver() {
117        let (face, mut rx) = AppSink::new(FaceId(1), 8);
118        let interest = make_interest("hello");
119        let task = tokio::spawn(async move { face.express(interest).await });
120        // Engine side: receive the request and reply.
121        if let Some(OutboundRequest::Interest { reply, .. }) = rx.recv().await {
122            reply.send(Ok(make_data())).unwrap();
123        } else {
124            panic!("expected Interest request");
125        }
126        task.await.unwrap().unwrap();
127    }
128
129    #[tokio::test]
130    async fn express_returns_error_when_channel_closed() {
131        let (face, rx) = AppSink::new(FaceId(1), 8);
132        drop(rx); // engine side dropped
133        let result = face.express(make_interest("x")).await;
134        assert!(matches!(result, Err(AppError::Closed)));
135    }
136
137    #[tokio::test]
138    async fn express_propagates_nack() {
139        use ndn_packet::NackReason;
140        let (face, mut rx) = AppSink::new(FaceId(1), 8);
141        let task = tokio::spawn(async move { face.express(make_interest("x")).await });
142        if let Some(OutboundRequest::Interest { reply, .. }) = rx.recv().await {
143            reply
144                .send(Err(AppError::Nacked {
145                    reason: NackReason::NoRoute,
146                }))
147                .unwrap();
148        }
149        let result = task.await.unwrap();
150        assert!(matches!(
151            result,
152            Err(AppError::Nacked {
153                reason: NackReason::NoRoute
154            })
155        ));
156    }
157
158    #[tokio::test]
159    async fn register_prefix_sends_request() {
160        let (face, mut rx) = AppSink::new(FaceId(1), 8);
161        let prefix =
162            Name::from_components([NameComponent::generic(Bytes::from_static(b"myprefix"))]);
163        face.register_prefix(prefix.clone(), |_| {}).await.unwrap();
164        if let Some(OutboundRequest::RegisterPrefix { prefix: p, .. }) = rx.recv().await {
165            assert_eq!(*p, prefix);
166        } else {
167            panic!("expected RegisterPrefix request");
168        }
169    }
170
171    #[tokio::test]
172    async fn register_prefix_returns_error_when_channel_closed() {
173        let (face, rx) = AppSink::new(FaceId(1), 8);
174        drop(rx);
175        let result = face.register_prefix(Name::root(), |_| {}).await;
176        assert!(result.is_err());
177    }
178}