ndn_faces/local/
in_proc.rs

1use bytes::Bytes;
2use tokio::sync::{Mutex, mpsc};
3
4use ndn_transport::{Face, FaceError, FaceId, FaceKind};
5
/// In-process NDN face backed by a pair of `tokio::sync::mpsc` channels.
///
/// `InProcFace` is held by the forwarder pipeline; [`InProcHandle`] is given
/// to the application (library user). The forwarder sends packets to the app
/// via `InProcFace::send` → `app_tx`; the app sends packets to the forwarder
/// via `InProcHandle::send` → `face_tx`.
///
/// ```text
///   pipeline                 application
///   ────────                 ───────────
///   InProcFace::recv()  ←  InProcHandle::send()   (face_rx ← face_tx)
///   InProcFace::send()  →  InProcHandle::recv()   (app_tx  → app_rx)
/// ```
///
/// `face_rx` is wrapped in a `Mutex` to satisfy the `&self` requirement of the
/// `Face` trait; the pipeline's single-consumer contract means it never
/// actually contends.
pub struct InProcFace {
    /// Identifier returned unchanged by `Face::id`.
    id: FaceId,
    /// Receiving end of the application → forwarder channel; its sender is
    /// the `face_tx` field of the paired [`InProcHandle`]. Locked for the
    /// duration of each `recv` call.
    face_rx: Mutex<mpsc::Receiver<Bytes>>,
    /// Sending end of the forwarder → application channel; its receiver is
    /// the `app_rx` field of the paired [`InProcHandle`].
    app_tx: mpsc::Sender<Bytes>,
}
28
/// Application-side handle to an [`InProcFace`].
///
/// Send Interests with [`send`][InProcHandle::send]; receive Data/Nacks with
/// [`recv`][InProcHandle::recv].
///
/// The receiver is wrapped in a `Mutex` so that `recv()` takes `&self`,
/// enabling shared ownership (e.g. concurrent send/recv from different tasks).
pub struct InProcHandle {
    /// Sending end of the application → forwarder channel; its receiver is
    /// the `face_rx` field of the paired [`InProcFace`].
    face_tx: mpsc::Sender<Bytes>,
    /// Receiving end of the forwarder → application channel; its sender is
    /// the `app_tx` field of the paired [`InProcFace`]. Locked for the
    /// duration of each `recv` call.
    app_rx: Mutex<mpsc::Receiver<Bytes>>,
}
40
41impl InProcFace {
42    /// Create a linked (`InProcFace`, `InProcHandle`) pair with `buffer` slots each.
43    pub fn new(id: FaceId, buffer: usize) -> (Self, InProcHandle) {
44        let (face_tx, face_rx) = mpsc::channel(buffer);
45        let (app_tx, app_rx) = mpsc::channel(buffer);
46        let face = InProcFace {
47            id,
48            face_rx: Mutex::new(face_rx),
49            app_tx,
50        };
51        let handle = InProcHandle {
52            face_tx,
53            app_rx: Mutex::new(app_rx),
54        };
55        (face, handle)
56    }
57}
58
59impl Face for InProcFace {
60    fn id(&self) -> FaceId {
61        self.id
62    }
63    fn kind(&self) -> FaceKind {
64        FaceKind::App
65    }
66
67    /// Receive a packet sent by the application via `InProcHandle::send`.
68    async fn recv(&self) -> Result<Bytes, FaceError> {
69        self.face_rx
70            .lock()
71            .await
72            .recv()
73            .await
74            .ok_or(FaceError::Closed)
75    }
76
77    /// Forward a packet to the application (readable via `InProcHandle::recv`).
78    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
79        self.app_tx.send(pkt).await.map_err(|_| FaceError::Closed)
80    }
81}
82
83impl InProcHandle {
84    /// Send a packet to the forwarder (readable via `InProcFace::recv`).
85    pub async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
86        self.face_tx.send(pkt).await.map_err(|_| FaceError::Closed)
87    }
88
89    /// Receive a packet from the forwarder (sent via `InProcFace::send`).
90    pub async fn recv(&self) -> Option<Bytes> {
91        self.app_rx.lock().await.recv().await
92    }
93}
94
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a small test packet: a single TLV written with
    /// `write_tlv(0x05, &[tag])`, so distinct tags give distinct packets.
    fn test_pkt(tag: u8) -> Bytes {
        let mut writer = ndn_tlv::TlvWriter::new();
        writer.write_tlv(0x05, &[tag]);
        writer.finish()
    }

    /// The face reports the id it was built with and the `App` kind.
    #[tokio::test]
    async fn face_kind_and_id() {
        // `_handle` stays bound so the application side remains alive.
        let (face, _handle) = InProcFace::new(FaceId(42), 4);
        let id = face.id();
        let kind = face.kind();
        assert_eq!(id, FaceId(42));
        assert_eq!(kind, FaceKind::App);
    }

    /// A packet sent on the handle is received on the face.
    #[tokio::test]
    async fn app_to_pipeline() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        let sent = test_pkt(1);
        handle.send(sent.clone()).await.unwrap();
        let got = face.recv().await.unwrap();
        assert_eq!(got, sent);
    }

    /// A packet sent on the face is received on the handle.
    #[tokio::test]
    async fn pipeline_to_app() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        let sent = test_pkt(2);
        face.send(sent.clone()).await.unwrap();
        let got = handle.recv().await.unwrap();
        assert_eq!(got, sent);
    }

    /// Both directions carry their own packet without interfering.
    #[tokio::test]
    async fn bidirectional() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        handle.send(test_pkt(10)).await.unwrap();
        face.send(test_pkt(20)).await.unwrap();

        let at_face = face.recv().await.unwrap();
        assert_eq!(at_face, test_pkt(10));
        let at_app = handle.recv().await.unwrap();
        assert_eq!(at_app, test_pkt(20));
    }

    /// Dropping the handle makes the face's `recv` report `Closed`.
    #[tokio::test]
    async fn closed_when_handle_dropped() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        drop(handle);
        let outcome = face.recv().await;
        assert!(matches!(outcome, Err(FaceError::Closed)));
    }

    /// Dropping the face makes the handle's `recv` yield `None`.
    #[tokio::test]
    async fn closed_when_face_dropped() {
        let (face, handle) = InProcFace::new(FaceId(0), 4);
        drop(face);
        let outcome = handle.recv().await;
        assert!(outcome.is_none());
    }

    /// Several queued packets come out in the order they went in.
    #[tokio::test]
    async fn multiple_sequential_packets() {
        let (face, handle) = InProcFace::new(FaceId(0), 8);
        let tags: [u8; 5] = [0, 1, 2, 3, 4];

        for tag in tags {
            handle.send(test_pkt(tag)).await.unwrap();
        }
        for tag in tags {
            let got = face.recv().await.unwrap();
            assert_eq!(got, test_pkt(tag));
        }
    }
}