ndn_face_local/
ipc.rs

1//! Platform-agnostic IPC transport for the NDN management socket.
2//!
3//! Abstracts over Unix domain sockets (Linux / macOS) and Windows Named Pipes
4//! so that `MgmtClient`, `run_face_listener`, and `ndn-ctl` compile and run on
5//! all three platforms without conditional-compilation scaffolding at each call
6//! site.
7//!
8//! # Face type
9//!
10//! [`IpcFace`] uses boxed trait objects for the read / write halves so the
11//! concrete type is identical on every platform:
12//!
13//! ```text
14//! IpcFace = StreamFace<
15//!     Box<dyn AsyncRead + Send + Unpin>,
16//!     Box<dyn AsyncWrite + Send + Unpin>,
17//!     TlvCodec,
18//! >
19//! ```
20//!
21//! The boxing overhead is negligible for management traffic.
22//!
23//! # Default socket paths
24//!
25//! | Platform | Default path |
26//! |----------|-------------|
27//! | Unix     | `/run/ndn/mgmt.sock` (or `/tmp/ndn.sock` in dev) |
28//! | Windows  | `\\.\pipe\ndn` |
29
30use std::io;
31
32use tokio::io::{AsyncRead, AsyncWrite};
33
34use ndn_transport::{FaceId, FaceKind, StreamFace, TlvCodec};
35
36// ─── Type alias ──────────────────────────────────────────────────────────────
37
38type DynRead = Box<dyn AsyncRead + Send + Sync + Unpin>;
39type DynWrite = Box<dyn AsyncWrite + Send + Sync + Unpin>;
40
41/// Platform-agnostic NDN face over the management IPC socket.
42///
43/// On Unix this is backed by a `UnixStream`; on Windows by a Named Pipe.
44/// The concrete type is the same on all platforms.
45pub type IpcFace = StreamFace<DynRead, DynWrite, TlvCodec>;
46
47// ─── Shared helpers ──────────────────────────────────────────────────────────
48
49fn make_face(id: FaceId, kind: FaceKind, uri: String, r: DynRead, w: DynWrite) -> IpcFace {
50    StreamFace::new(id, kind, false, None, Some(uri), r, w, TlvCodec)
51}
52
53// ─── IpcListener ─────────────────────────────────────────────────────────────
54
55/// Listens for IPC connections on the management socket path.
56///
57/// # Unix
58/// Binds a Unix domain socket at `path`, removing any stale file first.
59/// Call [`IpcListener::cleanup`] on shutdown to remove the socket file.
60///
61/// # Windows
62/// `path` must be a named pipe path such as `\\.\pipe\ndn`.
63/// Named pipes are cleaned up automatically when all handles close.
64pub struct IpcListener {
65    inner: PlatformListener,
66}
67
68impl IpcListener {
69    /// Bind to `path` and start listening.
70    pub fn bind(path: &str) -> io::Result<Self> {
71        Ok(Self {
72            inner: PlatformListener::bind(path)?,
73        })
74    }
75
76    /// Accept the next connection.
77    ///
78    /// Returns an `IpcFace` tagged [`FaceKind::Management`].
79    pub async fn accept(&self, face_id: FaceId) -> io::Result<IpcFace> {
80        let (r, w, uri) = self.inner.accept().await?;
81        Ok(make_face(face_id, FaceKind::Management, uri, r, w))
82    }
83
84    /// Remove the socket file (Unix) or perform platform cleanup.
85    pub fn cleanup(&self) {
86        self.inner.cleanup();
87    }
88
89    /// Human-readable URI for logging (e.g. `unix:///tmp/ndn.sock`).
90    pub fn uri(&self) -> &str {
91        self.inner.uri()
92    }
93}
94
95// ─── Client connect ──────────────────────────────────────────────────────────
96
97/// Connect to the IPC socket at `path` and return an [`IpcFace`].
98///
99/// On Unix, `path` is a filesystem path to a Unix domain socket.
100/// On Windows, `path` is a named pipe path such as `\\.\pipe\ndn`.
101pub async fn ipc_face_connect(id: FaceId, path: &str) -> io::Result<IpcFace> {
102    let (r, w, uri) = platform_connect(path).await?;
103    Ok(make_face(id, FaceKind::Unix, uri, r, w))
104}
105
106// ─── Unix implementation ─────────────────────────────────────────────────────
107
108#[cfg(unix)]
109struct PlatformListener {
110    listener: tokio::net::UnixListener,
111    path: String,
112}
113
114#[cfg(unix)]
115impl PlatformListener {
116    fn bind(path: &str) -> io::Result<Self> {
117        let _ = std::fs::remove_file(path);
118        let listener = tokio::net::UnixListener::bind(path)?;
119        Ok(Self {
120            listener,
121            path: path.to_owned(),
122        })
123    }
124
125    async fn accept(&self) -> io::Result<(DynRead, DynWrite, String)> {
126        let (stream, _) = self.listener.accept().await?;
127        let (r, w) = stream.into_split();
128        let uri = format!("unix://{}", self.path);
129        Ok((Box::new(r), Box::new(w), uri))
130    }
131
132    fn cleanup(&self) {
133        let _ = std::fs::remove_file(&self.path);
134    }
135
136    fn uri(&self) -> &str {
137        &self.path
138    }
139}
140
141#[cfg(unix)]
142async fn platform_connect(path: &str) -> io::Result<(DynRead, DynWrite, String)> {
143    let stream = tokio::net::UnixStream::connect(path).await?;
144    let (r, w) = stream.into_split();
145    let uri = format!("unix://{path}");
146    Ok((Box::new(r), Box::new(w), uri))
147}
148
149// ─── Windows Named Pipe implementation ───────────────────────────────────────
150
151#[cfg(windows)]
152struct PlatformListener {
153    path: String,
154    /// True until the first accept() call — creates the pipe with
155    /// FILE_FLAG_FIRST_PIPE_INSTANCE so only one process can own this name.
156    first: std::sync::atomic::AtomicBool,
157}
158
159#[cfg(windows)]
160impl PlatformListener {
161    fn bind(path: &str) -> io::Result<Self> {
162        if !path.starts_with(r"\\.\pipe\") {
163            return Err(io::Error::new(
164                io::ErrorKind::InvalidInput,
165                format!(
166                    "Windows IPC path must start with \\\\.\\pipe\\ (got {path:?}). \
167                     Use e.g. \\\\.\\pipe\ndn"
168                ),
169            ));
170        }
171        Ok(Self {
172            path: path.to_owned(),
173            first: std::sync::atomic::AtomicBool::new(true),
174        })
175    }
176
177    async fn accept(&self) -> io::Result<(DynRead, DynWrite, String)> {
178        use std::sync::atomic::Ordering;
179        use tokio::net::windows::named_pipe::ServerOptions;
180
181        // first_pipe_instance(true) on the very first server instance ensures
182        // only one process can own this pipe name (prevents hijacking).
183        let first = self.first.swap(false, Ordering::AcqRel);
184        let server = ServerOptions::new()
185            .first_pipe_instance(first)
186            .access_inbound(true)
187            .access_outbound(true)
188            .create(&self.path)?;
189
190        server.connect().await?;
191
192        let (r, w) = tokio::io::split(server);
193        let uri = format!("pipe://{}", self.path);
194        Ok((Box::new(r), Box::new(w), uri))
195    }
196
197    fn cleanup(&self) {
198        // Named pipes are cleaned up automatically when all handles are closed.
199    }
200
201    fn uri(&self) -> &str {
202        &self.path
203    }
204}
205
206#[cfg(windows)]
207async fn platform_connect(path: &str) -> io::Result<(DynRead, DynWrite, String)> {
208    use tokio::net::windows::named_pipe::ClientOptions;
209
210    // Named pipe client open is synchronous on Windows.  ERROR_PIPE_BUSY (231)
211    // means all server instances are currently handling a connection — retry.
212    let client = loop {
213        match ClientOptions::new().open(path) {
214            Ok(c) => break c,
215            Err(e) if e.raw_os_error() == Some(231) => {
216                // All server instances busy; wait briefly and retry.
217                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
218            }
219            Err(e) => return Err(e),
220        }
221    };
222
223    let (r, w) = tokio::io::split(client);
224    let uri = format!("pipe://{path}");
225    Ok((Box::new(r), Box::new(w), uri))
226}
227
228// ─── Unsupported platforms ────────────────────────────────────────────────────
229
230#[cfg(not(any(unix, windows)))]
231compile_error!(
232    "ndn-face-local IPC transport requires Unix domain sockets (unix) \
233     or Windows Named Pipes (windows)"
234);