ndn_ipc/forwarder_client.rs
1/// App-side client for connecting to a running `ndn-fwd` forwarder.
2///
3/// `ForwarderClient` handles:
4/// - Connecting to the forwarder's face socket (UnixFace)
5/// - Optionally creating an SHM face for high-performance data plane
6/// - Registering/unregistering prefixes via NFD `rib/register`/`rib/unregister`
7/// - Sending and receiving NDN packets on the data plane
8///
9/// # Mobile (Android / iOS)
10///
11/// On mobile the forwarder runs in-process; there is no separate forwarder daemon
12/// to connect to. Use [`ndn_engine::ForwarderEngine`] in embedded mode with
13/// an [`ndn_faces::local::AppFace`] instead of `ForwarderClient`.
14///
15/// # Connection flow (SHM preferred)
16///
17/// ```text
18/// 1. Connect to /run/nfd/nfd.sock → UnixFace (control channel)
19/// 2. Send faces/create {Uri:"shm://myapp"} → get FaceId
20/// 3. ShmHandle::connect("myapp") → data plane ready
21/// 4. Send rib/register {Name:"/prefix", FaceId} → route installed
22/// 5. Send/recv packets over SHM
23/// ```
24///
25/// # Connection flow (Unix fallback)
26///
27/// ```text
28/// 1. Connect to /run/nfd/nfd.sock → UnixFace (control + data)
29/// 2. Send rib/register {Name:"/prefix"} → FaceId defaults to requesting face
30/// 3. Send/recv packets over same UnixFace
31/// ```
32use std::path::Path;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
35
36use bytes::Bytes;
37use tokio::sync::Mutex;
38use tokio_util::sync::CancellationToken;
39
40use ndn_faces::local::IpcFace;
41use ndn_packet::Name;
42use ndn_packet::lp::encode_lp_packet;
43use ndn_transport::{Face, FaceId};
44
45/// Error type for `ForwarderClient` operations.
46#[derive(Debug, thiserror::Error)]
47pub enum ForwarderError {
48 #[error("I/O error: {0}")]
49 Io(#[from] std::io::Error),
50 #[error("face error: {0}")]
51 Face(#[from] ndn_transport::FaceError),
52 #[error("management command failed: {code} {text}")]
53 Command { code: u64, text: String },
54 #[error("malformed management response")]
55 MalformedResponse,
56 #[cfg(all(
57 unix,
58 not(any(target_os = "android", target_os = "ios")),
59 feature = "spsc-shm"
60 ))]
61 #[error("SHM error: {0}")]
62 Shm(#[from] ndn_faces::local::ShmError),
63}
64
/// How packets travel between the application and the forwarder.
///
/// SHM is used when it is compiled in and setup succeeds; otherwise the
/// control face doubles as the data channel.
enum DataTransport {
    /// Shared-memory ring: `handle` is the app-side endpoint and `face_id`
    /// is the ID the router assigned to the SHM face in `faces/create`.
    #[cfg(all(
        unix,
        not(any(target_os = "android", target_os = "ios")),
        feature = "spsc-shm"
    ))]
    Shm {
        handle: ndn_faces::local::shm::spsc::SpscHandle,
        face_id: u64,
    },
    /// No separate data channel: packets are sent and received on the
    /// control face itself.
    Unix,
}
80
81/// Client for connecting to and communicating with a running `ndn-fwd` forwarder.
82pub struct ForwarderClient {
83 /// Control channel (Unix domain socket on Unix, Named Pipe on Windows).
84 control: Arc<IpcFace>,
85 /// Typed management API — shares the control face.
86 pub mgmt: crate::mgmt_client::MgmtClient,
87 /// Mutex for serialising recv on the control face (Unix data path).
88 recv_lock: Mutex<()>,
89 /// Data transport — SHM or reuse control face.
90 transport: DataTransport,
91 /// Cancelled when the router control face disconnects.
92 /// Propagates to SHM handle so recv/send abort promptly.
93 cancel: CancellationToken,
94 /// Set when the control face health monitor detects disconnection.
95 dead: Arc<AtomicBool>,
96 /// Guards single-start of the disconnect monitor (0 = not started, 1 = started).
97 monitor_started: AtomicU8,
98}
99
100impl ForwarderClient {
101 /// Connect to the router's face socket.
102 ///
103 /// Automatically attempts SHM data plane with an auto-generated name;
104 /// falls back to Unix socket if SHM is unavailable or fails.
105 pub async fn connect(face_socket: impl AsRef<Path>) -> Result<Self, ForwarderError> {
106 Self::connect_with_mtu(face_socket, None).await
107 }
108
109 /// Connect with an explicit MTU hint for the SHM data plane.
110 ///
111 /// `mtu` is passed to the router's `faces/create` so the SHM ring
112 /// is sized to carry Data packets whose content body can be up
113 /// to `mtu` bytes. Pass `None` to use the default slot size
114 /// (enough for ~256 KiB content bodies). Producers that plan to
115 /// emit larger segments — e.g. chunked transfers at 1 MiB per
116 /// segment — should pass `Some(chunk_size)` here.
117 pub async fn connect_with_mtu(
118 face_socket: impl AsRef<Path>,
119 mtu: Option<usize>,
120 ) -> Result<Self, ForwarderError> {
121 let auto_name = format!("app-{}-{}", std::process::id(), next_shm_id());
122 Self::connect_with_name(face_socket, Some(&auto_name), mtu).await
123 }
124
125 /// Connect using only the Unix socket for data (no SHM attempt).
126 pub async fn connect_unix_only(face_socket: impl AsRef<Path>) -> Result<Self, ForwarderError> {
127 Self::connect_with_name(face_socket, None, None).await
128 }
129
130 /// Connect with an explicit SHM name for the data plane.
131 ///
132 /// If `shm_name` is `Some`, creates an SHM face with that name.
133 /// If `None` or SHM creation fails, falls back to Unix-only mode.
134 /// `mtu` sizes the SHM ring slot for the expected max Data body.
135 pub async fn connect_with_name(
136 face_socket: impl AsRef<Path>,
137 shm_name: Option<&str>,
138 mtu: Option<usize>,
139 ) -> Result<Self, ForwarderError> {
140 let path = face_socket.as_ref().to_str().unwrap_or_default().to_owned();
141 let control = Arc::new(ndn_faces::local::ipc_face_connect(FaceId(0), &path).await?);
142 let cancel = CancellationToken::new();
143 let dead = Arc::new(AtomicBool::new(false));
144
145 // Try SHM data plane if a name is provided.
146 #[cfg(all(
147 unix,
148 not(any(target_os = "android", target_os = "ios")),
149 feature = "spsc-shm"
150 ))]
151 if let Some(name) = shm_name {
152 match Self::setup_shm(&control, name, mtu, cancel.child_token()).await {
153 Ok(transport) => {
154 let mgmt = crate::mgmt_client::MgmtClient::from_face(Arc::clone(&control));
155 return Ok(Self {
156 control,
157 mgmt,
158 recv_lock: Mutex::new(()),
159 transport,
160 cancel,
161 dead,
162 monitor_started: AtomicU8::new(0),
163 });
164 }
165 Err(e) => {
166 tracing::warn!(error = %e, "SHM setup failed, falling back to Unix");
167 }
168 }
169 }
170
171 let mgmt = crate::mgmt_client::MgmtClient::from_face(Arc::clone(&control));
172 Ok(Self {
173 control,
174 mgmt,
175 recv_lock: Mutex::new(()),
176 transport: DataTransport::Unix,
177 cancel,
178 dead,
179 monitor_started: AtomicU8::new(0),
180 })
181 }
182
/// Set up the SHM data plane.
///
/// Sends `faces/create` for `shm://<shm_name>` (with the optional `mtu`
/// hint) over the control face, then attaches the app-side SHM handle and
/// wires `cancel` into it so blocked SHM operations abort when the token
/// fires.
///
/// # Errors
///
/// * Propagates a failed `faces/create` command.
/// * [`ForwarderError::MalformedResponse`] if the reply carries no face ID.
/// * [`ForwarderError::Shm`] if the app-side handle cannot attach. In that
///   case the router-side face is destroyed again (best effort) so it does
///   not linger while the caller continues on the Unix fallback.
#[cfg(all(
    unix,
    not(any(target_os = "android", target_os = "ios")),
    feature = "spsc-shm"
))]
async fn setup_shm(
    control: &Arc<IpcFace>,
    shm_name: &str,
    mtu: Option<usize>,
    cancel: CancellationToken,
) -> Result<DataTransport, ForwarderError> {
    let mgmt = crate::mgmt_client::MgmtClient::from_face(Arc::clone(control));
    let resp = mgmt
        .face_create_with_mtu(&format!("shm://{shm_name}"), mtu.map(|m| m as u64))
        .await?;
    let face_id = resp.face_id.ok_or(ForwarderError::MalformedResponse)?;

    // Attach the app-side handle. The router has already created its side,
    // so on failure undo that: the control socket stays open for the Unix
    // fallback and would otherwise keep the orphan SHM face alive.
    let mut handle = match ndn_faces::local::shm::spsc::SpscHandle::connect(shm_name) {
        Ok(handle) => handle,
        Err(e) => {
            // Best effort: the original attach error is what the caller needs.
            let _ = mgmt.face_destroy(face_id).await;
            return Err(e.into());
        }
    };
    handle.set_cancel(cancel);

    Ok(DataTransport::Shm { handle, face_id })
}
207
208 /// Register a prefix with the router via `rib/register`.
209 pub async fn register_prefix(&self, prefix: &Name) -> Result<(), ForwarderError> {
210 // In SHM mode, route traffic to the SHM face. In Unix mode pass None
211 // so the router uses the requesting face — passing 0 would create a
212 // FIB entry for a non-existent face, silently dropping all packets.
213 let face_id = self.shm_face_id();
214 let resp = self.mgmt.route_add(prefix, face_id, 0).await?;
215 tracing::debug!(
216 face_id = ?resp.face_id,
217 cost = ?resp.cost,
218 "rib/register succeeded"
219 );
220 Ok(())
221 }
222
223 /// Unregister a prefix from the router via `rib/unregister`.
224 pub async fn unregister_prefix(&self, prefix: &Name) -> Result<(), ForwarderError> {
225 let face_id = self.shm_face_id();
226 self.mgmt.route_remove(prefix, face_id).await?;
227 Ok(())
228 }
229
230 /// Gracefully tear down this client: cancel ongoing ops, destroy the SHM
231 /// face (if any) via `faces/destroy`, then close the control socket.
232 ///
233 /// Call this before dropping the client to ensure the router removes the
234 /// SHM face immediately rather than waiting for GC.
235 pub async fn close(self) {
236 self.cancel.cancel();
237 #[cfg(all(
238 unix,
239 not(any(target_os = "android", target_os = "ios")),
240 feature = "spsc-shm"
241 ))]
242 if let DataTransport::Shm { face_id, .. } = &self.transport {
243 let _ = self.mgmt.face_destroy(*face_id).await;
244 }
245 // Dropping self here closes the control socket.
246 }
247
248 /// Get the SHM face ID if using SHM transport.
249 fn shm_face_id(&self) -> Option<u64> {
250 #[cfg(all(
251 unix,
252 not(any(target_os = "android", target_os = "ios")),
253 feature = "spsc-shm"
254 ))]
255 if let DataTransport::Shm { face_id, .. } = &self.transport {
256 return Some(*face_id);
257 }
258 None
259 }
260
261 /// Send a packet on the data plane.
262 ///
263 /// On the Unix transport, packets are wrapped in a minimal NDNLPv2 LpPacket
264 /// before sending. External forwarders (yanfd/ndnd, NFD) always use LP
265 /// framing on their Unix socket faces and reject bare TLV packets;
266 /// `encode_lp_packet` is idempotent so already-wrapped packets pass through
267 /// unchanged. SHM transport does not use LP — the engine handles framing
268 /// internally.
269 pub async fn send(&self, pkt: Bytes) -> Result<(), ForwarderError> {
270 match &self.transport {
271 #[cfg(all(
272 unix,
273 not(any(target_os = "android", target_os = "ios")),
274 feature = "spsc-shm"
275 ))]
276 DataTransport::Shm { handle, .. } => {
277 handle.send(pkt).await.map_err(ForwarderError::Shm)
278 }
279 DataTransport::Unix => {
280 let wire = encode_lp_packet(&pkt);
281 self.control.send(wire).await.map_err(ForwarderError::Face)
282 }
283 }
284 }
285
286 /// Send multiple packets on the data plane in one synchronisation.
287 ///
288 /// On the SHM transport this goes through [`SpscHandle::send_batch`],
289 /// which publishes all `pkts` with a single atomic tail advance and
290 /// at most one wakeup — the primary reason this API exists. On the
291 /// Unix transport this is a plain loop over [`send`](Self::send);
292 /// the socket path has no equivalent batch primitive and per-packet
293 /// cost dominates anyway.
294 ///
295 /// A pipelined consumer filling a segmented-fetch window (e.g.
296 /// `ndn-peek`) should prefer `send_batch` over a loop of `send`
297 /// calls: it collapses the per-Interest scheduler round-trips into
298 /// a single ring transition and measurably reduces the small-
299 /// segment overhead (see `docs/notes/throughput-roadmap.md`).
300 pub async fn send_batch(&self, pkts: &[Bytes]) -> Result<(), ForwarderError> {
301 if pkts.is_empty() {
302 return Ok(());
303 }
304 match &self.transport {
305 #[cfg(all(
306 unix,
307 not(any(target_os = "android", target_os = "ios")),
308 feature = "spsc-shm"
309 ))]
310 DataTransport::Shm { handle, .. } => {
311 handle.send_batch(pkts).await.map_err(ForwarderError::Shm)
312 }
313 DataTransport::Unix => {
314 for pkt in pkts {
315 let wire = encode_lp_packet(pkt);
316 self.control
317 .send(wire)
318 .await
319 .map_err(ForwarderError::Face)?;
320 }
321 Ok(())
322 }
323 }
324 }
325
326 /// Receive a packet from the data plane.
327 ///
328 /// Returns `None` if the data channel is closed or the router has
329 /// disconnected. On the first call, automatically starts the disconnect
330 /// monitor (see [`ForwarderClient::spawn_disconnect_monitor`]) so that callers
331 /// do not need to start it explicitly.
332 pub async fn recv(&self) -> Option<Bytes> {
333 self.start_monitor_once();
334 match &self.transport {
335 #[cfg(all(
336 unix,
337 not(any(target_os = "android", target_os = "ios")),
338 feature = "spsc-shm"
339 ))]
340 DataTransport::Shm { handle, .. } => handle.recv().await,
341 DataTransport::Unix => {
342 let _guard = self.recv_lock.lock().await;
343 self.control.recv().await.ok().map(strip_lp)
344 }
345 }
346 }
347
348 /// Start the disconnect monitor the first time it is needed.
349 ///
350 /// In **SHM mode** the data plane reads from shared memory and does not
351 /// observe socket closure directly. This starts a background task that
352 /// drains the control socket (which is otherwise idle after setup) and
353 /// fires the internal [`CancellationToken`] when the socket closes.
354 ///
355 /// In **Unix mode** the data `recv()` already returns `None` on socket
356 /// closure, so no additional monitor is needed.
357 ///
358 /// Safe to call multiple times — only one monitor is ever started.
359 fn start_monitor_once(&self) {
360 if self
361 .monitor_started
362 .compare_exchange(0, 1, Ordering::AcqRel, Ordering::Relaxed)
363 .is_err()
364 {
365 return; // already started
366 }
367
368 #[cfg(all(
369 unix,
370 not(any(target_os = "android", target_os = "ios")),
371 feature = "spsc-shm"
372 ))]
373 if matches!(&self.transport, DataTransport::Shm { .. }) {
374 let control = Arc::clone(&self.control);
375 let cancel = self.cancel.clone();
376 let dead = Arc::clone(&self.dead);
377 tokio::spawn(async move {
378 // In SHM mode the control socket is used only for management
379 // commands. After setup, no traffic is expected on it. Any
380 // recv error means the socket was closed (router died).
381 // Stray successful reads (e.g. unsolicited router messages) are
382 // drained harmlessly; only errors trigger cancellation.
383 loop {
384 tokio::select! {
385 _ = cancel.cancelled() => break,
386 result = control.recv() => {
387 match result {
388 Ok(_) => {
389 // Stray data on control socket — drain it.
390 }
391 Err(_) => {
392 dead.store(true, Ordering::Relaxed);
393 cancel.cancel();
394 break;
395 }
396 }
397 }
398 }
399 }
400 });
401 }
402 }
403
404 /// Whether this client is using SHM for data transport.
405 pub fn is_shm(&self) -> bool {
406 #[cfg(all(
407 unix,
408 not(any(target_os = "android", target_os = "ios")),
409 feature = "spsc-shm"
410 ))]
411 if matches!(&self.transport, DataTransport::Shm { .. }) {
412 return true;
413 }
414 false
415 }
416
417 /// Whether the router connection has been lost.
418 pub fn is_dead(&self) -> bool {
419 self.dead.load(Ordering::Relaxed)
420 }
421
422 /// Explicitly start the disconnect monitor.
423 ///
424 /// This is called automatically on the first [`ForwarderClient::recv`] call,
425 /// so most applications do not need to call this directly.
426 ///
427 /// In **SHM mode** the monitor watches the control socket for closure
428 /// (no probes are sent; the control socket is idle after setup). In
429 /// **Unix mode** the data `recv()` already returns `None` on closure, so
430 /// this is a no-op.
431 ///
432 /// Safe to call multiple times — only one monitor is ever started.
433 pub fn spawn_disconnect_monitor(&self) {
434 self.start_monitor_once();
435 }
436
437 /// Check if the control face is still connected by attempting a
438 /// non-blocking management probe. Returns `true` if the router is alive.
439 ///
440 /// Called lazily by applications that detect SHM stalls.
441 pub async fn probe_alive(&self) -> bool {
442 if self.dead.load(Ordering::Relaxed) {
443 return false;
444 }
445 // Try sending a trivial Interest on the control face.
446 // If the socket is closed, send will fail immediately.
447 let probe = ndn_packet::encode::InterestBuilder::new("/localhost/nfd/status/general")
448 .sign_digest_sha256();
449 match self.control.send(probe).await {
450 Ok(_) => true,
451 Err(_) => {
452 self.dead.store(true, Ordering::Relaxed);
453 self.cancel.cancel();
454 false
455 }
456 }
457 }
458}
459
460impl Drop for ForwarderClient {
461 fn drop(&mut self) {
462 // Cancel the cancel token so the disconnect-monitor task (which holds
463 // a clone of Arc<IpcFace>) exits promptly. Once the task drops its
464 // clone, the Arc refcount reaches zero, the Unix socket is closed, and
465 // the router detects the disconnect → cleans up the SHM face.
466 self.cancel.cancel();
467 }
468}
469
/// Hands out consecutive IDs, starting at 0, for auto-generated SHM names.
///
/// The counter is process-local and atomic, so concurrent callers each get
/// a distinct value.
fn next_shm_id() -> u32 {
    use std::sync::atomic::{AtomicU32, Ordering};

    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    let issued = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    issued
}
475
476/// Strip NDNLPv2 wrapper (type 0x64) if present.
477///
478/// External forwarders (yanfd, NFD) always wrap packets in LP framing on Unix
479/// socket faces. Unwrap the `Fragment` field and discard LP headers (PIT
480/// tokens, face IDs, congestion marks, etc.).
481///
482/// Nack LP packets (LP with a Nack header) are returned as-is — the caller
483/// will see the raw LP bytes (type 0x64) and handle them gracefully rather
484/// than mistaking the nacked Interest fragment (type 0x05) for a Data packet.
485///
486/// Returns the original bytes unchanged if the packet is not LP-wrapped.
487pub(crate) fn strip_lp(raw: Bytes) -> Bytes {
488 use ndn_packet::lp::{LpPacket, is_lp_packet};
489 if is_lp_packet(&raw)
490 && let Ok(lp) = LpPacket::decode(raw.clone())
491 {
492 // Do NOT strip Nack packets: the fragment is the nacked Interest
493 // (type 0x05), not Data. Return the raw LP bytes so callers
494 // receive a recognisable LP type (0x64) instead of an Interest.
495 if lp.nack.is_some() {
496 return raw;
497 }
498 if let Some(fragment) = lp.fragment {
499 return fragment;
500 }
501 }
502 raw
503}