ndn_face_local/
unix.rs

1use std::path::Path;
2
3use tokio::net::UnixStream;
4use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
5
6use ndn_transport::{FaceId, FaceKind, StreamFace, TlvCodec};
7
8/// NDN face over a Unix domain socket with TLV length-prefix framing.
9///
10/// Uses [`StreamFace`] with Unix read/write halves and [`TlvCodec`].
11/// LP-encoding is **disabled** — local transports pass raw NDN packets.
12pub type UnixFace = StreamFace<OwnedReadHalf, OwnedWriteHalf, TlvCodec>;
13
14/// Wrap an accepted or connected `UnixStream` into a [`UnixFace`].
15pub fn unix_face_from_stream(id: FaceId, stream: UnixStream, path: impl AsRef<Path>) -> UnixFace {
16    let uri = format!("unix://{}", path.as_ref().display());
17    let (r, w) = stream.into_split();
18    StreamFace::new(id, FaceKind::Unix, false, None, Some(uri), r, w, TlvCodec)
19}
20
21/// Wrap an accepted `UnixStream` into a management [`UnixFace`].
22///
23/// Identical to [`unix_face_from_stream`] except the face is tagged
24/// `FaceKind::Management`, granting it operator-level implicit trust in
25/// the management handler.  Use this for connections accepted on the router's
26/// NFD management socket.
27pub fn unix_management_face_from_stream(
28    id: FaceId,
29    stream: UnixStream,
30    path: impl AsRef<Path>,
31) -> UnixFace {
32    let uri = format!("unix://{}", path.as_ref().display());
33    let (r, w) = stream.into_split();
34    StreamFace::new(
35        id,
36        FaceKind::Management,
37        false,
38        None,
39        Some(uri),
40        r,
41        w,
42        TlvCodec,
43    )
44}
45
46/// Connect to a Unix socket at `path` and return a [`UnixFace`].
47pub async fn unix_face_connect(id: FaceId, path: impl AsRef<Path>) -> std::io::Result<UnixFace> {
48    let path = path.as_ref();
49    let stream = UnixStream::connect(path).await?;
50    Ok(unix_face_from_stream(id, stream, path))
51}
52
53#[cfg(test)]
54mod tests {
55    use super::*;
56    use bytes::Bytes;
57    use ndn_transport::{Face, FaceError};
58    use std::path::PathBuf;
59    use tokio::net::UnixListener;
60
61    fn make_tlv(tag: u8, value: &[u8]) -> Bytes {
62        use ndn_tlv::TlvWriter;
63        let mut w = TlvWriter::new();
64        w.write_tlv(tag as u64, value);
65        w.finish()
66    }
67
68    fn temp_socket_path() -> PathBuf {
69        use std::sync::atomic::{AtomicU64, Ordering};
70        static COUNTER: AtomicU64 = AtomicU64::new(0);
71        std::env::temp_dir().join(format!(
72            "ndn_unix_test_{}_{}.sock",
73            std::process::id(),
74            COUNTER.fetch_add(1, Ordering::Relaxed),
75        ))
76    }
77
78    async fn loopback_pair(path: &PathBuf) -> (UnixFace, UnixFace) {
79        let listener = UnixListener::bind(path).unwrap();
80        let result = tokio::time::timeout(std::time::Duration::from_secs(5), async {
81            let connect_fut = unix_face_connect(FaceId(0), path);
82            let accept_fut = listener.accept();
83            let (client, accepted) = tokio::join!(connect_fut, accept_fut);
84            let (accepted_stream, _) = accepted.unwrap();
85            let server = unix_face_from_stream(FaceId(1), accepted_stream, path);
86            (client.unwrap(), server)
87        })
88        .await;
89        result.expect("loopback_pair timed out")
90    }
91
92    #[tokio::test]
93    async fn send_recv_single_packet() {
94        let path = temp_socket_path();
95        let (client, server) = loopback_pair(&path).await;
96        let pkt = make_tlv(0x05, b"hello");
97        client.send(pkt.clone()).await.unwrap();
98        assert_eq!(server.recv().await.unwrap(), pkt);
99        let _ = std::fs::remove_file(&path);
100    }
101
102    #[tokio::test]
103    async fn framing_multiple_sequential() {
104        let path = temp_socket_path();
105        let (client, server) = loopback_pair(&path).await;
106        let pkts: Vec<Bytes> = (0u8..5).map(|i| make_tlv(0x05, &[i])).collect();
107        for pkt in &pkts {
108            client.send(pkt.clone()).await.unwrap();
109        }
110        for expected in &pkts {
111            assert_eq!(&server.recv().await.unwrap(), expected);
112        }
113        let _ = std::fs::remove_file(&path);
114    }
115
116    #[tokio::test]
117    async fn bidirectional_exchange() {
118        let path = temp_socket_path();
119        let (client, server) = loopback_pair(&path).await;
120        client.send(make_tlv(0x05, b"interest")).await.unwrap();
121        server.send(make_tlv(0x06, b"data")).await.unwrap();
122        assert_eq!(server.recv().await.unwrap(), make_tlv(0x05, b"interest"));
123        assert_eq!(client.recv().await.unwrap(), make_tlv(0x06, b"data"));
124        let _ = std::fs::remove_file(&path);
125    }
126
127    #[tokio::test]
128    async fn recv_eof_on_closed_stream() {
129        let path = temp_socket_path();
130        let listener = UnixListener::bind(&path).unwrap();
131        let connect_fut = UnixStream::connect(&path);
132        let accept_fut = listener.accept();
133        let (stream, accepted) = tokio::join!(connect_fut, accept_fut);
134        let (accepted_stream, _) = accepted.unwrap();
135        let server = unix_face_from_stream(FaceId(1), accepted_stream, &path);
136        drop(stream.unwrap());
137        assert!(matches!(server.recv().await, Err(FaceError::Closed)));
138        let _ = std::fs::remove_file(&path);
139    }
140}