ndn_faces/local/shm/
spsc.rs

1//! Custom SPSC shared-memory face (Unix only, `spsc-shm` feature).
2//!
3//! A named POSIX SHM region holds two lock-free single-producer/single-consumer
4//! ring buffers — one for each direction.  Wakeup uses a named FIFO (pipe)
5//! pair on all platforms: the engine creates two FIFOs
6//! (`/tmp/.ndn-{name}.a2e.pipe` and `.e2a.pipe`); both sides open them
7//! `O_RDWR | O_NONBLOCK` (avoids the blocking-open problem).  The consumer
8//! awaits readability via `tokio::io::unix::AsyncFd`; the producer writes 1
9//! non-blocking byte.  The parked flag in SHM is still used to avoid
10//! unnecessary pipe writes when the consumer is active.
11//!
12//! This design integrates directly into Tokio's epoll/kqueue loop with zero
13//! thread transitions, unlike the previous Linux futex + `spawn_blocking`
14//! approach which routed every park through Tokio's blocking thread pool.
15//!
16//! # SHM layout
17//!
18//! ```text
19//! Cache line 0 (off   0–63):  magic u64 | capacity u32 | slot_size u32 | pad
20//! Cache line 1 (off  64–127): a2e_tail AtomicU32  — app writes, engine reads
21//! Cache line 2 (off 128–191): a2e_head AtomicU32  — engine writes, app reads
22//! Cache line 3 (off 192–255): e2a_tail AtomicU32  — engine writes, app reads
23//! Cache line 4 (off 256–319): e2a_head AtomicU32  — app writes, engine reads
24//! Cache line 5 (off 320–383): a2e_parked AtomicU32 — set by engine before sleeping on a2e ring
25//! Cache line 6 (off 384–447): e2a_parked AtomicU32 — set by app before sleeping on e2a ring
26//! Data block (off 448–N):     a2e ring (capacity × slot_stride bytes)
27//! Data block (off N–end):     e2a ring (capacity × slot_stride bytes)
28//!   slot_stride = 4 (length prefix) + slot_size (payload area)
29//! ```
30//!
31//! # Conditional wakeup protocol
32//!
33//! The producer checks the parked flag with `SeqCst` after writing to the
34//! ring; the consumer stores the parked flag with `SeqCst` before its second
35//! ring check.  This total-order guarantee prevents the producer from missing
36//! a sleeping consumer.
37use std::ffi::CString;
38use std::path::PathBuf;
39use std::sync::atomic::{AtomicU32, Ordering};
40
41use bytes::Bytes;
42
43use ndn_transport::{Face, FaceError, FaceId, FaceKind};
44
45use crate::local::shm::ShmError;
46
47// ─── Named FIFO wakeup helpers ───────────────────────────────────────────────
48//
49// Both Linux and macOS use named FIFOs (pipes) for cross-process wakeup,
50// wrapped in Tokio's AsyncFd for zero-thread-transition async integration.
51// The previous Linux path used futex + spawn_blocking which routed every
52// park through Tokio's blocking thread pool — expensive at 100K+ pkt/s
53// and responsible for the 2.5× throughput gap vs macOS.
54
55// ─── Constants ───────────────────────────────────────────────────────────────
56
/// Header magic; the value's big-endian bytes spell the ASCII tag below.
const MAGIC: u64 = 0x4E44_4E5F_5348_4D00; // b"NDN_SHM\0"

/// Default number of slots per ring.
///
/// Paired with [`DEFAULT_SLOT_SIZE`] so the total default ring memory
/// is ~4.4 MiB per face (256 × 8960 × 2) — the same budget as the
/// original v1 design. Tools that need larger Data packets (chunked
/// producers at 64+ KiB segments) call `faces/create` with an explicit
/// `mtu`, and the router creates a face with a proportionally larger
/// slot_size and smaller capacity via [`capacity_for_slot`].
pub const DEFAULT_CAPACITY: u32 = 256;

/// Default slot payload size in bytes (~8.75 KiB). Covers a standard
/// NDN Data packet (≤8800 bytes on most link types) with a small
/// headroom margin. This is the right default for the majority of
/// NDN traffic: Interests (~60–200 bytes), single-packet Data
/// (~4–8 KiB content), iperf probes, ping, management, etc.
///
/// Tools that produce larger Data (chunked file transfers at 64 KiB,
/// 256 KiB, or 1 MiB segments) negotiate a bigger slot via the `mtu`
/// field of `faces/create` ControlParameters. `ndn-put` does this
/// automatically from `--chunk-size`; `ndn-peek` accepts `--mtu`.
pub const DEFAULT_SLOT_SIZE: u32 = 8960;

/// Target SHM ring memory budget per face, in bytes. Used by
/// [`capacity_for_slot`] to scale capacity inversely with slot_size
/// so that faces with large slots (256 KiB for chunked producers)
/// don't blow up memory, and faces with small slots (8960 for iperf)
/// get a deep ring that stays in L2 cache.
// Two rings of DEFAULT_CAPACITY slots at the default stride.
const SHM_BUDGET: usize = 2 * DEFAULT_CAPACITY as usize * slot_stride(DEFAULT_SLOT_SIZE);
87
/// NDN Data packet wire overhead above the raw content bytes:
/// Data TLV + Name + MetaInfo + SignatureInfo + SignatureValue. 16 KiB
/// is generous enough to cover a Data whose content is `mtu` bytes of
/// payload plus a long name and a large signature (Ed25519, ECDSA with
/// key locator, or a Merkle proof up to a few hundred hashes).
pub const SHM_SLOT_OVERHEAD: usize = 16 * 1024;

/// Pick a slot size for a face that needs to carry NDN Data whose
/// *content* can be up to `mtu` bytes. Rounds up to the next multiple
/// of 64 bytes so the per-slot stride stays cache-line aligned.
/// Does **not** clamp to a floor — an explicit `mtu = 8800` gets a
/// ~25 KiB slot, not the 272 KiB default. If no `mtu` is specified,
/// the caller should use `DEFAULT_SLOT_SIZE` directly.
pub fn slot_size_for_mtu(mtu: usize) -> u32 {
    // Content bytes plus worst-case NDN wire overhead, saturating so an
    // absurd mtu cannot wrap the addition.
    let needed = SHM_SLOT_OVERHEAD.saturating_add(mtu);
    // Round up to a whole number of 64-byte cache lines.
    let aligned = needed.div_ceil(64) * 64;
    // Saturate into u32 range rather than truncating.
    u32::try_from(aligned).unwrap_or(u32::MAX)
}
106
107/// Compute ring capacity for a given slot_size, keeping total ring
108/// memory within [`SHM_BUDGET`]. Returns at least 16 (to keep some
109/// pipeline buffering even for very large slots).
110pub fn capacity_for_slot(slot_size: u32) -> u32 {
111    let stride = slot_stride(slot_size);
112    let cap = SHM_BUDGET / (2 * stride);
113    (cap as u32).max(16)
114}
115
// Cache-line–aligned offsets for the four ring index atomics.
// Each index occupies its own 64-byte cache line (offsets are multiples
// of 64) so the producer-owned and consumer-owned counters never share
// a line.
const OFF_A2E_TAIL: usize = 64; // app writes (producer)
const OFF_A2E_HEAD: usize = 128; // engine writes (consumer)
const OFF_E2A_TAIL: usize = 192; // engine writes (producer)
const OFF_E2A_HEAD: usize = 256; // app writes (consumer)
// Parked flags: consumer sets to 1 before sleeping, clears on wake.
const OFF_A2E_PARKED: usize = 320; // engine (a2e consumer) parked flag
const OFF_E2A_PARKED: usize = 384; // app (e2a consumer) parked flag
const HEADER_SIZE: usize = 448; // 7 × 64-byte cache lines
125
/// Bytes occupied by one ring slot: a 4-byte length prefix followed by
/// `slot_size` payload bytes.
const fn slot_stride(slot_size: u32) -> usize {
    (slot_size as usize) + 4
}
129
/// Number of spin-loop iterations the consumer runs before falling
/// through to the pipe wakeup path (see `SpscFace::recv`).
/// 64 iterations ≈ sub-µs on modern hardware — enough to catch
/// back-to-back packets without causing thermal throttling from
/// sustained spinning across multiple faces.
const SPIN_ITERS: u32 = 64;
135
136fn shm_total_size(capacity: u32, slot_size: u32) -> usize {
137    HEADER_SIZE + 2 * capacity as usize * slot_stride(slot_size)
138}
139
/// Byte offset of the app→engine ring (first data block after the header).
fn a2e_ring_offset() -> usize {
    HEADER_SIZE
}
143fn e2a_ring_offset(capacity: u32, slot_size: u32) -> usize {
144    HEADER_SIZE + capacity as usize * slot_stride(slot_size)
145}
146
147// ─── Path helpers ─────────────────────────────────────────────────────────────
148
/// POSIX SHM object name for a face called `name` (the leading '/' is
/// required by `shm_open`).
fn posix_shm_name(name: &str) -> String {
    let mut shm = String::with_capacity("/ndn-shm-".len() + name.len());
    shm.push_str("/ndn-shm-");
    shm.push_str(name);
    shm
}
152
/// Path of the FIFO the *engine* reads from (app writes to wake engine).
fn a2e_pipe_path(name: &str) -> PathBuf {
    let mut p = String::from("/tmp/.ndn-");
    p.push_str(name);
    p.push_str(".a2e.pipe");
    PathBuf::from(p)
}
157
/// Path of the FIFO the *app* reads from (engine writes to wake app).
fn e2a_pipe_path(name: &str) -> PathBuf {
    ["/tmp/.ndn-", name, ".e2a.pipe"].concat().into()
}
162
163// ─── POSIX SHM region ────────────────────────────────────────────────────────
164
/// Owns a POSIX SHM mapping. The creator unlinks the name on drop.
struct ShmRegion {
    /// Base address of the mmap'd region.
    ptr: *mut u8,
    /// Length of the mapping in bytes (argument to munmap on drop).
    size: usize,
    /// Present when this process created the region; drives shm_unlink on drop.
    shm_name: Option<CString>,
}

// SAFETY: `ptr` is a MAP_SHARED mapping that stays valid until Drop runs.
// Cross-thread (and cross-process) mutation of the ring state goes through
// the atomic operations in ring_push/ring_pop, so moving or sharing this
// handle across threads introduces no data races beyond what the ring
// protocol already governs.
unsafe impl Send for ShmRegion {}
unsafe impl Sync for ShmRegion {}
175
176impl ShmRegion {
177    /// Create and zero-initialise a new named SHM region.
178    fn create(shm_name: &str, size: usize) -> Result<Self, ShmError> {
179        let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
180        let ptr = unsafe {
181            let fd = libc::shm_open(
182                cname.as_ptr(),
183                libc::O_CREAT | libc::O_RDWR | libc::O_TRUNC,
184                // 0o666: readable/writable by all users so an unprivileged app
185                // can connect to a router running as root.  The SHM name is
186                // unique per app instance, limiting exposure.
187                0o666 as libc::mode_t as libc::c_uint,
188            );
189            if fd == -1 {
190                return Err(ShmError::Io(std::io::Error::last_os_error()));
191            }
192
193            if libc::ftruncate(fd, size as libc::off_t) == -1 {
194                libc::close(fd);
195                return Err(ShmError::Io(std::io::Error::last_os_error()));
196            }
197
198            let p = libc::mmap(
199                std::ptr::null_mut(),
200                size,
201                libc::PROT_READ | libc::PROT_WRITE,
202                libc::MAP_SHARED,
203                fd,
204                0,
205            );
206            libc::close(fd);
207            if p == libc::MAP_FAILED {
208                return Err(ShmError::Io(std::io::Error::last_os_error()));
209            }
210            p as *mut u8
211        };
212        Ok(ShmRegion {
213            ptr,
214            size,
215            shm_name: Some(cname),
216        })
217    }
218
219    /// Open an existing named SHM region created by `ShmRegion::create`.
220    fn open(shm_name: &str, size: usize) -> Result<Self, ShmError> {
221        let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
222        let ptr = unsafe {
223            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDWR, 0);
224            if fd == -1 {
225                return Err(ShmError::Io(std::io::Error::last_os_error()));
226            }
227
228            let p = libc::mmap(
229                std::ptr::null_mut(),
230                size,
231                libc::PROT_READ | libc::PROT_WRITE,
232                libc::MAP_SHARED,
233                fd,
234                0,
235            );
236            libc::close(fd);
237            if p == libc::MAP_FAILED {
238                return Err(ShmError::Io(std::io::Error::last_os_error()));
239            }
240            p as *mut u8
241        };
242        Ok(ShmRegion {
243            ptr,
244            size,
245            shm_name: None,
246        })
247    }
248
249    fn as_ptr(&self) -> *mut u8 {
250        self.ptr
251    }
252
253    /// Write the header fields (magic, capacity, slot_size).
254    ///
255    /// # Safety
256    /// Must be called exactly once immediately after `create()`, before any
257    /// other process opens the region.
258    unsafe fn write_header(&self, capacity: u32, slot_size: u32) {
259        unsafe {
260            (self.ptr as *mut u64).write_unaligned(MAGIC);
261            (self.ptr.add(8) as *mut u32).write_unaligned(capacity);
262            (self.ptr.add(12) as *mut u32).write_unaligned(slot_size);
263        }
264        // Ring indices start at zero (mmap of new ftruncated fd is zero-initialised).
265    }
266
267    /// Read and validate the header. Returns `(capacity, slot_size)`.
268    ///
269    /// # Safety
270    /// The region must have been initialised by `write_header`.
271    unsafe fn read_header(&self) -> Result<(u32, u32), ShmError> {
272        unsafe {
273            let magic = (self.ptr as *const u64).read_unaligned();
274            if magic != MAGIC {
275                return Err(ShmError::InvalidMagic);
276            }
277            let capacity = (self.ptr.add(8) as *const u32).read_unaligned();
278            let slot_size = (self.ptr.add(12) as *const u32).read_unaligned();
279            Ok((capacity, slot_size))
280        }
281    }
282}
283
impl Drop for ShmRegion {
    fn drop(&mut self) {
        // SAFETY: `ptr`/`size` describe the mapping established in
        // create()/open(); unmapping it exactly once here is sound. Only the
        // creating side (shm_name = Some) unlinks the name, so the region
        // survives until its creator goes away.
        unsafe {
            libc::munmap(self.ptr as *mut libc::c_void, self.size);
            if let Some(ref n) = self.shm_name {
                libc::shm_unlink(n.as_ptr());
            }
        }
    }
}
294
295// ─── SPSC ring operations ─────────────────────────────────────────────────────
296
/// Push `data` into the ring at [`ring_off`] using the tail at [`tail_off`] and
/// head at [`head_off`]. Returns `false` if the ring is full.
///
/// # Safety
/// `base` must be a valid, exclusively-written SHM mapping of sufficient size.
/// `data.len() <= slot_size` must hold.
unsafe fn ring_push(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
    data: &[u8],
) -> bool {
    debug_assert!(data.len() <= slot_size as usize);

    // SAFETY (caller contract): the index offsets lie inside the mapping on
    // dedicated cache lines (see the header layout constants).
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Tail is owned by this single producer — Relaxed suffices.  Acquire on
    // the consumer-owned head pairs with the consumer's Release store, so the
    // occupancy check sees every slot the consumer has freed.
    let t = tail_a.load(Ordering::Relaxed);
    let h = head_a.load(Ordering::Acquire);
    // Free-running indices: occupancy is the wrapping distance tail − head.
    if t.wrapping_sub(h) >= capacity {
        return false;
    }

    let idx = (t % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    unsafe {
        // Slot format: 4-byte length prefix, then the payload bytes.
        (slot as *mut u32).write_unaligned(data.len() as u32);
        std::ptr::copy_nonoverlapping(data.as_ptr(), slot.add(4), data.len());
    }
    // Release publishes the slot contents before the new tail becomes visible
    // to the consumer's Acquire load in ring_pop.
    tail_a.store(t.wrapping_add(1), Ordering::Release);
    true
}
333
/// Push up to `pkts.len()` packets into the ring in one tail advance.
///
/// Loads the head once (Acquire), determines the free-slot count, writes
/// as many packets as fit into consecutive slots, and publishes the tail
/// with a single Release store. Returns the number of packets pushed
/// (0 if the ring was full, `pkts.len()` on complete success, or a
/// partial count in between).
///
/// Compared to calling [`ring_push`] N times, this function:
/// - Loads the peer head *once* instead of N times (N−1 atomic loads
///   saved).
/// - Publishes a single Release store instead of N (N−1 release fences
///   saved; the consumer sees all pushed packets at once).
/// - Keeps the same per-slot length+memcpy work — batching does not
///   save per-byte cost, only per-packet synchronisation cost.
///
/// The caller is responsible for handling the "ring full" case (yield
/// and retry, same as single-packet `ring_push`).
///
/// # Safety
/// Same as [`ring_push`]. Every packet must satisfy
/// `pkt.len() <= slot_size`.
unsafe fn ring_push_batch(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
    pkts: &[&[u8]],
) -> usize {
    if pkts.is_empty() {
        return 0;
    }
    // SAFETY (caller contract): the index offsets lie inside the mapping.
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Tail is producer-owned (Relaxed); Acquire on head pairs with the
    // consumer's Release store in ring_pop.
    let mut t = tail_a.load(Ordering::Relaxed);
    let h = head_a.load(Ordering::Acquire);
    // Free slots = capacity − occupancy, using wrapping free-running indices.
    let free = capacity.wrapping_sub(t.wrapping_sub(h));
    let to_push = (free as usize).min(pkts.len());
    if to_push == 0 {
        return 0;
    }

    for pkt in &pkts[..to_push] {
        debug_assert!(pkt.len() <= slot_size as usize);
        let idx = (t % capacity) as usize;
        let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };
        unsafe {
            // Slot format: 4-byte length prefix, then the payload bytes.
            (slot as *mut u32).write_unaligned(pkt.len() as u32);
            std::ptr::copy_nonoverlapping(pkt.as_ptr(), slot.add(4), pkt.len());
        }
        // Local tail only — not yet visible to the consumer.
        t = t.wrapping_add(1);
    }
    // Single Release store publishes all `to_push` slots to the consumer.
    tail_a.store(t, Ordering::Release);
    to_push
}
393
/// Pop one packet from the ring. Returns `None` if empty.
///
/// # Safety
/// Same as [`ring_push`].
unsafe fn ring_pop(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
) -> Option<Bytes> {
    // SAFETY (caller contract): the index offsets lie inside the mapping.
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Head is owned by this single consumer — Relaxed suffices.  Acquire on
    // the producer-owned tail pairs with the producer's Release store so the
    // slot contents written before that store are visible below.
    let h = head_a.load(Ordering::Relaxed);
    let t = tail_a.load(Ordering::Acquire);
    if h == t {
        return None;
    }

    let idx = (h % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    let len = unsafe { (slot as *const u32).read_unaligned() as usize };
    // Clamp to prevent out-of-bounds read if SHM is corrupted.
    let len = len.min(slot_size as usize);
    // Copy out before freeing the slot: the Bytes owns its own buffer.
    let data = unsafe { Bytes::copy_from_slice(std::slice::from_raw_parts(slot.add(4), len)) };

    // Release hands the slot back to the producer only after the copy above.
    head_a.store(h.wrapping_add(1), Ordering::Release);
    Some(data)
}
426
427// ─── FIFO wakeup helpers ─────────────────────────────────────────────────────
428
429/// Open a named FIFO (must already exist) with `O_RDWR | O_NONBLOCK`.
430///
431/// `O_RDWR` avoids the blocking-open problem: the open succeeds immediately
432/// even if the other end has not yet opened the FIFO.  Both sides only use
433/// the fd in the direction they own (reads or writes), so no cross-reading
434/// occurs.
435fn open_fifo_rdwr(path: &std::path::Path) -> Result<std::os::unix::io::OwnedFd, ShmError> {
436    use std::os::unix::io::{FromRawFd, OwnedFd};
437    let cpath = CString::new(path.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
438    let fd = unsafe { libc::open(cpath.as_ptr(), libc::O_RDWR | libc::O_NONBLOCK) };
439    if fd == -1 {
440        return Err(ShmError::Io(std::io::Error::last_os_error()));
441    }
442    Ok(unsafe { OwnedFd::from_raw_fd(fd) })
443}
444
445/// Await readability on the pipe fd, then drain all buffered bytes.
446///
447/// Returns `Err` on EOF (peer died) or any I/O error.
448async fn pipe_await(
449    rx: &tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
450) -> std::io::Result<()> {
451    use std::os::unix::io::AsRawFd;
452    loop {
453        let mut guard = rx.readable().await?;
454        let mut buf = [0u8; 64];
455        let fd = rx.get_ref().as_raw_fd();
456        let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
457        guard.clear_ready();
458        if n > 0 {
459            return Ok(());
460        }
461        if n == 0 {
462            // EOF — peer closed their end.
463            return Err(std::io::Error::new(
464                std::io::ErrorKind::UnexpectedEof,
465                "SHM wakeup pipe closed (peer died)",
466            ));
467        }
468        if n == -1 {
469            let err = std::io::Error::last_os_error();
470            if err.kind() != std::io::ErrorKind::WouldBlock {
471                return Err(err);
472            }
473        }
474    }
475}
476
477/// Write one wakeup byte to a non-blocking pipe fd.
478///
479/// Silently ignores `EAGAIN` (pipe buffer full): if the buffer is full the
480/// consumer is already being woken by a previous byte.
481fn pipe_write(tx: &std::os::unix::io::OwnedFd) {
482    use std::os::unix::io::AsRawFd;
483    let b = [1u8];
484    unsafe {
485        libc::write(tx.as_raw_fd(), b.as_ptr() as *const libc::c_void, 1);
486    }
487}
488
489// ─── SpscFace (engine side) ───────────────────────────────────────────────────
490
/// Engine-side SPSC SHM face.
///
/// Create with [`SpscFace::create`]; register with the engine via
/// `ForwarderEngine::add_face`. Give the `name` to the application so it can
/// call [`SpscHandle::connect`].
pub struct SpscFace {
    /// Face identifier assigned by the caller at creation.
    id: FaceId,
    /// The mapped region backing both rings (creator side: unlinked on drop).
    shm: ShmRegion,
    /// Slots per ring, as written into the SHM header.
    capacity: u32,
    /// Payload bytes per slot, as written into the SHM header.
    slot_size: u32,
    /// Byte offset of the app→engine ring within the region.
    a2e_off: usize,
    /// Byte offset of the engine→app ring within the region.
    e2a_off: usize,
    /// FIFO the engine awaits readability on (app writes here to wake engine).
    a2e_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    /// FIFO the engine writes to (to wake the app).
    e2a_tx: std::os::unix::io::OwnedFd,
    /// Paths of the FIFOs created by the engine — removed on drop.
    a2e_pipe_path: PathBuf,
    e2a_pipe_path: PathBuf,
}
511
512impl SpscFace {
513    /// Create the SHM region and set up the wakeup mechanism.
514    ///
515    /// `name` identifies this face (e.g. `"sensor-0"`); pass it to
516    /// [`SpscHandle::connect`] in the application process.
517    pub fn create(id: FaceId, name: &str) -> Result<Self, ShmError> {
518        Self::create_with(id, name, DEFAULT_CAPACITY, DEFAULT_SLOT_SIZE)
519    }
520
521    /// Create a face sized for Data packets whose content can be up
522    /// to `mtu` bytes. Picks `slot_size = slot_size_for_mtu(mtu)` and
523    /// scales `capacity` inversely so the total ring memory stays
524    /// within [`SHM_BUDGET`]. Use this when an application has
525    /// announced its expected packet size via `faces/create`'s `mtu`
526    /// ControlParameter.
527    pub fn create_for_mtu(id: FaceId, name: &str, mtu: usize) -> Result<Self, ShmError> {
528        let ss = slot_size_for_mtu(mtu);
529        Self::create_with(id, name, capacity_for_slot(ss), ss)
530    }
531
532    /// Create with explicit ring parameters.
533    pub fn create_with(
534        id: FaceId,
535        name: &str,
536        capacity: u32,
537        slot_size: u32,
538    ) -> Result<Self, ShmError> {
539        let size = shm_total_size(capacity, slot_size);
540        let shm = ShmRegion::create(&posix_shm_name(name), size)?;
541        unsafe {
542            shm.write_header(capacity, slot_size);
543        }
544
545        let a2e_off = a2e_ring_offset();
546        let e2a_off = e2a_ring_offset(capacity, slot_size);
547
548        use tokio::io::unix::AsyncFd;
549
550        let a2e_path = a2e_pipe_path(name);
551        let e2a_path = e2a_pipe_path(name);
552
553        // Remove stale FIFOs from a previous run.
554        let _ = std::fs::remove_file(&a2e_path);
555        let _ = std::fs::remove_file(&e2a_path);
556
557        // Create the named FIFOs.
558        for p in [&a2e_path, &e2a_path] {
559            let cp = CString::new(p.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
560            if unsafe { libc::mkfifo(cp.as_ptr(), 0o600) } == -1 {
561                return Err(ShmError::Io(std::io::Error::last_os_error()));
562            }
563        }
564
565        // Engine reads from a2e (awaits wakeup from app).
566        let a2e_fd = open_fifo_rdwr(&a2e_path)?;
567        let a2e_rx = AsyncFd::new(a2e_fd).map_err(ShmError::Io)?;
568
569        // Engine writes to e2a (sends wakeup to app).
570        let e2a_tx = open_fifo_rdwr(&e2a_path)?;
571
572        Ok(SpscFace {
573            id,
574            shm,
575            capacity,
576            slot_size,
577            a2e_off,
578            e2a_off,
579            a2e_rx,
580            e2a_tx,
581            a2e_pipe_path: a2e_path,
582            e2a_pipe_path: e2a_path,
583        })
584    }
585
586    fn try_pop_a2e(&self) -> Option<Bytes> {
587        unsafe {
588            ring_pop(
589                self.shm.as_ptr(),
590                self.a2e_off,
591                OFF_A2E_TAIL,
592                OFF_A2E_HEAD,
593                self.capacity,
594                self.slot_size,
595            )
596        }
597    }
598
599    fn try_push_e2a(&self, data: &[u8]) -> bool {
600        unsafe {
601            ring_push(
602                self.shm.as_ptr(),
603                self.e2a_off,
604                OFF_E2A_TAIL,
605                OFF_E2A_HEAD,
606                self.capacity,
607                self.slot_size,
608                data,
609            )
610        }
611    }
612
613    fn try_push_batch_e2a(&self, pkts: &[&[u8]]) -> usize {
614        unsafe {
615            ring_push_batch(
616                self.shm.as_ptr(),
617                self.e2a_off,
618                OFF_E2A_TAIL,
619                OFF_E2A_HEAD,
620                self.capacity,
621                self.slot_size,
622                pkts,
623            )
624        }
625    }
626
627    /// Send multiple Data/Interest packets to the app in a single tail
628    /// advance. See [`SpscHandle::send_batch`] for the semantics — this
629    /// is the engine-side mirror.
630    pub async fn send_batch(&self, pkts: &[Bytes]) -> Result<(), FaceError> {
631        if pkts.is_empty() {
632            return Ok(());
633        }
634        for pkt in pkts {
635            if pkt.len() > self.slot_size as usize {
636                return Err(FaceError::Io(std::io::Error::new(
637                    std::io::ErrorKind::InvalidInput,
638                    "packet exceeds SHM slot size",
639                )));
640            }
641        }
642        // SAFETY: parked flag within mapped SHM region.
643        let parked =
644            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
645        // Build a `&[&[u8]]` view once — cheaper than re-slicing inside
646        // the yield loop.
647        let views: Vec<&[u8]> = pkts.iter().map(|p| p.as_ref()).collect();
648        let mut start = 0usize;
649        while start < views.len() {
650            let pushed = self.try_push_batch_e2a(&views[start..]);
651            if pushed == 0 {
652                tokio::task::yield_now().await;
653                continue;
654            }
655            start += pushed;
656            if parked.load(Ordering::SeqCst) != 0 {
657                pipe_write(&self.e2a_tx);
658            }
659        }
660        Ok(())
661    }
662}
663
664impl Drop for SpscFace {
665    fn drop(&mut self) {
666        let _ = std::fs::remove_file(&self.a2e_pipe_path);
667        let _ = std::fs::remove_file(&self.e2a_pipe_path);
668    }
669}
670
impl Face for SpscFace {
    fn id(&self) -> FaceId {
        self.id
    }
    fn kind(&self) -> FaceKind {
        FaceKind::Shm
    }

    /// Receive the next app→engine packet: fast-path pop, short spin,
    /// then park on the FIFO using the parked-flag handshake.
    async fn recv(&self) -> Result<Bytes, FaceError> {
        // SAFETY: parked flag is within the mapped SHM region.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        loop {
            // Fast path: a packet is already waiting.
            if let Some(pkt) = self.try_pop_a2e() {
                return Ok(pkt);
            }
            // Spin before parking — avoids expensive pipe syscall
            // when packets arrive within microseconds of each other.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_a2e() {
                    return Ok(pkt);
                }
            }
            // Announce intent to sleep with SeqCst so the app's next SeqCst
            // load on the parked flag observes this before or after it pushes
            // to the ring — never concurrently missed.
            parked.store(1, Ordering::SeqCst);
            // Second ring check: if the app already pushed between our first
            // check and the flag store, we see it here and avoid sleeping.
            if let Some(pkt) = self.try_pop_a2e() {
                parked.store(0, Ordering::Relaxed);
                return Ok(pkt);
            }

            // Sleep until the app sends a wakeup via the FIFO.
            // NOTE(review): any pipe error (not just EOF) maps to Closed, and
            // the parked flag stays set on that path — harmless, since the
            // face is being torn down.
            pipe_await(&self.a2e_rx)
                .await
                .map_err(|_| FaceError::Closed)?;

            parked.store(0, Ordering::Relaxed);
        }
    }

    /// Send one packet to the app, yielding while the ring is full and
    /// waking the app only if it declared itself parked.
    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
        // Reject packets that cannot fit one slot (no fragmentation layer).
        if pkt.len() > self.slot_size as usize {
            return Err(FaceError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "packet exceeds SHM slot size",
            )));
        }
        // SAFETY: parked flag within mapped SHM region.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        // Yield until there is space in the e2a ring (backpressure).
        loop {
            if self.try_push_e2a(&pkt) {
                break;
            }
            tokio::task::yield_now().await;
        }
        // Only send a wakeup if the app is actually sleeping.  SeqCst pairs
        // with the app's SeqCst store of its parked flag.
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.e2a_tx);
        }
        Ok(())
    }
}
739
740// ─── SpscHandle (application side) ───────────────────────────────────────────
741
/// Application-side SPSC SHM handle.
///
/// Connect with [`SpscHandle::connect`] using the same `name` passed to
/// [`SpscFace::create`] in the engine process.
///
/// Set a `CancellationToken` via [`set_cancel`] to abort `recv`/`send` when
/// the router's control face disconnects (the O_RDWR FIFO trick means EOF
/// detection alone is unreliable).
pub struct SpscHandle {
    /// The mapped region (opener side: the name is NOT unlinked on drop).
    shm: ShmRegion,
    /// Slots per ring, read from the SHM header during `connect`.
    capacity: u32,
    /// Payload bytes per slot, read from the SHM header during `connect`.
    slot_size: u32,
    /// Byte offset of the app→engine ring within the region.
    a2e_off: usize,
    /// Byte offset of the engine→app ring within the region.
    e2a_off: usize,
    /// FIFO the app awaits readability on (engine writes here to wake app).
    e2a_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    /// FIFO the app writes to (to wake the engine).
    a2e_tx: std::os::unix::io::OwnedFd,
    /// Cancelled when the router control face dies.
    cancel: tokio_util::sync::CancellationToken,
}
763
764impl SpscHandle {
    /// Open the SHM region created by the engine and set up the wakeup mechanism.
    ///
    /// Two-phase open: the full region size depends on `capacity`/`slot_size`,
    /// which are only known from the header itself — so the header is mapped
    /// read-only first, then the whole region is remapped read-write.
    pub fn connect(name: &str) -> Result<Self, ShmError> {
        let shm_name_str = posix_shm_name(name);
        let cname = CString::new(shm_name_str.as_str()).map_err(|_| ShmError::InvalidName)?;

        // Phase 1: open just the header to read capacity and slot_size.
        let (capacity, slot_size) = unsafe {
            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDONLY, 0);
            if fd == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let p = libc::mmap(
                std::ptr::null_mut(),
                HEADER_SIZE,
                libc::PROT_READ,
                libc::MAP_SHARED,
                fd,
                0,
            );
            libc::close(fd);
            if p == libc::MAP_FAILED {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let base = p as *const u8;
            // Validate the magic before trusting capacity/slot_size.
            let magic = (base as *const u64).read_unaligned();
            if magic != MAGIC {
                libc::munmap(p, HEADER_SIZE);
                return Err(ShmError::InvalidMagic);
            }
            let cap = (base.add(8) as *const u32).read_unaligned();
            let slen = (base.add(12) as *const u32).read_unaligned();
            // The temporary header-only mapping is no longer needed.
            libc::munmap(p, HEADER_SIZE);
            (cap, slen)
        };

        // Phase 2: open the full region read-write.
        let size = shm_total_size(capacity, slot_size);
        let shm = ShmRegion::open(&shm_name_str, size)?;
        // SAFETY: the engine initialised the region with write_header before
        // publishing `name`; this re-validates the magic on the new mapping.
        unsafe { shm.read_header()? };

        let a2e_off = a2e_ring_offset();
        let e2a_off = e2a_ring_offset(capacity, slot_size);

        use tokio::io::unix::AsyncFd;

        let a2e_path = a2e_pipe_path(name); // app writes here to wake engine
        let e2a_path = e2a_pipe_path(name); // app reads here (engine wakes app)

        // App writes to a2e FIFO (to wake engine).
        let a2e_tx = open_fifo_rdwr(&a2e_path)?;

        // App reads from e2a FIFO (awaits wakeup from engine).
        let e2a_fd = open_fifo_rdwr(&e2a_path)?;
        let e2a_rx = AsyncFd::new(e2a_fd).map_err(ShmError::Io)?;

        Ok(SpscHandle {
            shm,
            capacity,
            slot_size,
            a2e_off,
            e2a_off,
            e2a_rx,
            a2e_tx,
            // No-op token until set_cancel attaches a real one.
            cancel: tokio_util::sync::CancellationToken::new(),
        })
    }
831
832    /// Attach a cancellation token (typically a child of the control face's
833    /// lifecycle token).  When cancelled, `recv()` returns `None` and `send()`
834    /// returns `Err`.
835    pub fn set_cancel(&mut self, cancel: tokio_util::sync::CancellationToken) {
836        self.cancel = cancel;
837    }
838
839    fn try_push_a2e(&self, data: &[u8]) -> bool {
840        unsafe {
841            ring_push(
842                self.shm.as_ptr(),
843                self.a2e_off,
844                OFF_A2E_TAIL,
845                OFF_A2E_HEAD,
846                self.capacity,
847                self.slot_size,
848                data,
849            )
850        }
851    }
852
853    fn try_pop_e2a(&self) -> Option<Bytes> {
854        unsafe {
855            ring_pop(
856                self.shm.as_ptr(),
857                self.e2a_off,
858                OFF_E2A_TAIL,
859                OFF_E2A_HEAD,
860                self.capacity,
861                self.slot_size,
862            )
863        }
864    }
865
866    fn try_push_batch_a2e(&self, pkts: &[&[u8]]) -> usize {
867        unsafe {
868            ring_push_batch(
869                self.shm.as_ptr(),
870                self.a2e_off,
871                OFF_A2E_TAIL,
872                OFF_A2E_HEAD,
873                self.capacity,
874                self.slot_size,
875                pkts,
876            )
877        }
878    }
879
880    /// Send multiple packets to the engine in one tail advance.
881    ///
882    /// Equivalent in outcome to calling [`send`](Self::send) `pkts.len()`
883    /// times in order, but pays the synchronisation cost once rather than
884    /// N times:
885    ///
886    /// - One head load (Acquire) per ring-full check instead of N.
887    /// - One tail store (Release) per successful push batch instead of N.
888    /// - One wakeup pipe write at the end instead of up to N.
889    /// - Far fewer `tokio::task::yield_now().await` points when the ring
890    ///   is not contended: a window-fill of 16 Interests becomes one call,
891    ///   one atomic ring transition, and zero scheduler round trips in the
892    ///   common case (instead of 16 awaits and potentially 16 yields).
893    ///
894    /// Like `send`, this yields cooperatively if the ring fills mid-batch
895    /// and waits (up to a 5 s wall-clock deadline) for the engine to drain
896    /// enough slots. Partial progress is preserved across yields — the
897    /// caller is guaranteed that all `pkts` eventually reach the ring or
898    /// the call returns `Err(Closed)`.
899    ///
900    /// Every packet in `pkts` must satisfy `pkt.len() <= slot_size`, or
901    /// the call returns `Err(PacketTooLarge)` before publishing anything.
902    /// The size check is performed up-front so a batch is either fully
903    /// publishable or rejected atomically for the over-size case.
904    pub async fn send_batch(&self, pkts: &[Bytes]) -> Result<(), ShmError> {
905        if self.cancel.is_cancelled() {
906            return Err(ShmError::Closed);
907        }
908        if pkts.is_empty() {
909            return Ok(());
910        }
911        for pkt in pkts {
912            if pkt.len() > self.slot_size as usize {
913                return Err(ShmError::PacketTooLarge);
914            }
915        }
916        // SAFETY: parked flag within mapped SHM region.
917        let parked =
918            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
919        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
920        let views: Vec<&[u8]> = pkts.iter().map(|p| p.as_ref()).collect();
921        let mut start = 0usize;
922        while start < views.len() {
923            let pushed = self.try_push_batch_a2e(&views[start..]);
924            if pushed == 0 {
925                if self.cancel.is_cancelled() {
926                    return Err(ShmError::Closed);
927                }
928                if tokio::time::Instant::now() >= deadline {
929                    return Err(ShmError::Closed);
930                }
931                tokio::task::yield_now().await;
932                continue;
933            }
934            start += pushed;
935            // Wake the engine after each partial push so it can drain
936            // the ring. Without this, a batch larger than ring capacity
937            // deadlocks: we yield waiting for space, the engine parks
938            // because nobody woke it to consume the first partial push.
939            if parked.load(Ordering::SeqCst) != 0 {
940                pipe_write(&self.a2e_tx);
941            }
942        }
943        Ok(())
944    }
945
946    /// Send a packet to the engine (enqueue in the a2e ring).
947    ///
948    /// Yields cooperatively if the ring is full (backpressure from the engine).
949    /// Returns `Err(Closed)` if the cancellation token fires (engine dead).
950    ///
951    /// Uses a wall-clock deadline so backpressure tolerance is independent
952    /// of system scheduling speed (the old yield-counter approach returned
953    /// `Closed` after ~100k yields ≈ 1s on fast machines, but could be much
954    /// shorter under heavy Tokio contention — falsely killing the caller).
955    pub async fn send(&self, pkt: Bytes) -> Result<(), ShmError> {
956        if self.cancel.is_cancelled() {
957            return Err(ShmError::Closed);
958        }
959        if pkt.len() > self.slot_size as usize {
960            return Err(ShmError::PacketTooLarge);
961        }
962        // SAFETY: parked flag within mapped SHM region.
963        let parked =
964            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
965        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
966        loop {
967            if self.try_push_a2e(&pkt) {
968                break;
969            }
970            if self.cancel.is_cancelled() {
971                return Err(ShmError::Closed);
972            }
973            if tokio::time::Instant::now() >= deadline {
974                return Err(ShmError::Closed);
975            }
976            tokio::task::yield_now().await;
977        }
978        // Only send a wakeup if the engine is sleeping on the a2e ring.
979        if parked.load(Ordering::SeqCst) != 0 {
980            pipe_write(&self.a2e_tx);
981        }
982        Ok(())
983    }
984
985    /// Receive a packet from the engine (dequeue from the e2a ring).
986    ///
987    /// Returns `None` when the engine face has been dropped or the
988    /// cancellation token fires.
989    pub async fn recv(&self) -> Option<Bytes> {
990        if self.cancel.is_cancelled() {
991            return None;
992        }
993        // SAFETY: parked flag within mapped SHM region.
994        let parked =
995            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
996        loop {
997            if let Some(pkt) = self.try_pop_e2a() {
998                return Some(pkt);
999            }
1000            // Spin before parking — avoids expensive pipe syscall
1001            // when packets arrive within microseconds of each other.
1002            for _ in 0..SPIN_ITERS {
1003                std::hint::spin_loop();
1004                if let Some(pkt) = self.try_pop_e2a() {
1005                    return Some(pkt);
1006                }
1007            }
1008            parked.store(1, Ordering::SeqCst);
1009            if let Some(pkt) = self.try_pop_e2a() {
1010                parked.store(0, Ordering::Relaxed);
1011                return Some(pkt);
1012            }
1013
1014            // Wait for pipe wakeup or cancellation.  We rely on the
1015            // CancellationToken (propagated from the control face) rather
1016            // than timeouts — idle waits are legitimate (e.g. iperf server
1017            // waiting for a client).
1018            tokio::select! {
1019                result = pipe_await(&self.e2a_rx) => {
1020                    parked.store(0, Ordering::Relaxed);
1021                    if result.is_err() { return None; }
1022                }
1023                _ = self.cancel.cancelled() => {
1024                    parked.store(0, Ordering::Relaxed);
1025                    return None;
1026                }
1027            }
1028        }
1029    }
1030}
1031
1032// SpscHandle has no Drop impl: ShmRegion handles munmap, OwnedFd closes pipe
1033// fds, and the FIFOs are created/removed by SpscFace (engine side).
1034
1035// ─── Tests ───────────────────────────────────────────────────────────────────
1036
#[cfg(test)]
mod tests {
    use super::*;
    use ndn_transport::Face;

    fn test_name() -> String {
        // PID-qualified so concurrent test runs can't collide on SHM names.
        let pid = std::process::id();
        format!("test-spsc-{}", pid)
    }

    /// Await `fut` under a `secs`-second deadline, panicking if it elapses.
    /// Shared by every recv path below so timeouts stay uniform.
    async fn within<F: std::future::Future>(fut: F, secs: u64) -> F::Output {
        tokio::time::timeout(std::time::Duration::from_secs(secs), fut)
            .await
            .expect("timed out")
    }

    // Tests use multi_thread because AsyncFd needs the runtime's I/O driver.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn face_kind_and_id() {
        let name = test_name();
        let face = SpscFace::create(FaceId(7), &name).unwrap();
        assert_eq!(face.kind(), FaceKind::Shm);
        assert_eq!(face.id(), FaceId(7));
    }

    #[test]
    fn slot_size_for_mtu_no_floor_clamp() {
        // slot_size_for_mtu no longer clamps up to DEFAULT_SLOT_SIZE.
        // mtu=1024 → 1024 + 16384 = 17408, already a multiple of 64.
        let small = slot_size_for_mtu(1024);
        assert_eq!(small, 17408);
        assert!(small < DEFAULT_SLOT_SIZE + SHM_SLOT_OVERHEAD as u32);

        // mtu=0 → bare overhead: 16384, also 64-aligned.
        assert_eq!(slot_size_for_mtu(0), 16384);
    }

    #[test]
    fn slot_size_for_mtu_scales_up_for_large_mtu() {
        let one_mib = slot_size_for_mtu(1024 * 1024);
        assert_eq!(one_mib % 64, 0);
        assert!(one_mib >= 1024 * 1024 + SHM_SLOT_OVERHEAD as u32);
    }

    #[test]
    fn capacity_for_slot_inversely_scales() {
        // Default slot → default capacity.
        assert_eq!(capacity_for_slot(DEFAULT_SLOT_SIZE), DEFAULT_CAPACITY);
        // A 256 KiB slot shrinks the ring, but never below the floor of 16.
        let cap_256k = capacity_for_slot(272_384);
        assert!(cap_256k >= 16);
        assert!(cap_256k < DEFAULT_CAPACITY);
        // A 1 MiB slot bottoms out at the minimum capacity.
        assert_eq!(capacity_for_slot(1_064_960), 16);
    }

    #[test]
    fn slot_size_for_mtu_is_cache_line_aligned() {
        let mtus = [256_000, 512_000, 768_000, 1_000_000, 2_000_000];
        for mtu in mtus {
            let s = slot_size_for_mtu(mtu);
            assert_eq!(s % 64, 0, "slot_size_for_mtu({mtu}) = {s} not 64-aligned");
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn create_for_mtu_large_segment_roundtrip() {
        // Regression check for the slot-size change: a Data packet with a
        // ~256 KiB content body must traverse the SHM face without hitting
        // the per-slot size limit.
        let name = format!("{}-big", test_name());
        let face = SpscFace::create_for_mtu(FaceId(42), &name, 256 * 1024).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let payload = Bytes::from(vec![0xABu8; 260_000]);
        handle.send(payload.clone()).await.unwrap();

        let received = within(face.recv(), 2).await.unwrap();
        assert_eq!(received.len(), payload.len());
        assert_eq!(&received[..16], &payload[..16]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_batch_app_to_engine() {
        let name = format!("{}-bae", test_name());
        let face = SpscFace::create(FaceId(20), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let mut batch = Vec::with_capacity(16);
        for i in 0u8..16 {
            batch.push(Bytes::from(vec![i; 64]));
        }
        handle.send_batch(&batch).await.unwrap();

        // Slots must drain in FIFO order on the engine side.
        for i in 0u8..16 {
            let got = within(face.recv(), 2).await.unwrap();
            assert_eq!(got.len(), 64);
            assert_eq!(got[0], i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_batch_engine_to_app() {
        let name = format!("{}-bea", test_name());
        let face = SpscFace::create(FaceId(21), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let mut batch = Vec::with_capacity(16);
        for i in 0u8..16 {
            batch.push(Bytes::from(vec![i; 64]));
        }
        face.send_batch(&batch).await.unwrap();

        // Slots must drain in FIFO order on the app side.
        for i in 0u8..16 {
            let got = within(handle.recv(), 2).await.unwrap();
            assert_eq!(got.len(), 64);
            assert_eq!(got[0], i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_batch_exceeds_ring_capacity() {
        let name = format!("{}-bfull", test_name());
        let face = SpscFace::create(FaceId(22), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        // DEFAULT_CAPACITY is 32 — push 48 packets so the batch is forced
        // to yield internally until the engine frees slots.
        let n = 48usize;
        let pkts: Vec<Bytes> = (0..n)
            .map(|i| Bytes::from(vec![(i & 0xFF) as u8; 32]))
            .collect();

        // Run the batch send as a task and drain concurrently from the
        // engine side so the ring can make progress.
        let sender = {
            let pkts = pkts.clone();
            tokio::spawn(async move { handle.send_batch(&pkts).await })
        };
        for i in 0..n {
            let got = within(face.recv(), 5).await.unwrap();
            assert_eq!(got[0], (i & 0xFF) as u8);
        }
        sender.await.unwrap().unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn app_to_engine_roundtrip() {
        let name = format!("{}-ae", test_name());
        let face = SpscFace::create(FaceId(1), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkt = Bytes::from_static(b"\x05\x03\x01\x02\x03");
        handle.send(pkt.clone()).await.unwrap();

        let got = within(face.recv(), 2).await.unwrap();
        assert_eq!(got, pkt);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn engine_to_app_roundtrip() {
        let name = format!("{}-ea", test_name());
        let face = SpscFace::create(FaceId(2), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkt = Bytes::from_static(b"\x06\x03\xAA\xBB\xCC");
        face.send(pkt.clone()).await.unwrap();

        let got = within(handle.recv(), 2).await.unwrap();
        assert_eq!(got, pkt);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn multiple_packets_both_directions() {
        let name = format!("{}-bi", test_name());
        let face = SpscFace::create(FaceId(3), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        // App → Engine: 4 packets, received in FIFO order.
        for i in 0u8..4 {
            handle.send(Bytes::from(vec![i; 64])).await.unwrap();
        }
        for i in 0u8..4 {
            let pkt = face.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![i; 64][..]);
        }

        // Engine → App: 4 packets the other way.
        for i in 0u8..4 {
            face.send(Bytes::from(vec![i + 10; 128])).await.unwrap();
        }
        for i in 0u8..4 {
            let pkt = handle.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![i + 10; 128][..]);
        }
    }
}