// ndn_face_local/shm/spsc.rs

1//! Custom SPSC shared-memory face (Unix only, `spsc-shm` feature).
2//!
3//! A named POSIX SHM region holds two lock-free single-producer/single-consumer
4//! ring buffers — one for each direction.  Wakeup uses a named FIFO (pipe)
5//! pair on all platforms: the engine creates two FIFOs
6//! (`/tmp/.ndn-{name}.a2e.pipe` and `.e2a.pipe`); both sides open them
7//! `O_RDWR | O_NONBLOCK` (avoids the blocking-open problem).  The consumer
8//! awaits readability via `tokio::io::unix::AsyncFd`; the producer writes 1
9//! non-blocking byte.  The parked flag in SHM is still used to avoid
10//! unnecessary pipe writes when the consumer is active.
11//!
12//! This design integrates directly into Tokio's epoll/kqueue loop with zero
13//! thread transitions, unlike the previous Linux futex + `spawn_blocking`
14//! approach which routed every park through Tokio's blocking thread pool.
15//!
16//! # SHM layout
17//!
18//! ```text
19//! Cache line 0 (off   0–63):  magic u64 | capacity u32 | slot_size u32 | pad
20//! Cache line 1 (off  64–127): a2e_tail AtomicU32  — app writes, engine reads
21//! Cache line 2 (off 128–191): a2e_head AtomicU32  — engine writes, app reads
22//! Cache line 3 (off 192–255): e2a_tail AtomicU32  — engine writes, app reads
23//! Cache line 4 (off 256–319): e2a_head AtomicU32  — app writes, engine reads
24//! Cache line 5 (off 320–383): a2e_parked AtomicU32 — set by engine before sleeping on a2e ring
25//! Cache line 6 (off 384–447): e2a_parked AtomicU32 — set by app before sleeping on e2a ring
26//! Data block (off 448–N):     a2e ring (capacity × slot_stride bytes)
27//! Data block (off N–end):     e2a ring (capacity × slot_stride bytes)
28//!   slot_stride = 4 (length prefix) + slot_size (payload area)
29//! ```
30//!
31//! # Conditional wakeup protocol
32//!
33//! The producer checks the parked flag with `SeqCst` after writing to the
34//! ring; the consumer stores the parked flag with `SeqCst` before its second
35//! ring check.  This total-order guarantee prevents the producer from missing
36//! a sleeping consumer.
37use std::ffi::CString;
38use std::path::PathBuf;
39use std::sync::atomic::{AtomicU32, Ordering};
40
41use bytes::Bytes;
42
43use ndn_transport::{Face, FaceError, FaceId, FaceKind};
44
45use crate::shm::ShmError;
46
47// ─── Named FIFO wakeup helpers ───────────────────────────────────────────────
48//
49// Both Linux and macOS use named FIFOs (pipes) for cross-process wakeup,
50// wrapped in Tokio's AsyncFd for zero-thread-transition async integration.
51// The previous Linux path used futex + spawn_blocking which routed every
52// park through Tokio's blocking thread pool — expensive at 100K+ pkt/s
53// and responsible for the 2.5× throughput gap vs macOS.
54
55// ─── Constants ───────────────────────────────────────────────────────────────
56
57const MAGIC: u64 = 0x4E44_4E5F_5348_4D00; // b"NDN_SHM\0"
58
59/// Default number of slots per ring.
60pub const DEFAULT_CAPACITY: u32 = 256;
61/// Default slot payload size in bytes (~8.75 KiB, covers typical NDN packets).
62pub const DEFAULT_SLOT_SIZE: u32 = 8960;
63
64// Cache-line–aligned offsets for the four ring index atomics.
65const OFF_A2E_TAIL: usize = 64; // app writes (producer)
66const OFF_A2E_HEAD: usize = 128; // engine writes (consumer)
67const OFF_E2A_TAIL: usize = 192; // engine writes (producer)
68const OFF_E2A_HEAD: usize = 256; // app writes (consumer)
69// Parked flags: consumer sets to 1 before sleeping, clears on wake.
70const OFF_A2E_PARKED: usize = 320; // engine (a2e consumer) parked flag
71const OFF_E2A_PARKED: usize = 384; // app (e2a consumer) parked flag
72const HEADER_SIZE: usize = 448; // 7 × 64-byte cache lines
73
/// Bytes occupied by one ring slot: a 4-byte length prefix followed by
/// `slot_size` bytes of payload area.
fn slot_stride(slot_size: u32) -> usize {
    (slot_size as usize) + 4
}
77
78/// Number of spin-loop iterations before falling through to the pipe
79/// wakeup path.  64 iterations ≈ sub-µs
80/// on modern hardware — enough to catch back-to-back packets without
81/// causing thermal throttling from sustained spinning across multiple faces.
82const SPIN_ITERS: u32 = 64;
83
84fn shm_total_size(capacity: u32, slot_size: u32) -> usize {
85    HEADER_SIZE + 2 * capacity as usize * slot_stride(slot_size)
86}
87
88fn a2e_ring_offset() -> usize {
89    HEADER_SIZE
90}
91fn e2a_ring_offset(capacity: u32, slot_size: u32) -> usize {
92    HEADER_SIZE + capacity as usize * slot_stride(slot_size)
93}
94
95// ─── Path helpers ─────────────────────────────────────────────────────────────
96
/// POSIX SHM object name for a face (leading `/` as required by `shm_open(3)`).
fn posix_shm_name(name: &str) -> String {
    let mut s = String::with_capacity("/ndn-shm-".len() + name.len());
    s.push_str("/ndn-shm-");
    s.push_str(name);
    s
}
100
/// Path of the FIFO the *engine* reads from (app writes to wake engine).
fn a2e_pipe_path(name: &str) -> PathBuf {
    wakeup_fifo_path(name, "a2e")
}

/// Path of the FIFO the *app* reads from (engine writes to wake app).
fn e2a_pipe_path(name: &str) -> PathBuf {
    wakeup_fifo_path(name, "e2a")
}

/// Common builder for the two wakeup-FIFO paths under `/tmp`.
fn wakeup_fifo_path(name: &str, direction: &str) -> PathBuf {
    PathBuf::from(format!("/tmp/.ndn-{name}.{direction}.pipe"))
}
110
111// ─── POSIX SHM region ────────────────────────────────────────────────────────
112
/// Owns a POSIX SHM mapping. The creator unlinks the name on drop.
struct ShmRegion {
    // Base address of the mmap'd region (PROT_READ|PROT_WRITE, MAP_SHARED).
    ptr: *mut u8,
    // Length of the mapping in bytes; passed back to munmap on drop.
    size: usize,
    /// Present when this process created the region; drives shm_unlink on drop.
    shm_name: Option<CString>,
}

// SAFETY: `ptr` points into a MAP_SHARED mapping that stays valid until Drop
// unmaps it exactly once.  Cross-thread/cross-process coordination of the data
// it points at is done via the AtomicU32 indices stored inside the region (see
// ring_push/ring_pop), so handing the struct between threads is sound.
unsafe impl Send for ShmRegion {}
unsafe impl Sync for ShmRegion {}
123
124impl ShmRegion {
125    /// Create and zero-initialise a new named SHM region.
126    fn create(shm_name: &str, size: usize) -> Result<Self, ShmError> {
127        let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
128        let ptr = unsafe {
129            let fd = libc::shm_open(
130                cname.as_ptr(),
131                libc::O_CREAT | libc::O_RDWR | libc::O_TRUNC,
132                // 0o666: readable/writable by all users so an unprivileged app
133                // can connect to a router running as root.  The SHM name is
134                // unique per app instance, limiting exposure.
135                0o666 as libc::mode_t as libc::c_uint,
136            );
137            if fd == -1 {
138                return Err(ShmError::Io(std::io::Error::last_os_error()));
139            }
140
141            if libc::ftruncate(fd, size as libc::off_t) == -1 {
142                libc::close(fd);
143                return Err(ShmError::Io(std::io::Error::last_os_error()));
144            }
145
146            let p = libc::mmap(
147                std::ptr::null_mut(),
148                size,
149                libc::PROT_READ | libc::PROT_WRITE,
150                libc::MAP_SHARED,
151                fd,
152                0,
153            );
154            libc::close(fd);
155            if p == libc::MAP_FAILED {
156                return Err(ShmError::Io(std::io::Error::last_os_error()));
157            }
158            p as *mut u8
159        };
160        Ok(ShmRegion {
161            ptr,
162            size,
163            shm_name: Some(cname),
164        })
165    }
166
167    /// Open an existing named SHM region created by `ShmRegion::create`.
168    fn open(shm_name: &str, size: usize) -> Result<Self, ShmError> {
169        let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
170        let ptr = unsafe {
171            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDWR, 0);
172            if fd == -1 {
173                return Err(ShmError::Io(std::io::Error::last_os_error()));
174            }
175
176            let p = libc::mmap(
177                std::ptr::null_mut(),
178                size,
179                libc::PROT_READ | libc::PROT_WRITE,
180                libc::MAP_SHARED,
181                fd,
182                0,
183            );
184            libc::close(fd);
185            if p == libc::MAP_FAILED {
186                return Err(ShmError::Io(std::io::Error::last_os_error()));
187            }
188            p as *mut u8
189        };
190        Ok(ShmRegion {
191            ptr,
192            size,
193            shm_name: None,
194        })
195    }
196
197    fn as_ptr(&self) -> *mut u8 {
198        self.ptr
199    }
200
201    /// Write the header fields (magic, capacity, slot_size).
202    ///
203    /// # Safety
204    /// Must be called exactly once immediately after `create()`, before any
205    /// other process opens the region.
206    unsafe fn write_header(&self, capacity: u32, slot_size: u32) {
207        unsafe {
208            (self.ptr as *mut u64).write_unaligned(MAGIC);
209            (self.ptr.add(8) as *mut u32).write_unaligned(capacity);
210            (self.ptr.add(12) as *mut u32).write_unaligned(slot_size);
211        }
212        // Ring indices start at zero (mmap of new ftruncated fd is zero-initialised).
213    }
214
215    /// Read and validate the header. Returns `(capacity, slot_size)`.
216    ///
217    /// # Safety
218    /// The region must have been initialised by `write_header`.
219    unsafe fn read_header(&self) -> Result<(u32, u32), ShmError> {
220        unsafe {
221            let magic = (self.ptr as *const u64).read_unaligned();
222            if magic != MAGIC {
223                return Err(ShmError::InvalidMagic);
224            }
225            let capacity = (self.ptr.add(8) as *const u32).read_unaligned();
226            let slot_size = (self.ptr.add(12) as *const u32).read_unaligned();
227            Ok((capacity, slot_size))
228        }
229    }
230}
231
232impl Drop for ShmRegion {
233    fn drop(&mut self) {
234        unsafe {
235            libc::munmap(self.ptr as *mut libc::c_void, self.size);
236            if let Some(ref n) = self.shm_name {
237                libc::shm_unlink(n.as_ptr());
238            }
239        }
240    }
241}
242
243// ─── SPSC ring operations ─────────────────────────────────────────────────────
244
/// Push `data` into the ring at [`ring_off`] using the tail at [`tail_off`] and
/// head at [`head_off`]. Returns `false` if the ring is full.
///
/// Producer side of the SPSC protocol: the caller must be the ring's only
/// producer — only this side advances the tail.
///
/// # Safety
/// `base` must be a valid, exclusively-written SHM mapping of sufficient size.
/// `data.len() <= slot_size` must hold.
unsafe fn ring_push(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
    data: &[u8],
) -> bool {
    debug_assert!(data.len() <= slot_size as usize);

    // SAFETY (caller contract): tail_off/head_off lie inside the mapping, on
    // the 4-byte-aligned cache-line slots holding the ring indices.
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Relaxed suffices for our own tail (we are its only writer); Acquire on
    // head pairs with the consumer's Release store in `ring_pop`.
    let t = tail_a.load(Ordering::Relaxed);
    let h = head_a.load(Ordering::Acquire);
    // Indices grow monotonically and wrap; occupancy = tail - head.
    if t.wrapping_sub(h) >= capacity {
        return false;
    }

    let idx = (t % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    // Slot layout: u32 length prefix, then the payload bytes.
    unsafe {
        (slot as *mut u32).write_unaligned(data.len() as u32);
        std::ptr::copy_nonoverlapping(data.as_ptr(), slot.add(4), data.len());
    }
    // Release: publishes the slot contents before the new tail becomes visible.
    tail_a.store(t.wrapping_add(1), Ordering::Release);
    true
}
281
/// Pop one packet from the ring. Returns `None` if empty.
///
/// Consumer side of the SPSC protocol: the caller must be the ring's only
/// consumer — only this side advances the head.
///
/// # Safety
/// Same as [`ring_push`].
unsafe fn ring_pop(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
) -> Option<Bytes> {
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Relaxed suffices for our own head (we are its only writer); Acquire on
    // tail pairs with the producer's Release store in `ring_push`, making the
    // slot contents visible before we read them.
    let h = head_a.load(Ordering::Relaxed);
    let t = tail_a.load(Ordering::Acquire);
    if h == t {
        return None;
    }

    let idx = (h % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    let len = unsafe { (slot as *const u32).read_unaligned() as usize };
    // Clamp to prevent out-of-bounds read if SHM is corrupted.
    let len = len.min(slot_size as usize);
    // Copy out before releasing the slot back to the producer.
    let data = unsafe { Bytes::copy_from_slice(std::slice::from_raw_parts(slot.add(4), len)) };

    // Release: hands the slot back to the producer only after the copy.
    head_a.store(h.wrapping_add(1), Ordering::Release);
    Some(data)
}
314
315// ─── FIFO wakeup helpers ─────────────────────────────────────────────────────
316
317/// Open a named FIFO (must already exist) with `O_RDWR | O_NONBLOCK`.
318///
319/// `O_RDWR` avoids the blocking-open problem: the open succeeds immediately
320/// even if the other end has not yet opened the FIFO.  Both sides only use
321/// the fd in the direction they own (reads or writes), so no cross-reading
322/// occurs.
323fn open_fifo_rdwr(path: &std::path::Path) -> Result<std::os::unix::io::OwnedFd, ShmError> {
324    use std::os::unix::io::{FromRawFd, OwnedFd};
325    let cpath = CString::new(path.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
326    let fd = unsafe { libc::open(cpath.as_ptr(), libc::O_RDWR | libc::O_NONBLOCK) };
327    if fd == -1 {
328        return Err(ShmError::Io(std::io::Error::last_os_error()));
329    }
330    Ok(unsafe { OwnedFd::from_raw_fd(fd) })
331}
332
333/// Await readability on the pipe fd, then drain all buffered bytes.
334///
335/// Returns `Err` on EOF (peer died) or any I/O error.
336async fn pipe_await(
337    rx: &tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
338) -> std::io::Result<()> {
339    use std::os::unix::io::AsRawFd;
340    loop {
341        let mut guard = rx.readable().await?;
342        let mut buf = [0u8; 64];
343        let fd = rx.get_ref().as_raw_fd();
344        let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
345        guard.clear_ready();
346        if n > 0 {
347            return Ok(());
348        }
349        if n == 0 {
350            // EOF — peer closed their end.
351            return Err(std::io::Error::new(
352                std::io::ErrorKind::UnexpectedEof,
353                "SHM wakeup pipe closed (peer died)",
354            ));
355        }
356        if n == -1 {
357            let err = std::io::Error::last_os_error();
358            if err.kind() != std::io::ErrorKind::WouldBlock {
359                return Err(err);
360            }
361        }
362    }
363}
364
365/// Write one wakeup byte to a non-blocking pipe fd.
366///
367/// Silently ignores `EAGAIN` (pipe buffer full): if the buffer is full the
368/// consumer is already being woken by a previous byte.
369fn pipe_write(tx: &std::os::unix::io::OwnedFd) {
370    use std::os::unix::io::AsRawFd;
371    let b = [1u8];
372    unsafe {
373        libc::write(tx.as_raw_fd(), b.as_ptr() as *const libc::c_void, 1);
374    }
375}
376
377// ─── SpscFace (engine side) ───────────────────────────────────────────────────
378
/// Engine-side SPSC SHM face.
///
/// Create with [`SpscFace::create`]; register with the engine via
/// `ForwarderEngine::add_face`. Give the `name` to the application so it can
/// call [`SpscHandle::connect`].
pub struct SpscFace {
    /// Engine-assigned face identifier, echoed by [`Face::id`].
    id: FaceId,
    /// Backing POSIX SHM mapping (creator side — name unlinked on drop).
    shm: ShmRegion,
    /// Slots per ring, as written to the SHM header.
    capacity: u32,
    /// Payload bytes per slot, as written to the SHM header.
    slot_size: u32,
    /// Byte offset of the app→engine ring inside the mapping.
    a2e_off: usize,
    /// Byte offset of the engine→app ring inside the mapping.
    e2a_off: usize,
    /// FIFO the engine awaits readability on (app writes here to wake engine).
    a2e_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    /// FIFO the engine writes to (to wake the app).
    e2a_tx: std::os::unix::io::OwnedFd,
    /// Paths of the FIFOs created by the engine — removed on drop.
    a2e_pipe_path: PathBuf,
    e2a_pipe_path: PathBuf,
}
399
400impl SpscFace {
401    /// Create the SHM region and set up the wakeup mechanism.
402    ///
403    /// `name` identifies this face (e.g. `"sensor-0"`); pass it to
404    /// [`SpscHandle::connect`] in the application process.
405    pub fn create(id: FaceId, name: &str) -> Result<Self, ShmError> {
406        Self::create_with(id, name, DEFAULT_CAPACITY, DEFAULT_SLOT_SIZE)
407    }
408
409    /// Create with explicit ring parameters.
410    pub fn create_with(
411        id: FaceId,
412        name: &str,
413        capacity: u32,
414        slot_size: u32,
415    ) -> Result<Self, ShmError> {
416        let size = shm_total_size(capacity, slot_size);
417        let shm = ShmRegion::create(&posix_shm_name(name), size)?;
418        unsafe {
419            shm.write_header(capacity, slot_size);
420        }
421
422        let a2e_off = a2e_ring_offset();
423        let e2a_off = e2a_ring_offset(capacity, slot_size);
424
425        use tokio::io::unix::AsyncFd;
426
427        let a2e_path = a2e_pipe_path(name);
428        let e2a_path = e2a_pipe_path(name);
429
430        // Remove stale FIFOs from a previous run.
431        let _ = std::fs::remove_file(&a2e_path);
432        let _ = std::fs::remove_file(&e2a_path);
433
434        // Create the named FIFOs.
435        for p in [&a2e_path, &e2a_path] {
436            let cp = CString::new(p.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
437            if unsafe { libc::mkfifo(cp.as_ptr(), 0o600) } == -1 {
438                return Err(ShmError::Io(std::io::Error::last_os_error()));
439            }
440        }
441
442        // Engine reads from a2e (awaits wakeup from app).
443        let a2e_fd = open_fifo_rdwr(&a2e_path)?;
444        let a2e_rx = AsyncFd::new(a2e_fd).map_err(ShmError::Io)?;
445
446        // Engine writes to e2a (sends wakeup to app).
447        let e2a_tx = open_fifo_rdwr(&e2a_path)?;
448
449        Ok(SpscFace {
450            id,
451            shm,
452            capacity,
453            slot_size,
454            a2e_off,
455            e2a_off,
456            a2e_rx,
457            e2a_tx,
458            a2e_pipe_path: a2e_path,
459            e2a_pipe_path: e2a_path,
460        })
461    }
462
463    fn try_pop_a2e(&self) -> Option<Bytes> {
464        unsafe {
465            ring_pop(
466                self.shm.as_ptr(),
467                self.a2e_off,
468                OFF_A2E_TAIL,
469                OFF_A2E_HEAD,
470                self.capacity,
471                self.slot_size,
472            )
473        }
474    }
475
476    fn try_push_e2a(&self, data: &[u8]) -> bool {
477        unsafe {
478            ring_push(
479                self.shm.as_ptr(),
480                self.e2a_off,
481                OFF_E2A_TAIL,
482                OFF_E2A_HEAD,
483                self.capacity,
484                self.slot_size,
485                data,
486            )
487        }
488    }
489}
490
491impl Drop for SpscFace {
492    fn drop(&mut self) {
493        let _ = std::fs::remove_file(&self.a2e_pipe_path);
494        let _ = std::fs::remove_file(&self.e2a_pipe_path);
495    }
496}
497
impl Face for SpscFace {
    fn id(&self) -> FaceId {
        self.id
    }
    fn kind(&self) -> FaceKind {
        FaceKind::Shm
    }

    /// Receive the next packet from the app (a2e ring), parking on the FIFO
    /// when the ring stays empty.  Returns `Err(FaceError::Closed)` if the
    /// wakeup pipe reports EOF or any other I/O error.
    async fn recv(&self) -> Result<Bytes, FaceError> {
        // SAFETY: parked flag is within the mapped SHM region.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        loop {
            if let Some(pkt) = self.try_pop_a2e() {
                return Ok(pkt);
            }
            // Spin before parking — avoids expensive pipe syscall
            // when packets arrive within microseconds of each other.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_a2e() {
                    return Ok(pkt);
                }
            }
            // Announce intent to sleep with SeqCst so the app's next SeqCst
            // load on the parked flag observes this before or after it pushes
            // to the ring — never concurrently missed.
            parked.store(1, Ordering::SeqCst);
            // Second ring check: if the app already pushed between our first
            // check and the flag store, we see it here and avoid sleeping.
            if let Some(pkt) = self.try_pop_a2e() {
                parked.store(0, Ordering::Relaxed);
                return Ok(pkt);
            }

            // Sleep until the app sends a wakeup via the FIFO.
            pipe_await(&self.a2e_rx)
                .await
                .map_err(|_| FaceError::Closed)?;

            parked.store(0, Ordering::Relaxed);
        }
    }

    /// Send a packet to the app (e2a ring), waking it through the FIFO only
    /// if its parked flag says it is actually sleeping.
    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
        if pkt.len() > self.slot_size as usize {
            return Err(FaceError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "packet exceeds SHM slot size",
            )));
        }
        // SAFETY: parked flag within mapped SHM region.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        // Yield until there is space in the e2a ring (backpressure).
        // NOTE(review): unlike SpscHandle::send, this loop has no deadline —
        // if the app never drains the ring the engine yields forever here.
        // Presumably the engine drops the face when the app dies; confirm.
        loop {
            if self.try_push_e2a(&pkt) {
                break;
            }
            tokio::task::yield_now().await;
        }
        // Only send a wakeup if the app is actually sleeping.
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.e2a_tx);
        }
        Ok(())
    }
}
566
567// ─── SpscHandle (application side) ───────────────────────────────────────────
568
/// Application-side SPSC SHM handle.
///
/// Connect with [`SpscHandle::connect`] using the same `name` passed to
/// [`SpscFace::create`] in the engine process.
///
/// Set a `CancellationToken` via [`set_cancel`] to abort `recv`/`send` when
/// the router's control face disconnects (the O_RDWR FIFO trick means EOF
/// detection alone is unreliable).
pub struct SpscHandle {
    /// Backing POSIX SHM mapping (opener side — name is not unlinked on drop).
    shm: ShmRegion,
    /// Slots per ring, read from the SHM header written by the engine.
    capacity: u32,
    /// Payload bytes per slot, read from the SHM header.
    slot_size: u32,
    /// Byte offset of the app→engine ring inside the mapping.
    a2e_off: usize,
    /// Byte offset of the engine→app ring inside the mapping.
    e2a_off: usize,
    /// FIFO the app awaits readability on (engine writes here to wake app).
    e2a_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    /// FIFO the app writes to (to wake the engine).
    a2e_tx: std::os::unix::io::OwnedFd,
    /// Cancelled when the router control face dies.
    cancel: tokio_util::sync::CancellationToken,
}
590
impl SpscHandle {
    /// Open the SHM region created by the engine and set up the wakeup mechanism.
    ///
    /// Two-phase open: the total region size depends on `capacity` and
    /// `slot_size`, which are only known after reading the header — so the
    /// header is mapped read-only first, then the full region read-write.
    pub fn connect(name: &str) -> Result<Self, ShmError> {
        let shm_name_str = posix_shm_name(name);
        let cname = CString::new(shm_name_str.as_str()).map_err(|_| ShmError::InvalidName)?;

        // Phase 1: open just the header to read capacity and slot_size.
        let (capacity, slot_size) = unsafe {
            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDONLY, 0);
            if fd == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let p = libc::mmap(
                std::ptr::null_mut(),
                HEADER_SIZE,
                libc::PROT_READ,
                libc::MAP_SHARED,
                fd,
                0,
            );
            // The mapping outlives the fd; close it immediately.
            libc::close(fd);
            if p == libc::MAP_FAILED {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let base = p as *const u8;
            let magic = (base as *const u64).read_unaligned();
            if magic != MAGIC {
                // Unmap the temporary header mapping before bailing out.
                libc::munmap(p, HEADER_SIZE);
                return Err(ShmError::InvalidMagic);
            }
            let cap = (base.add(8) as *const u32).read_unaligned();
            let slen = (base.add(12) as *const u32).read_unaligned();
            libc::munmap(p, HEADER_SIZE);
            (cap, slen)
        };

        // Phase 2: open the full region read-write.
        // NOTE(review): assumes the header cannot change between phase 1 and
        // phase 2 (the engine writes it once before handing out `name`) —
        // confirm that no engine restart can race this window.
        let size = shm_total_size(capacity, slot_size);
        let shm = ShmRegion::open(&shm_name_str, size)?;
        // Re-validate the magic through the full mapping.
        unsafe { shm.read_header()? };

        let a2e_off = a2e_ring_offset();
        let e2a_off = e2a_ring_offset(capacity, slot_size);

        use tokio::io::unix::AsyncFd;

        let a2e_path = a2e_pipe_path(name); // app writes here to wake engine
        let e2a_path = e2a_pipe_path(name); // app reads here (engine wakes app)

        // App writes to a2e FIFO (to wake engine).
        let a2e_tx = open_fifo_rdwr(&a2e_path)?;

        // App reads from e2a FIFO (awaits wakeup from engine).
        let e2a_fd = open_fifo_rdwr(&e2a_path)?;
        let e2a_rx = AsyncFd::new(e2a_fd).map_err(ShmError::Io)?;

        Ok(SpscHandle {
            shm,
            capacity,
            slot_size,
            a2e_off,
            e2a_off,
            e2a_rx,
            a2e_tx,
            // Default token never fires; callers opt in via `set_cancel`.
            cancel: tokio_util::sync::CancellationToken::new(),
        })
    }

    /// Attach a cancellation token (typically a child of the control face's
    /// lifecycle token).  When cancelled, `recv()` returns `None` and `send()`
    /// returns `Err`.
    pub fn set_cancel(&mut self, cancel: tokio_util::sync::CancellationToken) {
        self.cancel = cancel;
    }

    /// Non-blocking push onto the app→engine ring (app is the producer).
    fn try_push_a2e(&self, data: &[u8]) -> bool {
        // SAFETY: offsets/capacity/slot_size describe the mapping validated in
        // `connect`, satisfying ring_push's contract; callers check
        // `data.len() <= slot_size` first.
        unsafe {
            ring_push(
                self.shm.as_ptr(),
                self.a2e_off,
                OFF_A2E_TAIL,
                OFF_A2E_HEAD,
                self.capacity,
                self.slot_size,
                data,
            )
        }
    }

    /// Non-blocking pop from the engine→app ring (app is the consumer).
    fn try_pop_e2a(&self) -> Option<Bytes> {
        // SAFETY: same mapping contract as `try_push_a2e`.
        unsafe {
            ring_pop(
                self.shm.as_ptr(),
                self.e2a_off,
                OFF_E2A_TAIL,
                OFF_E2A_HEAD,
                self.capacity,
                self.slot_size,
            )
        }
    }

    /// Send a packet to the engine (enqueue in the a2e ring).
    ///
    /// Yields cooperatively if the ring is full (backpressure from the engine).
    /// Returns `Err(Closed)` if the cancellation token fires (engine dead).
    ///
    /// Uses a wall-clock deadline so backpressure tolerance is independent
    /// of system scheduling speed (the old yield-counter approach returned
    /// `Closed` after ~100k yields ≈ 1s on fast machines, but could be much
    /// shorter under heavy Tokio contention — falsely killing the caller).
    pub async fn send(&self, pkt: Bytes) -> Result<(), ShmError> {
        if self.cancel.is_cancelled() {
            return Err(ShmError::Closed);
        }
        if pkt.len() > self.slot_size as usize {
            return Err(ShmError::PacketTooLarge);
        }
        // SAFETY: parked flag within mapped SHM region.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            if self.try_push_a2e(&pkt) {
                break;
            }
            if self.cancel.is_cancelled() {
                return Err(ShmError::Closed);
            }
            // 5 s of sustained full-ring backpressure: assume the engine died.
            if tokio::time::Instant::now() >= deadline {
                return Err(ShmError::Closed);
            }
            tokio::task::yield_now().await;
        }
        // Only send a wakeup if the engine is sleeping on the a2e ring.
        // SeqCst pairs with the engine's SeqCst store of the parked flag.
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.a2e_tx);
        }
        Ok(())
    }

    /// Receive a packet from the engine (dequeue from the e2a ring).
    ///
    /// Returns `None` when the engine face has been dropped or the
    /// cancellation token fires.
    pub async fn recv(&self) -> Option<Bytes> {
        if self.cancel.is_cancelled() {
            return None;
        }
        // SAFETY: parked flag within mapped SHM region.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        loop {
            if let Some(pkt) = self.try_pop_e2a() {
                return Some(pkt);
            }
            // Spin before parking — avoids expensive pipe syscall
            // when packets arrive within microseconds of each other.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_e2a() {
                    return Some(pkt);
                }
            }
            // SeqCst store + second ring check: same missed-wakeup-free
            // protocol as the engine side (see module docs).
            parked.store(1, Ordering::SeqCst);
            if let Some(pkt) = self.try_pop_e2a() {
                parked.store(0, Ordering::Relaxed);
                return Some(pkt);
            }

            // Wait for pipe wakeup or cancellation.  We rely on the
            // CancellationToken (propagated from the control face) rather
            // than timeouts — idle waits are legitimate (e.g. iperf server
            // waiting for a client).
            tokio::select! {
                result = pipe_await(&self.e2a_rx) => {
                    parked.store(0, Ordering::Relaxed);
                    if result.is_err() { return None; }
                }
                _ = self.cancel.cancelled() => {
                    parked.store(0, Ordering::Relaxed);
                    return None;
                }
            }
        }
    }
}
778
779// SpscHandle has no Drop impl: ShmRegion handles munmap, OwnedFd closes pipe
780// fds, and the FIFOs are created/removed by SpscFace (engine side).
781
782// ─── Tests ───────────────────────────────────────────────────────────────────
783
#[cfg(test)]
mod tests {
    use super::*;
    use ndn_transport::Face;

    /// Base name for the SHM region and FIFOs, unique per test process.
    fn test_name() -> String {
        // Use PID to avoid collisions when tests run concurrently.
        format!("test-spsc-{}", std::process::id())
    }

    // Tests use multi_thread because AsyncFd needs the runtime's I/O driver.

    /// `id()` and `kind()` echo back the construction parameters.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn face_kind_and_id() {
        let name = test_name();
        let face = SpscFace::create(FaceId(7), &name).unwrap();
        assert_eq!(face.id(), FaceId(7));
        assert_eq!(face.kind(), FaceKind::Shm);
    }

    /// One packet travels app → engine intact (a2e ring + wakeup pipe).
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn app_to_engine_roundtrip() {
        let name = format!("{}-ae", test_name());
        let face = SpscFace::create(FaceId(1), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkt = Bytes::from_static(b"\x05\x03\x01\x02\x03");
        handle.send(pkt.clone()).await.unwrap();

        // Timeout guards against a missed wakeup hanging the test forever.
        let received = tokio::time::timeout(std::time::Duration::from_secs(2), face.recv())
            .await
            .expect("timed out")
            .unwrap();

        assert_eq!(received, pkt);
    }

    /// One packet travels engine → app intact (e2a ring + wakeup pipe).
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn engine_to_app_roundtrip() {
        let name = format!("{}-ea", test_name());
        let face = SpscFace::create(FaceId(2), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkt = Bytes::from_static(b"\x06\x03\xAA\xBB\xCC");
        face.send(pkt.clone()).await.unwrap();

        let received = tokio::time::timeout(std::time::Duration::from_secs(2), handle.recv())
            .await
            .expect("timed out")
            .unwrap();

        assert_eq!(received, pkt);
    }

    /// FIFO ordering holds for several packets in each direction.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn multiple_packets_both_directions() {
        let name = format!("{}-bi", test_name());
        let face = SpscFace::create(FaceId(3), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        // App → Engine: 4 packets
        for i in 0u8..4 {
            handle.send(Bytes::from(vec![i; 64])).await.unwrap();
        }
        for i in 0u8..4 {
            let pkt = face.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![i; 64][..]);
        }

        // Engine → App: 4 packets
        for i in 0u8..4 {
            face.send(Bytes::from(vec![i + 10; 128])).await.unwrap();
        }
        for i in 0u8..4 {
            let pkt = handle.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![i + 10; 128][..]);
        }
    }
}