1use std::sync::Arc;
2
3use tokio::sync::{mpsc, oneshot};
4
5use ndn_packet::{Data, Interest, Name};
6use ndn_transport::FaceId;
7
8use crate::AppError;
9
10pub struct AppSink {
18 face_id: FaceId,
19 tx: mpsc::Sender<OutboundRequest>,
21}
22
23pub enum OutboundRequest {
24 Interest {
25 interest: Box<Interest>,
26 reply: oneshot::Sender<Result<Data, AppError>>,
27 },
28 RegisterPrefix {
29 prefix: Arc<Name>,
30 handler: Box<dyn Fn(Interest) + Send + Sync + 'static>,
31 },
32}
33
34impl AppSink {
35 pub fn new(face_id: FaceId, capacity: usize) -> (AppSink, mpsc::Receiver<OutboundRequest>) {
40 let (tx, rx) = mpsc::channel(capacity);
41 (AppSink { face_id, tx }, rx)
42 }
43
44 pub fn face_id(&self) -> FaceId {
45 self.face_id
46 }
47
48 pub async fn express(&self, interest: Interest) -> Result<Data, AppError> {
50 let (tx, rx) = oneshot::channel();
51 self.tx
52 .send(OutboundRequest::Interest {
53 interest: Box::new(interest),
54 reply: tx,
55 })
56 .await
57 .map_err(|_| AppError::Closed)?;
58 rx.await.map_err(|_| AppError::Closed)?
59 }
60
61 pub async fn register_prefix<F>(&self, prefix: Name, handler: F) -> Result<(), AppError>
63 where
64 F: Fn(Interest) + Send + Sync + 'static,
65 {
66 self.tx
67 .send(OutboundRequest::RegisterPrefix {
68 prefix: Arc::new(prefix),
69 handler: Box::new(handler),
70 })
71 .await
72 .map_err(|_| AppError::Closed)?;
73 Ok(())
74 }
75}
76
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use ndn_packet::NameComponent;

    /// Builds an Interest whose name is the single generic component `comp`.
    fn make_interest(comp: &'static str) -> Interest {
        let name =
            Name::from_components([NameComponent::generic(Bytes::from_static(comp.as_bytes()))]);
        Interest::new(name)
    }

    /// Hand-encodes a minimal Data packet named `/test` and decodes it.
    ///
    /// TLV types follow the NDN packet format: `0x08` generic name component,
    /// `0x07` Name, `0x06` Data.
    fn make_data() -> Data {
        use ndn_tlv::TlvWriter;
        let nc = {
            let mut w = TlvWriter::new();
            w.write_tlv(0x08, b"test");
            w.finish()
        };
        let name = {
            let mut w = TlvWriter::new();
            w.write_tlv(0x07, &nc);
            w.finish()
        };
        let pkt = {
            let mut w = TlvWriter::new();
            w.write_tlv(0x06, &name);
            w.finish()
        };
        Data::decode(pkt).unwrap()
    }

    #[test]
    fn face_id_accessor() {
        let (face, _rx) = AppSink::new(FaceId(42), 8);
        assert_eq!(face.face_id(), FaceId(42));
    }

    #[tokio::test]
    async fn express_sends_interest_to_receiver() {
        let (face, mut rx) = AppSink::new(FaceId(1), 8);
        let interest = make_interest("hello");
        // `express` blocks on the reply, so drive it from a separate task.
        let task = tokio::spawn(async move { face.express(interest).await });
        if let Some(OutboundRequest::Interest { reply, .. }) = rx.recv().await {
            reply.send(Ok(make_data())).unwrap();
        } else {
            panic!("expected Interest request");
        }
        task.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn express_returns_error_when_channel_closed() {
        let (face, rx) = AppSink::new(FaceId(1), 8);
        // Dropping the receiver closes the queue before anything is sent.
        drop(rx);
        let result = face.express(make_interest("x")).await;
        assert!(matches!(result, Err(AppError::Closed)));
    }

    #[tokio::test]
    async fn express_propagates_nack() {
        use ndn_packet::NackReason;
        let (face, mut rx) = AppSink::new(FaceId(1), 8);
        let task = tokio::spawn(async move { face.express(make_interest("x")).await });
        if let Some(OutboundRequest::Interest { reply, .. }) = rx.recv().await {
            reply
                .send(Err(AppError::Nacked {
                    reason: NackReason::NoRoute,
                }))
                .unwrap();
        } else {
            // Fail here rather than with a confusing mismatch on `result` below.
            panic!("expected Interest request");
        }
        let result = task.await.unwrap();
        assert!(matches!(
            result,
            Err(AppError::Nacked {
                reason: NackReason::NoRoute
            })
        ));
    }

    #[tokio::test]
    async fn register_prefix_sends_request() {
        let (face, mut rx) = AppSink::new(FaceId(1), 8);
        let prefix =
            Name::from_components([NameComponent::generic(Bytes::from_static(b"myprefix"))]);
        face.register_prefix(prefix.clone(), |_| {}).await.unwrap();
        if let Some(OutboundRequest::RegisterPrefix { prefix: p, .. }) = rx.recv().await {
            assert_eq!(*p, prefix);
        } else {
            panic!("expected RegisterPrefix request");
        }
    }

    #[tokio::test]
    async fn register_prefix_returns_error_when_channel_closed() {
        let (face, rx) = AppSink::new(FaceId(1), 8);
        drop(rx);
        let result = face.register_prefix(Name::root(), |_| {}).await;
        // Pin the exact variant, matching the equivalent `express` test.
        assert!(matches!(result, Err(AppError::Closed)));
    }
}