ndn_transport/
stream_face.rs1use bytes::Bytes;
2use futures::{SinkExt, StreamExt};
3use tokio::io::{AsyncRead, AsyncWrite};
4use tokio::sync::Mutex;
5use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
6
7use crate::{Face, FaceError, FaceId, FaceKind};
8
9pub struct StreamFace<R, W, C: Clone> {
24 id: FaceId,
25 kind: FaceKind,
26 lp_encode: bool,
27 remote_uri: Option<String>,
28 local_uri: Option<String>,
29 reader: Mutex<FramedRead<R, C>>,
30 writer: Mutex<FramedWrite<W, C>>,
31}
32
33impl<R, W, C: Clone> StreamFace<R, W, C> {
34 #[allow(clippy::too_many_arguments)]
36 pub fn new(
37 id: FaceId,
38 kind: FaceKind,
39 lp_encode: bool,
40 remote_uri: Option<String>,
41 local_uri: Option<String>,
42 reader: R,
43 writer: W,
44 codec: C,
45 ) -> Self {
46 Self {
47 id,
48 kind,
49 lp_encode,
50 remote_uri,
51 local_uri,
52 reader: Mutex::new(FramedRead::new(reader, codec.clone())),
53 writer: Mutex::new(FramedWrite::new(writer, codec)),
54 }
55 }
56}
57
58impl<R, W, C> Face for StreamFace<R, W, C>
59where
60 R: AsyncRead + Unpin + Send + Sync + 'static,
61 W: AsyncWrite + Unpin + Send + Sync + 'static,
62 C: Decoder<Item = Bytes, Error = std::io::Error>
63 + Encoder<Bytes, Error = std::io::Error>
64 + Clone
65 + Send
66 + Sync
67 + 'static,
68{
69 fn id(&self) -> FaceId {
70 self.id
71 }
72 fn kind(&self) -> FaceKind {
73 self.kind
74 }
75
76 fn remote_uri(&self) -> Option<String> {
77 self.remote_uri.clone()
78 }
79 fn local_uri(&self) -> Option<String> {
80 self.local_uri.clone()
81 }
82
83 async fn recv(&self) -> Result<Bytes, FaceError> {
84 let mut reader = self.reader.lock().await;
85 reader
86 .next()
87 .await
88 .ok_or(FaceError::Closed)?
89 .map_err(FaceError::Io)
90 }
91
92 async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
93 let wire = if self.lp_encode {
94 ndn_packet::lp::encode_lp_packet(&pkt)
95 } else {
96 pkt
97 };
98 let mut writer = self.writer.lock().await;
99 writer.send(wire).await.map_err(FaceError::Io)
100 }
101}