ndn_transport/
stream_face.rs

1use bytes::Bytes;
2use futures::{SinkExt, StreamExt};
3use tokio::io::{AsyncRead, AsyncWrite};
4use tokio::sync::Mutex;
5use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
6
7use crate::{Face, FaceError, FaceId, FaceKind};
8
9/// Generic stream-based NDN face.
10///
11/// Wraps any async read/write pair with a codec into a full `Face`
12/// implementation.  The stream is split into independent read and write halves,
13/// each behind its own `Mutex`:
14///
15/// - `reader`: locked only by the face's receive task (single consumer, never
16///   actually contends).
17/// - `writer`: locked by whichever pipeline task calls `send()`, serialising
18///   concurrent sends on the same stream.
19///
20/// The `lp_encode` flag controls whether `send()` wraps outgoing packets in
21/// NDNLPv2 `LpPacket` framing before writing.  Network-facing transports (TCP,
22/// Serial) set this to `true`; local transports (Unix) set it to `false`.
23pub struct StreamFace<R, W, C: Clone> {
24    id: FaceId,
25    kind: FaceKind,
26    lp_encode: bool,
27    remote_uri: Option<String>,
28    local_uri: Option<String>,
29    reader: Mutex<FramedRead<R, C>>,
30    writer: Mutex<FramedWrite<W, C>>,
31}
32
33impl<R, W, C: Clone> StreamFace<R, W, C> {
34    /// Create a new `StreamFace` from pre-split read/write halves.
35    #[allow(clippy::too_many_arguments)]
36    pub fn new(
37        id: FaceId,
38        kind: FaceKind,
39        lp_encode: bool,
40        remote_uri: Option<String>,
41        local_uri: Option<String>,
42        reader: R,
43        writer: W,
44        codec: C,
45    ) -> Self {
46        Self {
47            id,
48            kind,
49            lp_encode,
50            remote_uri,
51            local_uri,
52            reader: Mutex::new(FramedRead::new(reader, codec.clone())),
53            writer: Mutex::new(FramedWrite::new(writer, codec)),
54        }
55    }
56}
57
58impl<R, W, C> Face for StreamFace<R, W, C>
59where
60    R: AsyncRead + Unpin + Send + Sync + 'static,
61    W: AsyncWrite + Unpin + Send + Sync + 'static,
62    C: Decoder<Item = Bytes, Error = std::io::Error>
63        + Encoder<Bytes, Error = std::io::Error>
64        + Clone
65        + Send
66        + Sync
67        + 'static,
68{
69    fn id(&self) -> FaceId {
70        self.id
71    }
72    fn kind(&self) -> FaceKind {
73        self.kind
74    }
75
76    fn remote_uri(&self) -> Option<String> {
77        self.remote_uri.clone()
78    }
79    fn local_uri(&self) -> Option<String> {
80        self.local_uri.clone()
81    }
82
83    async fn recv(&self) -> Result<Bytes, FaceError> {
84        let mut reader = self.reader.lock().await;
85        reader
86            .next()
87            .await
88            .ok_or(FaceError::Closed)?
89            .map_err(FaceError::Io)
90    }
91
92    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
93        let wire = if self.lp_encode {
94            ndn_packet::lp::encode_lp_packet(&pkt)
95        } else {
96            pkt
97        };
98        let mut writer = self.writer.lock().await;
99        writer.send(wire).await.map_err(FaceError::Io)
100    }
101}