1use std::io;
31
32use tokio::io::{AsyncRead, AsyncWrite};
33
34use ndn_transport::{FaceId, FaceKind, StreamFace, TlvCodec};
35
36type DynRead = Box<dyn AsyncRead + Send + Sync + Unpin>;
39type DynWrite = Box<dyn AsyncWrite + Send + Sync + Unpin>;
40
41pub type IpcFace = StreamFace<DynRead, DynWrite, TlvCodec>;
46
47fn make_face(id: FaceId, kind: FaceKind, uri: String, r: DynRead, w: DynWrite) -> IpcFace {
50 StreamFace::new(id, kind, false, None, Some(uri), r, w, TlvCodec)
51}
52
53pub struct IpcListener {
65 inner: PlatformListener,
66}
67
68impl IpcListener {
69 pub fn bind(path: &str) -> io::Result<Self> {
71 Ok(Self {
72 inner: PlatformListener::bind(path)?,
73 })
74 }
75
76 pub async fn accept(&self, face_id: FaceId) -> io::Result<IpcFace> {
80 let (r, w, uri) = self.inner.accept().await?;
81 Ok(make_face(face_id, FaceKind::Management, uri, r, w))
82 }
83
84 pub fn cleanup(&self) {
86 self.inner.cleanup();
87 }
88
89 pub fn uri(&self) -> &str {
91 self.inner.uri()
92 }
93}
94
95pub async fn ipc_face_connect(id: FaceId, path: &str) -> io::Result<IpcFace> {
102 let (r, w, uri) = platform_connect(path).await?;
103 Ok(make_face(id, FaceKind::Unix, uri, r, w))
104}
105
106#[cfg(unix)]
109struct PlatformListener {
110 listener: tokio::net::UnixListener,
111 path: String,
112}
113
114#[cfg(unix)]
115impl PlatformListener {
116 fn bind(path: &str) -> io::Result<Self> {
117 let _ = std::fs::remove_file(path);
118 let listener = tokio::net::UnixListener::bind(path)?;
119 Ok(Self {
120 listener,
121 path: path.to_owned(),
122 })
123 }
124
125 async fn accept(&self) -> io::Result<(DynRead, DynWrite, String)> {
126 let (stream, _) = self.listener.accept().await?;
127 let (r, w) = stream.into_split();
128 let uri = format!("unix://{}", self.path);
129 Ok((Box::new(r), Box::new(w), uri))
130 }
131
132 fn cleanup(&self) {
133 let _ = std::fs::remove_file(&self.path);
134 }
135
136 fn uri(&self) -> &str {
137 &self.path
138 }
139}
140
141#[cfg(unix)]
142async fn platform_connect(path: &str) -> io::Result<(DynRead, DynWrite, String)> {
143 let stream = tokio::net::UnixStream::connect(path).await?;
144 let (r, w) = stream.into_split();
145 let uri = format!("unix://{path}");
146 Ok((Box::new(r), Box::new(w), uri))
147}
148
149#[cfg(windows)]
152struct PlatformListener {
153 path: String,
154 first: std::sync::atomic::AtomicBool,
157}
158
159#[cfg(windows)]
160impl PlatformListener {
161 fn bind(path: &str) -> io::Result<Self> {
162 if !path.starts_with(r"\\.\pipe\") {
163 return Err(io::Error::new(
164 io::ErrorKind::InvalidInput,
165 format!(
166 "Windows IPC path must start with \\\\.\\pipe\\ (got {path:?}). \
167 Use e.g. \\\\.\\pipe\ndn"
168 ),
169 ));
170 }
171 Ok(Self {
172 path: path.to_owned(),
173 first: std::sync::atomic::AtomicBool::new(true),
174 })
175 }
176
177 async fn accept(&self) -> io::Result<(DynRead, DynWrite, String)> {
178 use std::sync::atomic::Ordering;
179 use tokio::net::windows::named_pipe::ServerOptions;
180
181 let first = self.first.swap(false, Ordering::AcqRel);
184 let server = ServerOptions::new()
185 .first_pipe_instance(first)
186 .access_inbound(true)
187 .access_outbound(true)
188 .create(&self.path)?;
189
190 server.connect().await?;
191
192 let (r, w) = tokio::io::split(server);
193 let uri = format!("pipe://{}", self.path);
194 Ok((Box::new(r), Box::new(w), uri))
195 }
196
197 fn cleanup(&self) {
198 }
200
201 fn uri(&self) -> &str {
202 &self.path
203 }
204}
205
206#[cfg(windows)]
207async fn platform_connect(path: &str) -> io::Result<(DynRead, DynWrite, String)> {
208 use tokio::net::windows::named_pipe::ClientOptions;
209
210 let client = loop {
213 match ClientOptions::new().open(path) {
214 Ok(c) => break c,
215 Err(e) if e.raw_os_error() == Some(231) => {
216 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
218 }
219 Err(e) => return Err(e),
220 }
221 };
222
223 let (r, w) = tokio::io::split(client);
224 let uri = format!("pipe://{path}");
225 Ok((Box::new(r), Box::new(w), uri))
226}
227
228#[cfg(not(any(unix, windows)))]
231compile_error!(
232 "ndn-face-local IPC transport requires Unix domain sockets (unix) \
233 or Windows Named Pipes (windows)"
234);