1use std::ffi::CString;
38use std::path::PathBuf;
39use std::sync::atomic::{AtomicU32, Ordering};
40
41use bytes::Bytes;
42
43use ndn_transport::{Face, FaceError, FaceId, FaceKind};
44
45use crate::local::shm::ShmError;
46
// Magic tag stamped at offset 0 of the region: ASCII "NDN_SHM\0".
const MAGIC: u64 = 0x4E44_4E5F_5348_4D00;

/// Default number of slots per ring.
pub const DEFAULT_CAPACITY: u32 = 256;

/// Default payload bytes per slot.
pub const DEFAULT_SLOT_SIZE: u32 = 8960;

// Target total bytes for both rings (excludes the fixed header); used by
// `capacity_for_slot` to shrink capacity as slot size grows.
const SHM_BUDGET: usize = 2 * DEFAULT_CAPACITY as usize * slot_stride(DEFAULT_SLOT_SIZE);
87
/// Fixed headroom added on top of the MTU when sizing a shared-memory slot.
pub const SHM_SLOT_OVERHEAD: usize = 16 * 1024;

/// Returns the slot size for a given MTU: the MTU plus [`SHM_SLOT_OVERHEAD`],
/// rounded up to the next 64-byte (cache-line) multiple and clamped so the
/// result fits in a `u32`.
pub fn slot_size_for_mtu(mtu: usize) -> u32 {
    // saturating_add guards against overflow on pathological MTU values.
    let padded = mtu.saturating_add(SHM_SLOT_OVERHEAD);
    let rounded = padded.div_ceil(64) * 64;
    rounded.min(u32::MAX as usize) as u32
}
106
107pub fn capacity_for_slot(slot_size: u32) -> u32 {
111 let stride = slot_stride(slot_size);
112 let cap = SHM_BUDGET / (2 * stride);
113 (cap as u32).max(16)
114}
115
// Header layout: each control word lives on its own 64-byte cache line to
// avoid false sharing between the producer and consumer sides.
const OFF_A2E_TAIL: usize = 64;
const OFF_A2E_HEAD: usize = 128;
const OFF_E2A_TAIL: usize = 192;
const OFF_E2A_HEAD: usize = 256;
const OFF_A2E_PARKED: usize = 320;
const OFF_E2A_PARKED: usize = 384;
const HEADER_SIZE: usize = 448;

/// Bytes one slot occupies in a ring: a 4-byte length prefix plus payload.
const fn slot_stride(slot_size: u32) -> usize {
    slot_size as usize + 4
}

/// How many spin-loop probes a receiver performs before parking on the pipe.
const SPIN_ITERS: u32 = 64;

/// Total bytes required for the header plus both rings.
fn shm_total_size(capacity: u32, slot_size: u32) -> usize {
    let ring_bytes = capacity as usize * slot_stride(slot_size);
    HEADER_SIZE + 2 * ring_bytes
}
139
140fn a2e_ring_offset() -> usize {
141 HEADER_SIZE
142}
143fn e2a_ring_offset(capacity: u32, slot_size: u32) -> usize {
144 HEADER_SIZE + capacity as usize * slot_stride(slot_size)
145}
146
/// POSIX shm object name for a logical channel `name` (the leading slash is
/// required by `shm_open`).
fn posix_shm_name(name: &str) -> String {
    const PREFIX: &str = "/ndn-shm-";
    let mut out = String::with_capacity(PREFIX.len() + name.len());
    out.push_str(PREFIX);
    out.push_str(name);
    out
}
152
/// Filesystem path of the FIFO used to wake the engine side (app→engine).
fn a2e_pipe_path(name: &str) -> PathBuf {
    let mut path = String::from("/tmp/.ndn-");
    path.push_str(name);
    path.push_str(".a2e.pipe");
    PathBuf::from(path)
}
157
/// Filesystem path of the FIFO used to wake the app side (engine→app).
fn e2a_pipe_path(name: &str) -> PathBuf {
    let mut path = String::from("/tmp/.ndn-");
    path.push_str(name);
    path.push_str(".e2a.pipe");
    PathBuf::from(path)
}
162
/// A mapped POSIX shared-memory region shared between the engine and an app.
struct ShmRegion {
    // Base address of the mmap'd region.
    ptr: *mut u8,
    // Length of the mapping, in bytes.
    size: usize,
    // `Some` only on the side that created the object: Drop then calls
    // shm_unlink with this name. The connecting side leaves it `None`.
    shm_name: Option<CString>,
}
172
// SAFETY: the region is a raw byte mapping; concurrent access from multiple
// threads/processes is coordinated by the SPSC ring protocol (atomic
// head/tail indices with acquire/release ordering), not by Rust aliasing,
// so moving or sharing the handle across threads is sound.
unsafe impl Send for ShmRegion {}
unsafe impl Sync for ShmRegion {}
175
impl ShmRegion {
    /// Creates (or truncates) the POSIX shm object `shm_name`, sizes it to
    /// `size` bytes, and maps it read/write. The returned region remembers
    /// the name so the object is unlinked on drop.
    fn create(shm_name: &str, size: usize) -> Result<Self, ShmError> {
        let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
        let ptr = unsafe {
            let fd = libc::shm_open(
                cname.as_ptr(),
                libc::O_CREAT | libc::O_RDWR | libc::O_TRUNC,
                // 0o666: world read/write, filtered by the process umask.
                // Double cast smooths over mode_t width differences between
                // libc targets — TODO confirm this matches the target libc's
                // shm_open signature.
                0o666 as libc::mode_t as libc::c_uint,
            );
            if fd == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }

            // Size the object before mapping; close the fd on failure so it
            // does not leak.
            if libc::ftruncate(fd, size as libc::off_t) == -1 {
                libc::close(fd);
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }

            let p = libc::mmap(
                std::ptr::null_mut(),
                size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED,
                fd,
                0,
            );
            // The mapping keeps the object alive; the fd is no longer needed
            // regardless of whether mmap succeeded.
            libc::close(fd);
            if p == libc::MAP_FAILED {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            p as *mut u8
        };
        Ok(ShmRegion {
            ptr,
            size,
            // Creator keeps the name so Drop can shm_unlink it.
            shm_name: Some(cname),
        })
    }

    /// Maps an existing shm object created by a peer. The returned region
    /// does NOT unlink the object on drop (`shm_name` stays `None`).
    fn open(shm_name: &str, size: usize) -> Result<Self, ShmError> {
        let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
        let ptr = unsafe {
            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDWR, 0);
            if fd == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }

            let p = libc::mmap(
                std::ptr::null_mut(),
                size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED,
                fd,
                0,
            );
            libc::close(fd);
            if p == libc::MAP_FAILED {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            p as *mut u8
        };
        Ok(ShmRegion {
            ptr,
            size,
            shm_name: None,
        })
    }

    /// Base address of the mapping.
    fn as_ptr(&self) -> *mut u8 {
        self.ptr
    }

    /// Writes the magic, capacity and slot size into the region header.
    ///
    /// # Safety
    /// The mapping must be at least `HEADER_SIZE` bytes and writable, and no
    /// peer may be reading the header concurrently.
    unsafe fn write_header(&self, capacity: u32, slot_size: u32) {
        unsafe {
            (self.ptr as *mut u64).write_unaligned(MAGIC);
            (self.ptr.add(8) as *mut u32).write_unaligned(capacity);
            (self.ptr.add(12) as *mut u32).write_unaligned(slot_size);
        }
    }

    /// Validates the magic and returns `(capacity, slot_size)` from the
    /// header.
    ///
    /// # Safety
    /// The mapping must be at least `HEADER_SIZE` bytes and the header must
    /// have been fully written by the creator.
    unsafe fn read_header(&self) -> Result<(u32, u32), ShmError> {
        unsafe {
            let magic = (self.ptr as *const u64).read_unaligned();
            if magic != MAGIC {
                return Err(ShmError::InvalidMagic);
            }
            let capacity = (self.ptr.add(8) as *const u32).read_unaligned();
            let slot_size = (self.ptr.add(12) as *const u32).read_unaligned();
            Ok((capacity, slot_size))
        }
    }
}
283
impl Drop for ShmRegion {
    // Unmaps the region and, on the creating side only (shm_name is Some),
    // unlinks the shm object so it is reclaimed once all mappings are gone.
    fn drop(&mut self) {
        unsafe {
            libc::munmap(self.ptr as *mut libc::c_void, self.size);
            if let Some(ref n) = self.shm_name {
                libc::shm_unlink(n.as_ptr());
            }
        }
    }
}
294
/// Pushes one packet into an SPSC ring. Returns `false` if the ring is full.
///
/// # Safety
/// `base` must point at a valid region laid out by `write_header`; the
/// caller must be the ring's single producer, and `data.len()` must not
/// exceed `slot_size`.
unsafe fn ring_push(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
    data: &[u8],
) -> bool {
    debug_assert!(data.len() <= slot_size as usize);

    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Producer owns tail (Relaxed); Acquire on head pairs with the
    // consumer's Release in ring_pop so the slot we overwrite has truly
    // been consumed.
    let t = tail_a.load(Ordering::Relaxed);
    let h = head_a.load(Ordering::Acquire);
    // Indices wrap freely in u32; the difference is the fill level.
    if t.wrapping_sub(h) >= capacity {
        return false;
    }

    let idx = (t % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    unsafe {
        // Slot format: 4-byte little-endian-native length prefix + payload.
        (slot as *mut u32).write_unaligned(data.len() as u32);
        std::ptr::copy_nonoverlapping(data.as_ptr(), slot.add(4), data.len());
    }
    // Release publishes the slot contents before the new tail is visible.
    tail_a.store(t.wrapping_add(1), Ordering::Release);
    true
}
333
/// Pushes as many packets as fit into an SPSC ring with a single tail
/// update. Returns how many packets were written (may be 0 if full).
///
/// # Safety
/// Same contract as [`ring_push`]: valid region, single producer, and every
/// packet no larger than `slot_size`.
unsafe fn ring_push_batch(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
    pkts: &[&[u8]],
) -> usize {
    if pkts.is_empty() {
        return 0;
    }
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Same ordering rationale as ring_push: Acquire on head pairs with the
    // consumer's Release so reused slots are safe to overwrite.
    let mut t = tail_a.load(Ordering::Relaxed);
    let h = head_a.load(Ordering::Acquire);
    let free = capacity.wrapping_sub(t.wrapping_sub(h));
    let to_push = (free as usize).min(pkts.len());
    if to_push == 0 {
        return 0;
    }

    for pkt in &pkts[..to_push] {
        debug_assert!(pkt.len() <= slot_size as usize);
        let idx = (t % capacity) as usize;
        let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };
        unsafe {
            (slot as *mut u32).write_unaligned(pkt.len() as u32);
            std::ptr::copy_nonoverlapping(pkt.as_ptr(), slot.add(4), pkt.len());
        }
        t = t.wrapping_add(1);
    }
    // One Release store publishes the whole batch at once.
    tail_a.store(t, Ordering::Release);
    to_push
}
393
/// Pops one packet from an SPSC ring, copying it into an owned `Bytes`.
/// Returns `None` if the ring is empty.
///
/// # Safety
/// `base` must point at a valid region laid out by `write_header`, and the
/// caller must be the ring's single consumer.
unsafe fn ring_pop(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
) -> Option<Bytes> {
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    // Consumer owns head (Relaxed); Acquire on tail pairs with the
    // producer's Release so the slot contents are visible before we read.
    let h = head_a.load(Ordering::Relaxed);
    let t = tail_a.load(Ordering::Acquire);
    if h == t {
        return None;
    }

    let idx = (h % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    let len = unsafe { (slot as *const u32).read_unaligned() as usize };
    // Clamp a corrupt length prefix so we never read past the slot.
    let len = len.min(slot_size as usize);
    // Copy out before releasing the slot back to the producer.
    let data = unsafe { Bytes::copy_from_slice(std::slice::from_raw_parts(slot.add(4), len)) };

    // Release hands the slot back; pairs with the producer's Acquire load.
    head_a.store(h.wrapping_add(1), Ordering::Release);
    Some(data)
}
426
/// Opens a FIFO read-write and non-blocking. Opening O_RDWR means the open
/// never blocks waiting for a peer and the fd keeps both ends referenced
/// (Linux behavior; POSIX leaves O_RDWR on FIFOs unspecified — NOTE(review):
/// confirm if non-Linux targets are intended).
fn open_fifo_rdwr(path: &std::path::Path) -> Result<std::os::unix::io::OwnedFd, ShmError> {
    use std::os::unix::io::{FromRawFd, OwnedFd};
    let cpath = CString::new(path.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
    let fd = unsafe { libc::open(cpath.as_ptr(), libc::O_RDWR | libc::O_NONBLOCK) };
    if fd == -1 {
        return Err(ShmError::Io(std::io::Error::last_os_error()));
    }
    // SAFETY: fd is a freshly opened, valid descriptor we exclusively own.
    Ok(unsafe { OwnedFd::from_raw_fd(fd) })
}
444
/// Waits until at least one wakeup byte can be read from the pipe.
///
/// Returns `Ok(())` once any bytes are drained (up to 64 per call — leftover
/// bytes are harmless because the park/wake protocol only relies on a fresh
/// write arriving after `parked` is set). Returns an error on EOF (peer
/// closed its end) or any read error other than `WouldBlock`.
async fn pipe_await(
    rx: &tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
) -> std::io::Result<()> {
    use std::os::unix::io::AsRawFd;
    loop {
        let mut guard = rx.readable().await?;
        let mut buf = [0u8; 64];
        let fd = rx.get_ref().as_raw_fd();
        let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
        // Clear readiness unconditionally: a later wakeup write produces a
        // fresh readiness event, so nothing is lost.
        guard.clear_ready();
        if n > 0 {
            return Ok(());
        }
        if n == 0 {
            // read() == 0 on a pipe means all writers are gone.
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "SHM wakeup pipe closed (peer died)",
            ));
        }
        if n == -1 {
            let err = std::io::Error::last_os_error();
            // WouldBlock: readiness was stale; loop and wait again.
            if err.kind() != std::io::ErrorKind::WouldBlock {
                return Err(err);
            }
        }
    }
}
476
/// Writes one wakeup byte into the pipe. Best-effort: the result is
/// deliberately ignored — if the pipe is full the receiver already has a
/// pending wakeup, and if it is gone the receiver will detect that itself.
fn pipe_write(tx: &std::os::unix::io::OwnedFd) {
    use std::os::unix::io::AsRawFd;
    let b = [1u8];
    unsafe {
        libc::write(tx.as_raw_fd(), b.as_ptr() as *const libc::c_void, 1);
    }
}
488
/// Engine-side endpoint of a shared-memory SPSC channel: pops from the
/// app→engine (a2e) ring and pushes to the engine→app (e2a) ring.
pub struct SpscFace {
    // Face identity reported via the `Face` trait.
    id: FaceId,
    // Owning mapping; created (and unlinked on drop) by this side.
    shm: ShmRegion,
    // Slots per ring.
    capacity: u32,
    // Payload bytes per slot.
    slot_size: u32,
    // Byte offset of the a2e ring within the region.
    a2e_off: usize,
    // Byte offset of the e2a ring within the region.
    e2a_off: usize,
    // Async-readable end of the a2e wakeup FIFO (app wakes us).
    a2e_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    // Write end of the e2a wakeup FIFO (we wake the app).
    e2a_tx: std::os::unix::io::OwnedFd,
    // FIFO paths, kept so Drop can remove them.
    a2e_pipe_path: PathBuf,
    e2a_pipe_path: PathBuf,
}
511
impl SpscFace {
    /// Creates a channel with the default capacity and slot size.
    pub fn create(id: FaceId, name: &str) -> Result<Self, ShmError> {
        Self::create_with(id, name, DEFAULT_CAPACITY, DEFAULT_SLOT_SIZE)
    }

    /// Creates a channel sized for `mtu`: slot size derived from the MTU,
    /// capacity shrunk to stay within the memory budget.
    pub fn create_for_mtu(id: FaceId, name: &str, mtu: usize) -> Result<Self, ShmError> {
        let ss = slot_size_for_mtu(mtu);
        Self::create_with(id, name, capacity_for_slot(ss), ss)
    }

    /// Creates the shm region, writes its header, (re)creates both wakeup
    /// FIFOs, and opens them. Must be called before the app side connects.
    pub fn create_with(
        id: FaceId,
        name: &str,
        capacity: u32,
        slot_size: u32,
    ) -> Result<Self, ShmError> {
        let size = shm_total_size(capacity, slot_size);
        let shm = ShmRegion::create(&posix_shm_name(name), size)?;
        // SAFETY: region was just created with at least HEADER_SIZE bytes.
        unsafe {
            shm.write_header(capacity, slot_size);
        }

        let a2e_off = a2e_ring_offset();
        let e2a_off = e2a_ring_offset(capacity, slot_size);

        use tokio::io::unix::AsyncFd;

        let a2e_path = a2e_pipe_path(name);
        let e2a_path = e2a_pipe_path(name);

        // Remove stale FIFOs from a previous run before recreating them.
        let _ = std::fs::remove_file(&a2e_path);
        let _ = std::fs::remove_file(&e2a_path);

        for p in [&a2e_path, &e2a_path] {
            let cp = CString::new(p.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
            if unsafe { libc::mkfifo(cp.as_ptr(), 0o600) } == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
        }

        // We read wakeups from the a2e pipe and write wakeups to the e2a
        // pipe; both are opened O_RDWR so neither open blocks on a peer.
        let a2e_fd = open_fifo_rdwr(&a2e_path)?;
        let a2e_rx = AsyncFd::new(a2e_fd).map_err(ShmError::Io)?;

        let e2a_tx = open_fifo_rdwr(&e2a_path)?;

        Ok(SpscFace {
            id,
            shm,
            capacity,
            slot_size,
            a2e_off,
            e2a_off,
            a2e_rx,
            e2a_tx,
            a2e_pipe_path: a2e_path,
            e2a_pipe_path: e2a_path,
        })
    }

    // Non-blocking pop from the app→engine ring.
    fn try_pop_a2e(&self) -> Option<Bytes> {
        // SAFETY: we are the single consumer of the a2e ring and the region
        // is laid out by create_with.
        unsafe {
            ring_pop(
                self.shm.as_ptr(),
                self.a2e_off,
                OFF_A2E_TAIL,
                OFF_A2E_HEAD,
                self.capacity,
                self.slot_size,
            )
        }
    }

    // Non-blocking push into the engine→app ring.
    fn try_push_e2a(&self, data: &[u8]) -> bool {
        // SAFETY: we are the single producer of the e2a ring; callers check
        // data.len() <= slot_size.
        unsafe {
            ring_push(
                self.shm.as_ptr(),
                self.e2a_off,
                OFF_E2A_TAIL,
                OFF_E2A_HEAD,
                self.capacity,
                self.slot_size,
                data,
            )
        }
    }

    // Non-blocking batch push into the engine→app ring.
    fn try_push_batch_e2a(&self, pkts: &[&[u8]]) -> usize {
        // SAFETY: same producer contract as try_push_e2a.
        unsafe {
            ring_push_batch(
                self.shm.as_ptr(),
                self.e2a_off,
                OFF_E2A_TAIL,
                OFF_E2A_HEAD,
                self.capacity,
                self.slot_size,
                pkts,
            )
        }
    }

    /// Sends all packets to the app, yielding while the ring is full and
    /// waking the app's parked receiver after each pushed chunk.
    ///
    /// NOTE(review): unlike `SpscHandle::send_batch`, there is no deadline
    /// here — a vanished app can keep this looping; confirm intended.
    pub async fn send_batch(&self, pkts: &[Bytes]) -> Result<(), FaceError> {
        if pkts.is_empty() {
            return Ok(());
        }
        // Validate up front so a batch is never partially rejected.
        for pkt in pkts {
            if pkt.len() > self.slot_size as usize {
                return Err(FaceError::Io(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "packet exceeds SHM slot size",
                )));
            }
        }
        // SAFETY: OFF_E2A_PARKED is a 4-byte-aligned control word in-bounds.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        let views: Vec<&[u8]> = pkts.iter().map(|p| p.as_ref()).collect();
        let mut start = 0usize;
        while start < views.len() {
            let pushed = self.try_push_batch_e2a(&views[start..]);
            if pushed == 0 {
                // Ring full: let the consumer run, then retry.
                tokio::task::yield_now().await;
                continue;
            }
            start += pushed;
            // Only pay for a pipe write if the receiver parked itself.
            if parked.load(Ordering::SeqCst) != 0 {
                pipe_write(&self.e2a_tx);
            }
        }
        Ok(())
    }
}
663
664impl Drop for SpscFace {
665 fn drop(&mut self) {
666 let _ = std::fs::remove_file(&self.a2e_pipe_path);
667 let _ = std::fs::remove_file(&self.e2a_pipe_path);
668 }
669}
670
impl Face for SpscFace {
    fn id(&self) -> FaceId {
        self.id
    }
    fn kind(&self) -> FaceKind {
        FaceKind::Shm
    }

    /// Receives one packet from the app. Spins briefly, then parks on the
    /// wakeup pipe until the app signals a push.
    async fn recv(&self) -> Result<Bytes, FaceError> {
        // SAFETY: OFF_A2E_PARKED is a 4-byte-aligned control word in-bounds.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        loop {
            if let Some(pkt) = self.try_pop_a2e() {
                return Ok(pkt);
            }
            // Short spin to catch packets arriving back-to-back without
            // paying the pipe round-trip.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_a2e() {
                    return Ok(pkt);
                }
            }
            // Park protocol: publish parked=1 (SeqCst), then re-check the
            // ring. The re-check closes the race with a sender that pushed
            // before it observed parked=1 (and thus skipped the pipe write).
            parked.store(1, Ordering::SeqCst);
            if let Some(pkt) = self.try_pop_a2e() {
                parked.store(0, Ordering::Relaxed);
                return Ok(pkt);
            }

            // Any pipe error (including EOF = peer death) maps to Closed.
            pipe_await(&self.a2e_rx)
                .await
                .map_err(|_| FaceError::Closed)?;

            parked.store(0, Ordering::Relaxed);
        }
    }

    /// Sends one packet to the app, yielding while the ring is full.
    ///
    /// NOTE(review): no deadline here, unlike `SpscHandle::send` — a dead
    /// app can keep this yielding forever; confirm intended.
    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
        if pkt.len() > self.slot_size as usize {
            return Err(FaceError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "packet exceeds SHM slot size",
            )));
        }
        // SAFETY: OFF_E2A_PARKED is a 4-byte-aligned control word in-bounds.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        loop {
            if self.try_push_e2a(&pkt) {
                break;
            }
            tokio::task::yield_now().await;
        }
        // Wake the app only if it parked itself (SeqCst pairs with the
        // receiver's parked.store in recv).
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.e2a_tx);
        }
        Ok(())
    }
}
739
/// App-side endpoint of a shared-memory SPSC channel: pushes to the
/// app→engine (a2e) ring and pops from the engine→app (e2a) ring.
pub struct SpscHandle {
    // Non-owning mapping of the engine-created region (not unlinked here).
    shm: ShmRegion,
    // Slots per ring, read from the region header.
    capacity: u32,
    // Payload bytes per slot, read from the region header.
    slot_size: u32,
    // Byte offset of the a2e ring within the region.
    a2e_off: usize,
    // Byte offset of the e2a ring within the region.
    e2a_off: usize,
    // Async-readable end of the e2a wakeup FIFO (engine wakes us).
    e2a_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    // Write end of the a2e wakeup FIFO (we wake the engine).
    a2e_tx: std::os::unix::io::OwnedFd,
    // Cooperative shutdown; see `set_cancel`.
    cancel: tokio_util::sync::CancellationToken,
}
763
impl SpscHandle {
    /// Connects to an engine-created channel by name: maps the header
    /// read-only to learn capacity/slot size, then remaps the full region
    /// read-write and opens both wakeup FIFOs.
    pub fn connect(name: &str) -> Result<Self, ShmError> {
        let shm_name_str = posix_shm_name(name);
        let cname = CString::new(shm_name_str.as_str()).map_err(|_| ShmError::InvalidName)?;

        // First pass: a small read-only mapping just to read the header, so
        // we know how large the full mapping must be.
        let (capacity, slot_size) = unsafe {
            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDONLY, 0);
            if fd == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let p = libc::mmap(
                std::ptr::null_mut(),
                HEADER_SIZE,
                libc::PROT_READ,
                libc::MAP_SHARED,
                fd,
                0,
            );
            libc::close(fd);
            if p == libc::MAP_FAILED {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let base = p as *const u8;
            let magic = (base as *const u64).read_unaligned();
            if magic != MAGIC {
                libc::munmap(p, HEADER_SIZE);
                return Err(ShmError::InvalidMagic);
            }
            let cap = (base.add(8) as *const u32).read_unaligned();
            let slen = (base.add(12) as *const u32).read_unaligned();
            libc::munmap(p, HEADER_SIZE);
            (cap, slen)
        };

        // Second pass: map the full region read-write and re-validate the
        // header through the new mapping.
        let size = shm_total_size(capacity, slot_size);
        let shm = ShmRegion::open(&shm_name_str, size)?;
        // SAFETY: the mapping is at least HEADER_SIZE bytes; the engine
        // wrote the header before the FIFOs/shm became connectable.
        unsafe { shm.read_header()? };

        let a2e_off = a2e_ring_offset();
        let e2a_off = e2a_ring_offset(capacity, slot_size);

        use tokio::io::unix::AsyncFd;

        // We write wakeups to the a2e pipe and read wakeups from the e2a
        // pipe — the mirror image of SpscFace.
        let a2e_path = a2e_pipe_path(name);
        let e2a_path = e2a_pipe_path(name);
        let a2e_tx = open_fifo_rdwr(&a2e_path)?;

        let e2a_fd = open_fifo_rdwr(&e2a_path)?;
        let e2a_rx = AsyncFd::new(e2a_fd).map_err(ShmError::Io)?;

        Ok(SpscHandle {
            shm,
            capacity,
            slot_size,
            a2e_off,
            e2a_off,
            e2a_rx,
            a2e_tx,
            cancel: tokio_util::sync::CancellationToken::new(),
        })
    }

    /// Replaces the cancellation token; `send`/`send_batch`/`recv` observe
    /// it and bail out once cancelled.
    pub fn set_cancel(&mut self, cancel: tokio_util::sync::CancellationToken) {
        self.cancel = cancel;
    }

    // Non-blocking push into the app→engine ring.
    fn try_push_a2e(&self, data: &[u8]) -> bool {
        // SAFETY: we are the single producer of the a2e ring; callers check
        // data.len() <= slot_size.
        unsafe {
            ring_push(
                self.shm.as_ptr(),
                self.a2e_off,
                OFF_A2E_TAIL,
                OFF_A2E_HEAD,
                self.capacity,
                self.slot_size,
                data,
            )
        }
    }

    // Non-blocking pop from the engine→app ring.
    fn try_pop_e2a(&self) -> Option<Bytes> {
        // SAFETY: we are the single consumer of the e2a ring.
        unsafe {
            ring_pop(
                self.shm.as_ptr(),
                self.e2a_off,
                OFF_E2A_TAIL,
                OFF_E2A_HEAD,
                self.capacity,
                self.slot_size,
            )
        }
    }

    // Non-blocking batch push into the app→engine ring.
    fn try_push_batch_a2e(&self, pkts: &[&[u8]]) -> usize {
        // SAFETY: same producer contract as try_push_a2e.
        unsafe {
            ring_push_batch(
                self.shm.as_ptr(),
                self.a2e_off,
                OFF_A2E_TAIL,
                OFF_A2E_HEAD,
                self.capacity,
                self.slot_size,
                pkts,
            )
        }
    }

    /// Sends all packets to the engine, yielding while the ring is full.
    /// Fails with `Closed` on cancellation or after a fixed 5 s deadline
    /// (guards against a dead engine never draining the ring).
    pub async fn send_batch(&self, pkts: &[Bytes]) -> Result<(), ShmError> {
        if self.cancel.is_cancelled() {
            return Err(ShmError::Closed);
        }
        if pkts.is_empty() {
            return Ok(());
        }
        // Validate up front so a batch is never partially rejected.
        for pkt in pkts {
            if pkt.len() > self.slot_size as usize {
                return Err(ShmError::PacketTooLarge);
            }
        }
        // SAFETY: OFF_A2E_PARKED is a 4-byte-aligned control word in-bounds.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
        let views: Vec<&[u8]> = pkts.iter().map(|p| p.as_ref()).collect();
        let mut start = 0usize;
        while start < views.len() {
            let pushed = self.try_push_batch_a2e(&views[start..]);
            if pushed == 0 {
                if self.cancel.is_cancelled() {
                    return Err(ShmError::Closed);
                }
                if tokio::time::Instant::now() >= deadline {
                    return Err(ShmError::Closed);
                }
                tokio::task::yield_now().await;
                continue;
            }
            start += pushed;
            // Only pay for a pipe write if the engine parked itself.
            if parked.load(Ordering::SeqCst) != 0 {
                pipe_write(&self.a2e_tx);
            }
        }
        Ok(())
    }

    /// Sends one packet to the engine; same cancellation and 5 s deadline
    /// semantics as [`Self::send_batch`].
    pub async fn send(&self, pkt: Bytes) -> Result<(), ShmError> {
        if self.cancel.is_cancelled() {
            return Err(ShmError::Closed);
        }
        if pkt.len() > self.slot_size as usize {
            return Err(ShmError::PacketTooLarge);
        }
        // SAFETY: OFF_A2E_PARKED is a 4-byte-aligned control word in-bounds.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            if self.try_push_a2e(&pkt) {
                break;
            }
            if self.cancel.is_cancelled() {
                return Err(ShmError::Closed);
            }
            if tokio::time::Instant::now() >= deadline {
                return Err(ShmError::Closed);
            }
            tokio::task::yield_now().await;
        }
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.a2e_tx);
        }
        Ok(())
    }

    /// Receives one packet from the engine, or `None` on cancellation or
    /// pipe failure (engine death). Mirrors `SpscFace::recv`'s spin-then-
    /// park protocol, with cancellation raced against the pipe wait.
    pub async fn recv(&self) -> Option<Bytes> {
        if self.cancel.is_cancelled() {
            return None;
        }
        // SAFETY: OFF_E2A_PARKED is a 4-byte-aligned control word in-bounds.
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        loop {
            if let Some(pkt) = self.try_pop_e2a() {
                return Some(pkt);
            }
            // Short spin before paying the pipe round-trip.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_e2a() {
                    return Some(pkt);
                }
            }
            // Publish parked=1, then re-check to close the race with a
            // sender that pushed before observing parked=1.
            parked.store(1, Ordering::SeqCst);
            if let Some(pkt) = self.try_pop_e2a() {
                parked.store(0, Ordering::Relaxed);
                return Some(pkt);
            }

            tokio::select! {
                result = pipe_await(&self.e2a_rx) => {
                    parked.store(0, Ordering::Relaxed);
                    if result.is_err() { return None; }
                }
                _ = self.cancel.cancelled() => {
                    parked.store(0, Ordering::Relaxed);
                    return None;
                }
            }
        }
    }
}
1031
#[cfg(test)]
mod tests {
    use super::*;
    use ndn_transport::Face;

    // Unique-per-process channel name so parallel test binaries don't
    // collide on /dev/shm or /tmp FIFO paths.
    fn test_name() -> String {
        format!("test-spsc-{}", std::process::id())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn face_kind_and_id() {
        let name = test_name();
        let face = SpscFace::create(FaceId(7), &name).unwrap();
        assert_eq!(face.id(), FaceId(7));
        assert_eq!(face.kind(), FaceKind::Shm);
    }

    // Small MTUs still get the fixed overhead but no artificial floor.
    #[test]
    fn slot_size_for_mtu_no_floor_clamp() {
        let small = slot_size_for_mtu(1024);
        assert_eq!(small, 17408);
        assert!(small < DEFAULT_SLOT_SIZE + SHM_SLOT_OVERHEAD as u32);

        assert_eq!(slot_size_for_mtu(0), 16384);
    }

    #[test]
    fn slot_size_for_mtu_scales_up_for_large_mtu() {
        let one_mib = slot_size_for_mtu(1024 * 1024);
        assert!(one_mib >= 1024 * 1024 + SHM_SLOT_OVERHEAD as u32);
        assert_eq!(one_mib % 64, 0);
    }

    // Bigger slots -> fewer slots, bounded below by the 16-slot floor.
    #[test]
    fn capacity_for_slot_inversely_scales() {
        assert_eq!(capacity_for_slot(DEFAULT_SLOT_SIZE), DEFAULT_CAPACITY);
        let cap_256k = capacity_for_slot(272_384);
        assert!(cap_256k < DEFAULT_CAPACITY);
        assert!(cap_256k >= 16);
        let cap_1m = capacity_for_slot(1_064_960);
        assert_eq!(cap_1m, 16);
    }

    #[test]
    fn slot_size_for_mtu_is_cache_line_aligned() {
        for mtu in [256_000, 512_000, 768_000, 1_000_000, 2_000_000] {
            let s = slot_size_for_mtu(mtu);
            assert_eq!(s % 64, 0, "slot_size_for_mtu({mtu}) = {s} not 64-aligned");
        }
    }

    // A payload larger than the default slot must round-trip when the
    // channel was sized via create_for_mtu.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn create_for_mtu_large_segment_roundtrip() {
        let name = format!("{}-big", test_name());
        let face = SpscFace::create_for_mtu(FaceId(42), &name, 256 * 1024).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let payload = Bytes::from(vec![0xABu8; 260_000]);
        handle.send(payload.clone()).await.unwrap();
        let received = tokio::time::timeout(std::time::Duration::from_secs(2), face.recv())
            .await
            .expect("timed out")
            .unwrap();
        assert_eq!(received.len(), payload.len());
        assert_eq!(&received[..16], &payload[..16]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_batch_app_to_engine() {
        let name = format!("{}-bae", test_name());
        let face = SpscFace::create(FaceId(20), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkts: Vec<Bytes> = (0u8..16)
            .map(|i| Bytes::from(vec![i; 64]))
            .collect();
        handle.send_batch(&pkts).await.unwrap();

        // FIFO order must be preserved across the batch.
        for i in 0u8..16 {
            let received = tokio::time::timeout(std::time::Duration::from_secs(2), face.recv())
                .await
                .expect("timed out")
                .unwrap();
            assert_eq!(received.len(), 64);
            assert_eq!(received[0], i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_batch_engine_to_app() {
        let name = format!("{}-bea", test_name());
        let face = SpscFace::create(FaceId(21), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkts: Vec<Bytes> = (0u8..16)
            .map(|i| Bytes::from(vec![i; 64]))
            .collect();
        face.send_batch(&pkts).await.unwrap();

        for i in 0u8..16 {
            let received = tokio::time::timeout(std::time::Duration::from_secs(2), handle.recv())
                .await
                .expect("timed out")
                .unwrap();
            assert_eq!(received.len(), 64);
            assert_eq!(received[0], i);
        }
    }

    // A batch larger than any plausible free space forces send_batch to
    // block mid-batch and resume as the consumer drains the ring.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn send_batch_exceeds_ring_capacity() {
        let name = format!("{}-bfull", test_name());
        let face = SpscFace::create(FaceId(22), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let n = 48usize;
        let pkts: Vec<Bytes> = (0..n)
            .map(|i| Bytes::from(vec![(i & 0xFF) as u8; 32]))
            .collect();

        // Sender runs concurrently with the draining receiver below.
        let send_handle = tokio::spawn({
            let pkts = pkts.clone();
            async move { handle.send_batch(&pkts).await }
        });
        for i in 0..n {
            let received = tokio::time::timeout(std::time::Duration::from_secs(5), face.recv())
                .await
                .expect("timed out")
                .unwrap();
            assert_eq!(received[0], (i & 0xFF) as u8);
        }
        send_handle.await.unwrap().unwrap();
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn app_to_engine_roundtrip() {
        let name = format!("{}-ae", test_name());
        let face = SpscFace::create(FaceId(1), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkt = Bytes::from_static(b"\x05\x03\x01\x02\x03");
        handle.send(pkt.clone()).await.unwrap();

        let received = tokio::time::timeout(std::time::Duration::from_secs(2), face.recv())
            .await
            .expect("timed out")
            .unwrap();

        assert_eq!(received, pkt);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn engine_to_app_roundtrip() {
        let name = format!("{}-ea", test_name());
        let face = SpscFace::create(FaceId(2), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        let pkt = Bytes::from_static(b"\x06\x03\xAA\xBB\xCC");
        face.send(pkt.clone()).await.unwrap();

        let received = tokio::time::timeout(std::time::Duration::from_secs(2), handle.recv())
            .await
            .expect("timed out")
            .unwrap();

        assert_eq!(received, pkt);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn multiple_packets_both_directions() {
        let name = format!("{}-bi", test_name());
        let face = SpscFace::create(FaceId(3), &name).unwrap();
        let handle = SpscHandle::connect(&name).unwrap();

        for i in 0u8..4 {
            handle.send(Bytes::from(vec![i; 64])).await.unwrap();
        }
        for i in 0u8..4 {
            let pkt = face.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![i; 64][..]);
        }

        for i in 0u8..4 {
            face.send(Bytes::from(vec![i + 10; 128])).await.unwrap();
        }
        for i in 0u8..4 {
            let pkt = handle.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![i + 10; 128][..]);
        }
    }
}