1use std::io;
31
32use tokio::io::{AsyncRead, AsyncWrite};
33
34use ndn_transport::{FaceId, FaceKind, StreamFace, TlvCodec};
35
/// Boxed, type-erased read half of an IPC stream.
///
/// Both platform back-ends box their concrete read half into this type so the
/// rest of the module does not depend on the underlying transport.
type DynRead = Box<dyn AsyncRead + Send + Sync + Unpin>;
/// Boxed, type-erased write half of an IPC stream (counterpart of `DynRead`).
type DynWrite = Box<dyn AsyncWrite + Send + Sync + Unpin>;

/// A [`StreamFace`] over the boxed read/write halves, with [`TlvCodec`] as its
/// codec type parameter.
pub type IpcFace = StreamFace<DynRead, DynWrite, TlvCodec>;
46
47fn make_face(id: FaceId, kind: FaceKind, uri: String, r: DynRead, w: DynWrite) -> IpcFace {
50 StreamFace::new(id, kind, false, None, Some(uri), r, w, TlvCodec)
51}
52
53pub struct IpcListener {
65 inner: PlatformListener,
66}
67
68impl IpcListener {
69 pub fn bind(path: &str) -> io::Result<Self> {
71 Ok(Self {
72 inner: PlatformListener::bind(path)?,
73 })
74 }
75
76 pub async fn accept(&self, face_id: FaceId) -> io::Result<IpcFace> {
80 let (r, w, uri) = self.inner.accept().await?;
81 Ok(make_face(face_id, FaceKind::Management, uri, r, w))
82 }
83
84 pub fn cleanup(&self) {
86 self.inner.cleanup();
87 }
88
89 pub fn uri(&self) -> &str {
91 self.inner.uri()
92 }
93}
94
95pub async fn ipc_face_connect(id: FaceId, path: &str) -> io::Result<IpcFace> {
102 let (r, w, uri) = platform_connect(path).await?;
103 Ok(make_face(id, FaceKind::Unix, uri, r, w))
104}
105
106#[cfg(unix)]
109struct PlatformListener {
110 listener: tokio::net::UnixListener,
111 path: String,
112}
113
114#[cfg(unix)]
115impl PlatformListener {
116 fn bind(path: &str) -> io::Result<Self> {
117 if let Some(parent) = std::path::Path::new(path).parent()
119 && !parent.as_os_str().is_empty()
120 {
121 std::fs::create_dir_all(parent)?;
122 }
123 let _ = std::fs::remove_file(path);
124 let listener = tokio::net::UnixListener::bind(path)?;
125 Ok(Self {
126 listener,
127 path: path.to_owned(),
128 })
129 }
130
131 async fn accept(&self) -> io::Result<(DynRead, DynWrite, String)> {
132 let (stream, _) = self.listener.accept().await?;
133 let (r, w) = stream.into_split();
134 let uri = format!("unix://{}", self.path);
135 Ok((Box::new(r), Box::new(w), uri))
136 }
137
138 fn cleanup(&self) {
139 let _ = std::fs::remove_file(&self.path);
140 }
141
142 fn uri(&self) -> &str {
143 &self.path
144 }
145}
146
147#[cfg(unix)]
148async fn platform_connect(path: &str) -> io::Result<(DynRead, DynWrite, String)> {
149 let stream = tokio::net::UnixStream::connect(path).await?;
150 let (r, w) = stream.into_split();
151 let uri = format!("unix://{path}");
152 Ok((Box::new(r), Box::new(w), uri))
153}
154
/// Windows back-end: remembers the named-pipe path and whether a pipe instance
/// has been requested yet.
#[cfg(windows)]
struct PlatformListener {
    /// Full pipe name; `bind` guarantees it starts with `\\.\pipe\`.
    path: String,
    /// `true` until the first `accept` call, which flips it to `false`.
    first: std::sync::atomic::AtomicBool,
}

#[cfg(windows)]
impl PlatformListener {
    /// Validates `path` and records it. No pipe instance is created here; that
    /// happens on each `accept` call.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] when `path` does not start with
    /// `\\.\pipe\`.
    fn bind(path: &str) -> io::Result<Self> {
        // One raw-string constant serves both the check and the message, so the
        // text shown to the user cannot drift from what is tested and cannot
        // pick up an accidental escape such as `\n` in `...\pipe\ndn`.
        const PIPE_PREFIX: &str = r"\\.\pipe\";

        if !path.starts_with(PIPE_PREFIX) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "Windows IPC path must start with {PIPE_PREFIX} (got {path:?}). \
                     Use e.g. {PIPE_PREFIX}ndn"
                ),
            ));
        }
        Ok(Self {
            path: path.to_owned(),
            first: std::sync::atomic::AtomicBool::new(true),
        })
    }

    /// Creates a new pipe instance, waits for a client to connect to it, and
    /// returns the split halves plus a `pipe://<path>` URI.
    ///
    /// # Errors
    ///
    /// Fails if the pipe instance cannot be created or the wait for a client
    /// fails.
    async fn accept(&self) -> io::Result<(DynRead, DynWrite, String)> {
        use std::sync::atomic::Ordering;
        use tokio::net::windows::named_pipe::ServerOptions;

        // Only the very first call observes `true`; every later call (including
        // concurrent ones) gets `false`.
        // NOTE(review): per the tokio docs, `first_pipe_instance(true)` makes
        // creation fail if the pipe name already exists — confirm this is the
        // intended guard against a second process owning the name.
        let first = self.first.swap(false, Ordering::AcqRel);
        let server = ServerOptions::new()
            .first_pipe_instance(first)
            .access_inbound(true)
            .access_outbound(true)
            .create(&self.path)?;

        server.connect().await?;

        let (r, w) = tokio::io::split(server);
        let uri = format!("pipe://{}", self.path);
        Ok((Box::new(r), Box::new(w), uri))
    }

    /// Intentionally empty: this back-end has nothing to remove.
    fn cleanup(&self) {}

    /// Returns the pipe path (no `pipe://` prefix).
    fn uri(&self) -> &str {
        &self.path
    }
}
211
/// Windows back-end: opens the named pipe at `path` as a client.
///
/// While the open attempt fails with OS error 231 the call sleeps 50 ms and
/// tries again, with no upper bound on the number of attempts. Any other error
/// is returned immediately. On success the boxed halves are returned together
/// with a `pipe://<path>` URI.
#[cfg(windows)]
async fn platform_connect(path: &str) -> io::Result<(DynRead, DynWrite, String)> {
    use std::time::Duration;
    use tokio::net::windows::named_pipe::ClientOptions;

    // Win32 `ERROR_PIPE_BUSY`.
    const ERROR_PIPE_BUSY: i32 = 231;
    const RETRY_DELAY: Duration = Duration::from_millis(50);

    let client = loop {
        match ClientOptions::new().open(path) {
            Ok(pipe) => break pipe,
            Err(err) => {
                if err.raw_os_error() != Some(ERROR_PIPE_BUSY) {
                    return Err(err);
                }
                tokio::time::sleep(RETRY_DELAY).await;
            }
        }
    };

    let (read_half, write_half) = tokio::io::split(client);
    Ok((
        Box::new(read_half),
        Box::new(write_half),
        format!("pipe://{path}"),
    ))
}
233
// Fail the build with a clear message on targets that are neither `unix` nor
// `windows`: this file defines `PlatformListener` and `platform_connect` only
// for those two configurations.
#[cfg(not(any(unix, windows)))]
compile_error!(
    "ndn-face-local IPC transport requires Unix domain sockets (unix) \
     or Windows Named Pipes (windows)"
);