ndn_ipc/
forwarder_client.rs

//! App-side client for connecting to a running `ndn-fwd` forwarder.
//!
//! `ForwarderClient` handles:
//! - Connecting to the forwarder's face socket (UnixFace)
//! - Optionally creating an SHM face for a high-performance data plane
//! - Registering/unregistering prefixes via NFD `rib/register`/`rib/unregister`
//! - Sending and receiving NDN packets on the data plane
//!
//! # Mobile (Android / iOS)
//!
//! On mobile the forwarder runs in-process; there is no separate forwarder daemon
//! to connect to.  Use [`ndn_engine::ForwarderEngine`] in embedded mode with
//! an [`ndn_faces::local::AppFace`] instead of `ForwarderClient`.
//!
//! # Connection flow (SHM preferred)
//!
//! ```text
//! 1. Connect to /run/nfd/nfd.sock → UnixFace (control channel)
//! 2. Send faces/create {Uri:"shm://myapp"} → get FaceId
//! 3. SpscHandle::connect("myapp") → data plane ready
//! 4. Send rib/register {Name:"/prefix", FaceId} → route installed
//! 5. Send/recv packets over SHM
//! ```
//!
//! # Connection flow (Unix fallback)
//!
//! ```text
//! 1. Connect to /run/nfd/nfd.sock → UnixFace (control + data)
//! 2. Send rib/register {Name:"/prefix"} → FaceId defaults to requesting face
//! 3. Send/recv packets over same UnixFace
//! ```
32use std::path::Path;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
35
36use bytes::Bytes;
37use tokio::sync::Mutex;
38use tokio_util::sync::CancellationToken;
39
40use ndn_faces::local::IpcFace;
41use ndn_packet::Name;
42use ndn_packet::lp::encode_lp_packet;
43use ndn_transport::{Face, FaceId};
44
45/// Error type for `ForwarderClient` operations.
46#[derive(Debug, thiserror::Error)]
47pub enum ForwarderError {
48    #[error("I/O error: {0}")]
49    Io(#[from] std::io::Error),
50    #[error("face error: {0}")]
51    Face(#[from] ndn_transport::FaceError),
52    #[error("management command failed: {code} {text}")]
53    Command { code: u64, text: String },
54    #[error("malformed management response")]
55    MalformedResponse,
56    #[cfg(all(
57        unix,
58        not(any(target_os = "android", target_os = "ios")),
59        feature = "spsc-shm"
60    ))]
61    #[error("SHM error: {0}")]
62    Shm(#[from] ndn_faces::local::ShmError),
63}
64
/// Which channel carries application packets.
///
/// The shared-memory variant is preferred when it is compiled in and its
/// setup succeeds; otherwise packets travel over the control face itself.
enum DataTransport {
    /// Shared-memory ring between the app and the forwarder.
    #[cfg(all(
        unix,
        not(any(target_os = "android", target_os = "ios")),
        feature = "spsc-shm"
    ))]
    Shm {
        /// App-side endpoint of the SHM ring.
        handle: ndn_faces::local::shm::spsc::SpscHandle,
        /// Face id the forwarder returned from `faces/create`.
        face_id: u64,
    },
    /// No dedicated data channel: the control face doubles as the data path.
    Unix,
}
80
81/// Client for connecting to and communicating with a running `ndn-fwd` forwarder.
82pub struct ForwarderClient {
83    /// Control channel (Unix domain socket on Unix, Named Pipe on Windows).
84    control: Arc<IpcFace>,
85    /// Typed management API — shares the control face.
86    pub mgmt: crate::mgmt_client::MgmtClient,
87    /// Mutex for serialising recv on the control face (Unix data path).
88    recv_lock: Mutex<()>,
89    /// Data transport — SHM or reuse control face.
90    transport: DataTransport,
91    /// Cancelled when the router control face disconnects.
92    /// Propagates to SHM handle so recv/send abort promptly.
93    cancel: CancellationToken,
94    /// Set when the control face health monitor detects disconnection.
95    dead: Arc<AtomicBool>,
96    /// Guards single-start of the disconnect monitor (0 = not started, 1 = started).
97    monitor_started: AtomicU8,
98}
99
100impl ForwarderClient {
101    /// Connect to the router's face socket.
102    ///
103    /// Automatically attempts SHM data plane with an auto-generated name;
104    /// falls back to Unix socket if SHM is unavailable or fails.
105    pub async fn connect(face_socket: impl AsRef<Path>) -> Result<Self, ForwarderError> {
106        Self::connect_with_mtu(face_socket, None).await
107    }
108
109    /// Connect with an explicit MTU hint for the SHM data plane.
110    ///
111    /// `mtu` is passed to the router's `faces/create` so the SHM ring
112    /// is sized to carry Data packets whose content body can be up
113    /// to `mtu` bytes. Pass `None` to use the default slot size
114    /// (enough for ~256 KiB content bodies). Producers that plan to
115    /// emit larger segments — e.g. chunked transfers at 1 MiB per
116    /// segment — should pass `Some(chunk_size)` here.
117    pub async fn connect_with_mtu(
118        face_socket: impl AsRef<Path>,
119        mtu: Option<usize>,
120    ) -> Result<Self, ForwarderError> {
121        let auto_name = format!("app-{}-{}", std::process::id(), next_shm_id());
122        Self::connect_with_name(face_socket, Some(&auto_name), mtu).await
123    }
124
125    /// Connect using only the Unix socket for data (no SHM attempt).
126    pub async fn connect_unix_only(face_socket: impl AsRef<Path>) -> Result<Self, ForwarderError> {
127        Self::connect_with_name(face_socket, None, None).await
128    }
129
130    /// Connect with an explicit SHM name for the data plane.
131    ///
132    /// If `shm_name` is `Some`, creates an SHM face with that name.
133    /// If `None` or SHM creation fails, falls back to Unix-only mode.
134    /// `mtu` sizes the SHM ring slot for the expected max Data body.
135    pub async fn connect_with_name(
136        face_socket: impl AsRef<Path>,
137        shm_name: Option<&str>,
138        mtu: Option<usize>,
139    ) -> Result<Self, ForwarderError> {
140        let path = face_socket.as_ref().to_str().unwrap_or_default().to_owned();
141        let control = Arc::new(ndn_faces::local::ipc_face_connect(FaceId(0), &path).await?);
142        let cancel = CancellationToken::new();
143        let dead = Arc::new(AtomicBool::new(false));
144
145        // Try SHM data plane if a name is provided.
146        #[cfg(all(
147            unix,
148            not(any(target_os = "android", target_os = "ios")),
149            feature = "spsc-shm"
150        ))]
151        if let Some(name) = shm_name {
152            match Self::setup_shm(&control, name, mtu, cancel.child_token()).await {
153                Ok(transport) => {
154                    let mgmt = crate::mgmt_client::MgmtClient::from_face(Arc::clone(&control));
155                    return Ok(Self {
156                        control,
157                        mgmt,
158                        recv_lock: Mutex::new(()),
159                        transport,
160                        cancel,
161                        dead,
162                        monitor_started: AtomicU8::new(0),
163                    });
164                }
165                Err(e) => {
166                    tracing::warn!(error = %e, "SHM setup failed, falling back to Unix");
167                }
168            }
169        }
170
171        let mgmt = crate::mgmt_client::MgmtClient::from_face(Arc::clone(&control));
172        Ok(Self {
173            control,
174            mgmt,
175            recv_lock: Mutex::new(()),
176            transport: DataTransport::Unix,
177            cancel,
178            dead,
179            monitor_started: AtomicU8::new(0),
180        })
181    }
182
183    /// Set up SHM data plane by sending `faces/create` to the router.
184    #[cfg(all(
185        unix,
186        not(any(target_os = "android", target_os = "ios")),
187        feature = "spsc-shm"
188    ))]
189    async fn setup_shm(
190        control: &Arc<IpcFace>,
191        shm_name: &str,
192        mtu: Option<usize>,
193        cancel: CancellationToken,
194    ) -> Result<DataTransport, ForwarderError> {
195        let mgmt = crate::mgmt_client::MgmtClient::from_face(Arc::clone(control));
196        let resp = mgmt
197            .face_create_with_mtu(&format!("shm://{shm_name}"), mtu.map(|m| m as u64))
198            .await?;
199        let face_id = resp.face_id.ok_or(ForwarderError::MalformedResponse)?;
200
201        // Connect the app-side SHM handle with cancellation from control face.
202        let mut handle = ndn_faces::local::shm::spsc::SpscHandle::connect(shm_name)?;
203        handle.set_cancel(cancel);
204
205        Ok(DataTransport::Shm { handle, face_id })
206    }
207
208    /// Register a prefix with the router via `rib/register`.
209    pub async fn register_prefix(&self, prefix: &Name) -> Result<(), ForwarderError> {
210        // In SHM mode, route traffic to the SHM face.  In Unix mode pass None
211        // so the router uses the requesting face — passing 0 would create a
212        // FIB entry for a non-existent face, silently dropping all packets.
213        let face_id = self.shm_face_id();
214        let resp = self.mgmt.route_add(prefix, face_id, 0).await?;
215        tracing::debug!(
216            face_id = ?resp.face_id,
217            cost = ?resp.cost,
218            "rib/register succeeded"
219        );
220        Ok(())
221    }
222
223    /// Unregister a prefix from the router via `rib/unregister`.
224    pub async fn unregister_prefix(&self, prefix: &Name) -> Result<(), ForwarderError> {
225        let face_id = self.shm_face_id();
226        self.mgmt.route_remove(prefix, face_id).await?;
227        Ok(())
228    }
229
230    /// Gracefully tear down this client: cancel ongoing ops, destroy the SHM
231    /// face (if any) via `faces/destroy`, then close the control socket.
232    ///
233    /// Call this before dropping the client to ensure the router removes the
234    /// SHM face immediately rather than waiting for GC.
235    pub async fn close(self) {
236        self.cancel.cancel();
237        #[cfg(all(
238            unix,
239            not(any(target_os = "android", target_os = "ios")),
240            feature = "spsc-shm"
241        ))]
242        if let DataTransport::Shm { face_id, .. } = &self.transport {
243            let _ = self.mgmt.face_destroy(*face_id).await;
244        }
245        // Dropping self here closes the control socket.
246    }
247
248    /// Get the SHM face ID if using SHM transport.
249    fn shm_face_id(&self) -> Option<u64> {
250        #[cfg(all(
251            unix,
252            not(any(target_os = "android", target_os = "ios")),
253            feature = "spsc-shm"
254        ))]
255        if let DataTransport::Shm { face_id, .. } = &self.transport {
256            return Some(*face_id);
257        }
258        None
259    }
260
261    /// Send a packet on the data plane.
262    ///
263    /// On the Unix transport, packets are wrapped in a minimal NDNLPv2 LpPacket
264    /// before sending.  External forwarders (yanfd/ndnd, NFD) always use LP
265    /// framing on their Unix socket faces and reject bare TLV packets;
266    /// `encode_lp_packet` is idempotent so already-wrapped packets pass through
267    /// unchanged.  SHM transport does not use LP — the engine handles framing
268    /// internally.
269    pub async fn send(&self, pkt: Bytes) -> Result<(), ForwarderError> {
270        match &self.transport {
271            #[cfg(all(
272                unix,
273                not(any(target_os = "android", target_os = "ios")),
274                feature = "spsc-shm"
275            ))]
276            DataTransport::Shm { handle, .. } => {
277                handle.send(pkt).await.map_err(ForwarderError::Shm)
278            }
279            DataTransport::Unix => {
280                let wire = encode_lp_packet(&pkt);
281                self.control.send(wire).await.map_err(ForwarderError::Face)
282            }
283        }
284    }
285
286    /// Send multiple packets on the data plane in one synchronisation.
287    ///
288    /// On the SHM transport this goes through [`SpscHandle::send_batch`],
289    /// which publishes all `pkts` with a single atomic tail advance and
290    /// at most one wakeup — the primary reason this API exists. On the
291    /// Unix transport this is a plain loop over [`send`](Self::send);
292    /// the socket path has no equivalent batch primitive and per-packet
293    /// cost dominates anyway.
294    ///
295    /// A pipelined consumer filling a segmented-fetch window (e.g.
296    /// `ndn-peek`) should prefer `send_batch` over a loop of `send`
297    /// calls: it collapses the per-Interest scheduler round-trips into
298    /// a single ring transition and measurably reduces the small-
299    /// segment overhead (see `docs/notes/throughput-roadmap.md`).
300    pub async fn send_batch(&self, pkts: &[Bytes]) -> Result<(), ForwarderError> {
301        if pkts.is_empty() {
302            return Ok(());
303        }
304        match &self.transport {
305            #[cfg(all(
306                unix,
307                not(any(target_os = "android", target_os = "ios")),
308                feature = "spsc-shm"
309            ))]
310            DataTransport::Shm { handle, .. } => {
311                handle.send_batch(pkts).await.map_err(ForwarderError::Shm)
312            }
313            DataTransport::Unix => {
314                for pkt in pkts {
315                    let wire = encode_lp_packet(pkt);
316                    self.control
317                        .send(wire)
318                        .await
319                        .map_err(ForwarderError::Face)?;
320                }
321                Ok(())
322            }
323        }
324    }
325
326    /// Receive a packet from the data plane.
327    ///
328    /// Returns `None` if the data channel is closed or the router has
329    /// disconnected.  On the first call, automatically starts the disconnect
330    /// monitor (see [`ForwarderClient::spawn_disconnect_monitor`]) so that callers
331    /// do not need to start it explicitly.
332    pub async fn recv(&self) -> Option<Bytes> {
333        self.start_monitor_once();
334        match &self.transport {
335            #[cfg(all(
336                unix,
337                not(any(target_os = "android", target_os = "ios")),
338                feature = "spsc-shm"
339            ))]
340            DataTransport::Shm { handle, .. } => handle.recv().await,
341            DataTransport::Unix => {
342                let _guard = self.recv_lock.lock().await;
343                self.control.recv().await.ok().map(strip_lp)
344            }
345        }
346    }
347
348    /// Start the disconnect monitor the first time it is needed.
349    ///
350    /// In **SHM mode** the data plane reads from shared memory and does not
351    /// observe socket closure directly.  This starts a background task that
352    /// drains the control socket (which is otherwise idle after setup) and
353    /// fires the internal [`CancellationToken`] when the socket closes.
354    ///
355    /// In **Unix mode** the data `recv()` already returns `None` on socket
356    /// closure, so no additional monitor is needed.
357    ///
358    /// Safe to call multiple times — only one monitor is ever started.
359    fn start_monitor_once(&self) {
360        if self
361            .monitor_started
362            .compare_exchange(0, 1, Ordering::AcqRel, Ordering::Relaxed)
363            .is_err()
364        {
365            return; // already started
366        }
367
368        #[cfg(all(
369            unix,
370            not(any(target_os = "android", target_os = "ios")),
371            feature = "spsc-shm"
372        ))]
373        if matches!(&self.transport, DataTransport::Shm { .. }) {
374            let control = Arc::clone(&self.control);
375            let cancel = self.cancel.clone();
376            let dead = Arc::clone(&self.dead);
377            tokio::spawn(async move {
378                // In SHM mode the control socket is used only for management
379                // commands.  After setup, no traffic is expected on it.  Any
380                // recv error means the socket was closed (router died).
381                // Stray successful reads (e.g. unsolicited router messages) are
382                // drained harmlessly; only errors trigger cancellation.
383                loop {
384                    tokio::select! {
385                        _ = cancel.cancelled() => break,
386                        result = control.recv() => {
387                            match result {
388                                Ok(_) => {
389                                    // Stray data on control socket — drain it.
390                                }
391                                Err(_) => {
392                                    dead.store(true, Ordering::Relaxed);
393                                    cancel.cancel();
394                                    break;
395                                }
396                            }
397                        }
398                    }
399                }
400            });
401        }
402    }
403
404    /// Whether this client is using SHM for data transport.
405    pub fn is_shm(&self) -> bool {
406        #[cfg(all(
407            unix,
408            not(any(target_os = "android", target_os = "ios")),
409            feature = "spsc-shm"
410        ))]
411        if matches!(&self.transport, DataTransport::Shm { .. }) {
412            return true;
413        }
414        false
415    }
416
417    /// Whether the router connection has been lost.
418    pub fn is_dead(&self) -> bool {
419        self.dead.load(Ordering::Relaxed)
420    }
421
422    /// Explicitly start the disconnect monitor.
423    ///
424    /// This is called automatically on the first [`ForwarderClient::recv`] call,
425    /// so most applications do not need to call this directly.
426    ///
427    /// In **SHM mode** the monitor watches the control socket for closure
428    /// (no probes are sent; the control socket is idle after setup).  In
429    /// **Unix mode** the data `recv()` already returns `None` on closure, so
430    /// this is a no-op.
431    ///
432    /// Safe to call multiple times — only one monitor is ever started.
433    pub fn spawn_disconnect_monitor(&self) {
434        self.start_monitor_once();
435    }
436
437    /// Check if the control face is still connected by attempting a
438    /// non-blocking management probe.  Returns `true` if the router is alive.
439    ///
440    /// Called lazily by applications that detect SHM stalls.
441    pub async fn probe_alive(&self) -> bool {
442        if self.dead.load(Ordering::Relaxed) {
443            return false;
444        }
445        // Try sending a trivial Interest on the control face.
446        // If the socket is closed, send will fail immediately.
447        let probe = ndn_packet::encode::InterestBuilder::new("/localhost/nfd/status/general")
448            .sign_digest_sha256();
449        match self.control.send(probe).await {
450            Ok(_) => true,
451            Err(_) => {
452                self.dead.store(true, Ordering::Relaxed);
453                self.cancel.cancel();
454                false
455            }
456        }
457    }
458}
459
460impl Drop for ForwarderClient {
461    fn drop(&mut self) {
462        // Cancel the cancel token so the disconnect-monitor task (which holds
463        // a clone of Arc<IpcFace>) exits promptly.  Once the task drops its
464        // clone, the Arc refcount reaches zero, the Unix socket is closed, and
465        // the router detects the disconnect → cleans up the SHM face.
466        self.cancel.cancel();
467    }
468}
469
/// Hand out the next value of a process-wide counter used to build
/// auto-generated SHM names.
///
/// The first call returns 0 and each later call returns the previous value
/// plus one (wrapping on overflow).
fn next_shm_id() -> u32 {
    use std::sync::atomic::{AtomicU32, Ordering};

    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
475
476/// Strip NDNLPv2 wrapper (type 0x64) if present.
477///
478/// External forwarders (yanfd, NFD) always wrap packets in LP framing on Unix
479/// socket faces.  Unwrap the `Fragment` field and discard LP headers (PIT
480/// tokens, face IDs, congestion marks, etc.).
481///
482/// Nack LP packets (LP with a Nack header) are returned as-is — the caller
483/// will see the raw LP bytes (type 0x64) and handle them gracefully rather
484/// than mistaking the nacked Interest fragment (type 0x05) for a Data packet.
485///
486/// Returns the original bytes unchanged if the packet is not LP-wrapped.
487pub(crate) fn strip_lp(raw: Bytes) -> Bytes {
488    use ndn_packet::lp::{LpPacket, is_lp_packet};
489    if is_lp_packet(&raw)
490        && let Ok(lp) = LpPacket::decode(raw.clone())
491    {
492        // Do NOT strip Nack packets: the fragment is the nacked Interest
493        // (type 0x05), not Data.  Return the raw LP bytes so callers
494        // receive a recognisable LP type (0x64) instead of an Interest.
495        if lp.nack.is_some() {
496            return raw;
497        }
498        if let Some(fragment) = lp.fragment {
499            return fragment;
500        }
501    }
502    raw
503}