ndn_faces/local/
in_proc.rs1use bytes::Bytes;
2use tokio::sync::{Mutex, mpsc};
3
4use ndn_transport::{Face, FaceError, FaceId, FaceKind};
5
6pub struct InProcFace {
24 id: FaceId,
25 face_rx: Mutex<mpsc::Receiver<Bytes>>,
26 app_tx: mpsc::Sender<Bytes>,
27}
28
29pub struct InProcHandle {
37 face_tx: mpsc::Sender<Bytes>,
38 app_rx: Mutex<mpsc::Receiver<Bytes>>,
39}
40
41impl InProcFace {
42 pub fn new(id: FaceId, buffer: usize) -> (Self, InProcHandle) {
44 let (face_tx, face_rx) = mpsc::channel(buffer);
45 let (app_tx, app_rx) = mpsc::channel(buffer);
46 let face = InProcFace {
47 id,
48 face_rx: Mutex::new(face_rx),
49 app_tx,
50 };
51 let handle = InProcHandle {
52 face_tx,
53 app_rx: Mutex::new(app_rx),
54 };
55 (face, handle)
56 }
57}
58
59impl Face for InProcFace {
60 fn id(&self) -> FaceId {
61 self.id
62 }
63 fn kind(&self) -> FaceKind {
64 FaceKind::App
65 }
66
67 async fn recv(&self) -> Result<Bytes, FaceError> {
69 self.face_rx
70 .lock()
71 .await
72 .recv()
73 .await
74 .ok_or(FaceError::Closed)
75 }
76
77 async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
79 self.app_tx.send(pkt).await.map_err(|_| FaceError::Closed)
80 }
81}
82
83impl InProcHandle {
84 pub async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
86 self.face_tx.send(pkt).await.map_err(|_| FaceError::Closed)
87 }
88
89 pub async fn recv(&self) -> Option<Bytes> {
91 self.app_rx.lock().await.recv().await
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a distinguishable packet for `tag` by writing a single TLV
    /// through `TlvWriter::write_tlv(0x05, &[tag])`.
    fn test_pkt(tag: u8) -> Bytes {
        let mut writer = ndn_tlv::TlvWriter::new();
        writer.write_tlv(0x05, &[tag]);
        writer.finish()
    }

    /// The face reports the id it was built with and the `App` kind.
    #[tokio::test]
    async fn face_kind_and_id() {
        let (face, _handle) = InProcFace::new(FaceId(42), 4);

        assert_eq!(face.id(), FaceId(42));
        assert_eq!(face.kind(), FaceKind::App);
    }

    /// A packet sent through the handle is received by the face.
    #[tokio::test]
    async fn app_to_pipeline() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        let expected = test_pkt(1);

        handle.send(test_pkt(1)).await.unwrap();

        let got = face.recv().await.unwrap();
        assert_eq!(got, expected);
    }

    /// A packet sent through the face is received by the handle.
    #[tokio::test]
    async fn pipeline_to_app() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        let expected = test_pkt(2);

        face.send(test_pkt(2)).await.unwrap();

        let got = handle.recv().await.unwrap();
        assert_eq!(got, expected);
    }

    /// Both directions carry traffic independently of each other.
    #[tokio::test]
    async fn bidirectional() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);

        handle.send(test_pkt(10)).await.unwrap();
        face.send(test_pkt(20)).await.unwrap();

        let at_face = face.recv().await.unwrap();
        assert_eq!(at_face, test_pkt(10));
        let at_handle = handle.recv().await.unwrap();
        assert_eq!(at_handle, test_pkt(20));
    }

    /// Dropping the handle makes the face's `recv` report `Closed`.
    #[tokio::test]
    async fn closed_when_handle_dropped() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        drop(handle);

        let outcome = face.recv().await;
        assert!(matches!(outcome, Err(FaceError::Closed)));
    }

    /// Dropping the face makes the handle's `recv` yield `None`.
    #[tokio::test]
    async fn closed_when_face_dropped() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        drop(face);

        let outcome = handle.recv().await;
        assert!(outcome.is_none());
    }

    /// Packets queued back-to-back are delivered in the order they were sent.
    #[tokio::test]
    async fn multiple_sequential_packets() {
        let (face, handle) = InProcFace::new(FaceId(0), 8);

        for tag in 0u8..5 {
            handle.send(test_pkt(tag)).await.unwrap();
        }

        for tag in 0u8..5 {
            let got = face.recv().await.unwrap();
            assert_eq!(got, test_pkt(tag));
        }
    }
}