1use std::ffi::CString;
38use std::path::PathBuf;
39use std::sync::atomic::{AtomicU32, Ordering};
40
41use bytes::Bytes;
42
43use ndn_transport::{Face, FaceError, FaceId, FaceKind};
44
45use crate::shm::ShmError;
46
/// Magic stamped at offset 0 of the region — the bytes of `"NDN_SHM\0"`.
const MAGIC: u64 = 0x4E44_4E5F_5348_4D00;
/// Default number of slots in each ring.
pub const DEFAULT_CAPACITY: u32 = 256;
/// Default payload capacity of one slot, in bytes.
pub const DEFAULT_SLOT_SIZE: u32 = 8960;

// Byte offsets of the ring-control words inside the region header.
// Each word sits 64 bytes from the next, i.e. on its own cache line.
const OFF_A2E_TAIL: usize = 64;
const OFF_A2E_HEAD: usize = 128;
const OFF_E2A_TAIL: usize = 192;
const OFF_E2A_HEAD: usize = 256;
const OFF_A2E_PARKED: usize = 320;
const OFF_E2A_PARKED: usize = 384;
/// Total header bytes preceding the first ring slot.
const HEADER_SIZE: usize = 448;

/// Bytes occupied by a single slot: 4-byte length prefix plus payload area.
fn slot_stride(slot_size: u32) -> usize {
    slot_size as usize + 4
}

/// How many times the receive paths re-poll the ring before parking.
const SPIN_ITERS: u32 = 64;

/// Total bytes of the shared region: header plus both rings.
fn shm_total_size(capacity: u32, slot_size: u32) -> usize {
    let ring_bytes = capacity as usize * slot_stride(slot_size);
    HEADER_SIZE + 2 * ring_bytes
}

/// The app->engine ring begins immediately after the header.
fn a2e_ring_offset() -> usize {
    HEADER_SIZE
}

/// The engine->app ring follows the app->engine ring.
fn e2a_ring_offset(capacity: u32, slot_size: u32) -> usize {
    a2e_ring_offset() + capacity as usize * slot_stride(slot_size)
}
94
/// Map a logical channel name onto the POSIX shared-memory object name.
fn posix_shm_name(name: &str) -> String {
    let mut out = String::with_capacity("/ndn-shm-".len() + name.len());
    out.push_str("/ndn-shm-");
    out.push_str(name);
    out
}
100
/// Filesystem path of the app->engine wakeup FIFO for this channel.
fn a2e_pipe_path(name: &str) -> PathBuf {
    let file = format!(".ndn-{name}.a2e.pipe");
    PathBuf::from("/tmp").join(file)
}
105
/// Filesystem path of the engine->app wakeup FIFO for this channel.
fn e2a_pipe_path(name: &str) -> PathBuf {
    let file = format!(".ndn-{name}.e2a.pipe");
    PathBuf::from("/tmp").join(file)
}
110
/// A mapped POSIX shared-memory region.
///
/// Constructed either by the owning side (`create`) or the attaching side
/// (`open`); see the `impl` below.
struct ShmRegion {
    // Base address of the `mmap`ed region.
    ptr: *mut u8,
    // Length of the mapping in bytes (handed back to `munmap` on drop).
    size: usize,
    // `Some(name)` only on the creating side; `Drop` then `shm_unlink`s it.
    shm_name: Option<CString>,
}
120
// SAFETY: the struct only holds a raw pointer to a byte region plus plain
// fields; moving or sharing the handle does not itself touch the region.
// NOTE(review): cross-thread access to the mapped bytes is mediated by the
// atomic ring counters in `ring_push`/`ring_pop` below — soundness depends
// on callers keeping the single-producer/single-consumer discipline.
unsafe impl Send for ShmRegion {}
unsafe impl Sync for ShmRegion {}
123
124impl ShmRegion {
125 fn create(shm_name: &str, size: usize) -> Result<Self, ShmError> {
127 let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
128 let ptr = unsafe {
129 let fd = libc::shm_open(
130 cname.as_ptr(),
131 libc::O_CREAT | libc::O_RDWR | libc::O_TRUNC,
132 0o666 as libc::mode_t as libc::c_uint,
136 );
137 if fd == -1 {
138 return Err(ShmError::Io(std::io::Error::last_os_error()));
139 }
140
141 if libc::ftruncate(fd, size as libc::off_t) == -1 {
142 libc::close(fd);
143 return Err(ShmError::Io(std::io::Error::last_os_error()));
144 }
145
146 let p = libc::mmap(
147 std::ptr::null_mut(),
148 size,
149 libc::PROT_READ | libc::PROT_WRITE,
150 libc::MAP_SHARED,
151 fd,
152 0,
153 );
154 libc::close(fd);
155 if p == libc::MAP_FAILED {
156 return Err(ShmError::Io(std::io::Error::last_os_error()));
157 }
158 p as *mut u8
159 };
160 Ok(ShmRegion {
161 ptr,
162 size,
163 shm_name: Some(cname),
164 })
165 }
166
167 fn open(shm_name: &str, size: usize) -> Result<Self, ShmError> {
169 let cname = CString::new(shm_name).map_err(|_| ShmError::InvalidName)?;
170 let ptr = unsafe {
171 let fd = libc::shm_open(cname.as_ptr(), libc::O_RDWR, 0);
172 if fd == -1 {
173 return Err(ShmError::Io(std::io::Error::last_os_error()));
174 }
175
176 let p = libc::mmap(
177 std::ptr::null_mut(),
178 size,
179 libc::PROT_READ | libc::PROT_WRITE,
180 libc::MAP_SHARED,
181 fd,
182 0,
183 );
184 libc::close(fd);
185 if p == libc::MAP_FAILED {
186 return Err(ShmError::Io(std::io::Error::last_os_error()));
187 }
188 p as *mut u8
189 };
190 Ok(ShmRegion {
191 ptr,
192 size,
193 shm_name: None,
194 })
195 }
196
197 fn as_ptr(&self) -> *mut u8 {
198 self.ptr
199 }
200
201 unsafe fn write_header(&self, capacity: u32, slot_size: u32) {
207 unsafe {
208 (self.ptr as *mut u64).write_unaligned(MAGIC);
209 (self.ptr.add(8) as *mut u32).write_unaligned(capacity);
210 (self.ptr.add(12) as *mut u32).write_unaligned(slot_size);
211 }
212 }
214
215 unsafe fn read_header(&self) -> Result<(u32, u32), ShmError> {
220 unsafe {
221 let magic = (self.ptr as *const u64).read_unaligned();
222 if magic != MAGIC {
223 return Err(ShmError::InvalidMagic);
224 }
225 let capacity = (self.ptr.add(8) as *const u32).read_unaligned();
226 let slot_size = (self.ptr.add(12) as *const u32).read_unaligned();
227 Ok((capacity, slot_size))
228 }
229 }
230}
231
232impl Drop for ShmRegion {
233 fn drop(&mut self) {
234 unsafe {
235 libc::munmap(self.ptr as *mut libc::c_void, self.size);
236 if let Some(ref n) = self.shm_name {
237 libc::shm_unlink(n.as_ptr());
238 }
239 }
240 }
241}
242
/// Push one packet into a ring inside the shared region (producer side).
///
/// Each slot holds a native-endian `u32` length prefix followed by up to
/// `slot_size` payload bytes. Returns `false` when the ring is full.
///
/// NOTE(review): the head/tail counters are free-running `u32`s indexed via
/// `% capacity`; after the counter wraps past `u32::MAX` this only stays
/// consistent if `capacity` divides 2^32 (i.e. is a power of two). The
/// default of 256 is fine — confirm `create_with` callers never pass other
/// capacities, or mask instead.
///
/// # Safety
/// `base` must point at a live mapping whose layout matches `ring_off`,
/// `tail_off`, `head_off`, `capacity` and `slot_size`, and at most one
/// thread may produce into this ring at a time (SPSC discipline).
unsafe fn ring_push(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
    data: &[u8],
) -> bool {
    // Debug-only guard; release-mode callers must pre-check the length.
    debug_assert!(data.len() <= slot_size as usize);

    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    let t = tail_a.load(Ordering::Relaxed);
    // Acquire pairs with the consumer's Release store of head in `ring_pop`,
    // ensuring the slot we may overwrite has truly been consumed.
    let h = head_a.load(Ordering::Acquire);
    if t.wrapping_sub(h) >= capacity {
        return false;
    }

    let idx = (t % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    unsafe {
        (slot as *mut u32).write_unaligned(data.len() as u32);
        std::ptr::copy_nonoverlapping(data.as_ptr(), slot.add(4), data.len());
    }
    // Release publishes the slot contents before the advanced tail becomes
    // visible to the consumer.
    tail_a.store(t.wrapping_add(1), Ordering::Release);
    true
}
281
/// Pop one packet from a ring inside the shared region (consumer side).
///
/// Returns `None` when the ring is empty; otherwise copies the slot payload
/// out into an owned `Bytes` before releasing the slot.
///
/// # Safety
/// `base` must point at a live mapping whose layout matches `ring_off`,
/// `tail_off`, `head_off`, `capacity` and `slot_size`, and at most one
/// thread may consume from this ring at a time (SPSC discipline).
unsafe fn ring_pop(
    base: *mut u8,
    ring_off: usize,
    tail_off: usize,
    head_off: usize,
    capacity: u32,
    slot_size: u32,
) -> Option<Bytes> {
    let tail_a = unsafe { AtomicU32::from_ptr(base.add(tail_off) as *mut u32) };
    let head_a = unsafe { AtomicU32::from_ptr(base.add(head_off) as *mut u32) };

    let h = head_a.load(Ordering::Relaxed);
    // Acquire pairs with the producer's Release store of tail in `ring_push`,
    // making the slot contents visible before we read them.
    let t = tail_a.load(Ordering::Acquire);
    if h == t {
        return None;
    }

    let idx = (h % capacity) as usize;
    let slot = unsafe { base.add(ring_off + idx * slot_stride(slot_size)) };

    let len = unsafe { (slot as *const u32).read_unaligned() as usize };
    // Defensive clamp: never read past the slot even if the length prefix
    // is corrupt (the peer process is not trusted to be well-behaved).
    let len = len.min(slot_size as usize);
    let data = unsafe { Bytes::copy_from_slice(std::slice::from_raw_parts(slot.add(4), len)) };

    // Release hands the slot back to the producer only after the copy-out.
    head_a.store(h.wrapping_add(1), Ordering::Release);
    Some(data)
}
314
315fn open_fifo_rdwr(path: &std::path::Path) -> Result<std::os::unix::io::OwnedFd, ShmError> {
324 use std::os::unix::io::{FromRawFd, OwnedFd};
325 let cpath = CString::new(path.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
326 let fd = unsafe { libc::open(cpath.as_ptr(), libc::O_RDWR | libc::O_NONBLOCK) };
327 if fd == -1 {
328 return Err(ShmError::Io(std::io::Error::last_os_error()));
329 }
330 Ok(unsafe { OwnedFd::from_raw_fd(fd) })
331}
332
/// Sleep until at least one wakeup byte arrives on the FIFO, draining what
/// was read.
///
/// Returns `Ok(())` on any positive read, `Err(UnexpectedEof)` when the
/// pipe reports EOF, and propagates other I/O errors; `WouldBlock` simply
/// re-arms the readiness wait.
async fn pipe_await(
    rx: &tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
) -> std::io::Result<()> {
    use std::os::unix::io::AsRawFd;
    loop {
        let mut guard = rx.readable().await?;
        // Drain up to 64 doorbell bytes at once; the exact count is
        // irrelevant — any positive read means "the peer signalled us".
        let mut buf = [0u8; 64];
        let fd = rx.get_ref().as_raw_fd();
        let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
        // NOTE(review): readiness is cleared even if more than 64 bytes
        // remain queued; callers re-check the ring before parking again, so
        // a wakeup should not be lost — confirm against the park protocol.
        guard.clear_ready();
        if n > 0 {
            return Ok(());
        }
        if n == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "SHM wakeup pipe closed (peer died)",
            ));
        }
        if n == -1 {
            let err = std::io::Error::last_os_error();
            if err.kind() != std::io::ErrorKind::WouldBlock {
                return Err(err);
            }
        }
    }
}
364
/// Ring the peer's doorbell: write a single byte into the wakeup FIFO.
///
/// The write result is deliberately ignored (best-effort): if the FIFO is
/// full, unread wakeup bytes are already queued, so the parked peer will
/// be woken by those.
fn pipe_write(tx: &std::os::unix::io::OwnedFd) {
    use std::os::unix::io::AsRawFd;
    let b = [1u8];
    unsafe {
        libc::write(tx.as_raw_fd(), b.as_ptr() as *const libc::c_void, 1);
    }
}
376
/// Engine-side endpoint of the shared-memory channel.
///
/// Owns the shm region (creating side) and both wakeup FIFOs; the app side
/// attaches to the same channel name via `SpscHandle::connect`.
pub struct SpscFace {
    // Transport identifier reported by `Face::id`.
    id: FaceId,
    // Owning mapping; unlinked when this face drops.
    shm: ShmRegion,
    // Ring geometry (also written into the shm header for the peer).
    capacity: u32,
    slot_size: u32,
    // Byte offsets of the two rings inside the region.
    a2e_off: usize,
    e2a_off: usize,
    // Doorbell the app rings; awaited in `recv` when the a2e ring is empty.
    a2e_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    // Doorbell we ring after pushing when the app side is parked.
    e2a_tx: std::os::unix::io::OwnedFd,
    // FIFO paths, retained so `Drop` can remove the filesystem entries.
    a2e_pipe_path: PathBuf,
    e2a_pipe_path: PathBuf,
}
399
impl SpscFace {
    /// Create the channel with the default geometry
    /// (`DEFAULT_CAPACITY` slots of `DEFAULT_SLOT_SIZE` bytes).
    pub fn create(id: FaceId, name: &str) -> Result<Self, ShmError> {
        Self::create_with(id, name, DEFAULT_CAPACITY, DEFAULT_SLOT_SIZE)
    }

    /// Create the shm region, write its header, and create both wakeup
    /// FIFOs under `/tmp`, so that an app can `SpscHandle::connect(name)`.
    ///
    /// # Errors
    /// `ShmError::Io` for shm/FIFO creation failures; `InvalidName` for
    /// unrepresentable names.
    pub fn create_with(
        id: FaceId,
        name: &str,
        capacity: u32,
        slot_size: u32,
    ) -> Result<Self, ShmError> {
        let size = shm_total_size(capacity, slot_size);
        let shm = ShmRegion::create(&posix_shm_name(name), size)?;
        // Publish the geometry so connect() can size its own mapping.
        unsafe {
            shm.write_header(capacity, slot_size);
        }

        let a2e_off = a2e_ring_offset();
        let e2a_off = e2a_ring_offset(capacity, slot_size);

        use tokio::io::unix::AsyncFd;

        let a2e_path = a2e_pipe_path(name);
        let e2a_path = e2a_pipe_path(name);

        // Remove stale FIFOs from a previous (possibly crashed) run so the
        // mkfifo calls below do not fail with EEXIST.
        let _ = std::fs::remove_file(&a2e_path);
        let _ = std::fs::remove_file(&e2a_path);

        for p in [&a2e_path, &e2a_path] {
            let cp = CString::new(p.to_str().unwrap_or("")).map_err(|_| ShmError::InvalidName)?;
            if unsafe { libc::mkfifo(cp.as_ptr(), 0o600) } == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
        }

        // Both FIFOs are opened O_RDWR|O_NONBLOCK (see open_fifo_rdwr), so
        // neither open blocks waiting for the peer to arrive.
        let a2e_fd = open_fifo_rdwr(&a2e_path)?;
        let a2e_rx = AsyncFd::new(a2e_fd).map_err(ShmError::Io)?;

        let e2a_tx = open_fifo_rdwr(&e2a_path)?;

        Ok(SpscFace {
            id,
            shm,
            capacity,
            slot_size,
            a2e_off,
            e2a_off,
            a2e_rx,
            e2a_tx,
            a2e_pipe_path: a2e_path,
            e2a_pipe_path: e2a_path,
        })
    }

    /// Consumer side of the app->engine ring (this face is the sole
    /// consumer under the SPSC discipline).
    fn try_pop_a2e(&self) -> Option<Bytes> {
        // SAFETY: offsets/geometry come from this region's own construction
        // in `create_with`, so the layout invariants of `ring_pop` hold.
        unsafe {
            ring_pop(
                self.shm.as_ptr(),
                self.a2e_off,
                OFF_A2E_TAIL,
                OFF_A2E_HEAD,
                self.capacity,
                self.slot_size,
            )
        }
    }

    /// Producer side of the engine->app ring; returns `false` when full.
    fn try_push_e2a(&self, data: &[u8]) -> bool {
        // SAFETY: as above; this face is the sole producer of the e2a ring.
        unsafe {
            ring_push(
                self.shm.as_ptr(),
                self.e2a_off,
                OFF_E2A_TAIL,
                OFF_E2A_HEAD,
                self.capacity,
                self.slot_size,
                data,
            )
        }
    }
}
490
491impl Drop for SpscFace {
492 fn drop(&mut self) {
493 let _ = std::fs::remove_file(&self.a2e_pipe_path);
494 let _ = std::fs::remove_file(&self.e2a_pipe_path);
495 }
496}
497
impl Face for SpscFace {
    fn id(&self) -> FaceId {
        self.id
    }
    fn kind(&self) -> FaceKind {
        FaceKind::Shm
    }

    /// Receive the next app->engine packet, spinning briefly before parking
    /// on the wakeup FIFO.
    ///
    /// Park protocol: the SeqCst store of the parked flag is ordered before
    /// the final ring re-check, and pairs with the sender's SeqCst load of
    /// the flag after its push — so either we observe the packet here, or
    /// the sender observes parked=1 and rings the doorbell.
    async fn recv(&self) -> Result<Bytes, FaceError> {
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        loop {
            if let Some(pkt) = self.try_pop_a2e() {
                return Ok(pkt);
            }
            // Short spin to catch packets arriving back-to-back without
            // paying the FIFO round-trip.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_a2e() {
                    return Ok(pkt);
                }
            }
            parked.store(1, Ordering::SeqCst);
            // Re-check after publishing the flag: a packet pushed just
            // before the store would otherwise be missed until the next
            // doorbell.
            if let Some(pkt) = self.try_pop_a2e() {
                parked.store(0, Ordering::Relaxed);
                return Ok(pkt);
            }

            // A pipe error (EOF) means the peer is gone.
            pipe_await(&self.a2e_rx)
                .await
                .map_err(|_| FaceError::Closed)?;

            parked.store(0, Ordering::Relaxed);
        }
    }

    /// Send one packet on the engine->app ring, waking the app if it is
    /// parked.
    ///
    /// NOTE(review): unlike `SpscHandle::send`, this loop has no deadline —
    /// if the app never drains its ring this yields forever. Confirm that
    /// is intended for the engine side.
    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
        if pkt.len() > self.slot_size as usize {
            return Err(FaceError::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "packet exceeds SHM slot size",
            )));
        }
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        loop {
            if self.try_push_e2a(&pkt) {
                break;
            }
            // Ring full: yield to the runtime and retry.
            tokio::task::yield_now().await;
        }
        // SeqCst load pairs with the receiver's SeqCst store of the flag;
        // only ring the doorbell when the peer is actually parked.
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.e2a_tx);
        }
        Ok(())
    }
}
566
/// Application-side handle attached to an existing `SpscFace` channel.
pub struct SpscHandle {
    // Non-owning mapping of the face's shm region (never unlinked here).
    shm: ShmRegion,
    // Ring geometry read from the shm header during connect().
    capacity: u32,
    slot_size: u32,
    // Byte offsets of the two rings inside the region.
    a2e_off: usize,
    e2a_off: usize,
    // Doorbell the engine rings; awaited in `recv` when the e2a ring is empty.
    e2a_rx: tokio::io::unix::AsyncFd<std::os::unix::io::OwnedFd>,
    // Doorbell we ring after pushing when the engine side is parked.
    a2e_tx: std::os::unix::io::OwnedFd,
    // Cooperative shutdown; `send`/`recv` bail out once cancelled.
    cancel: tokio_util::sync::CancellationToken,
}
590
impl SpscHandle {
    /// Attach to an existing channel created by `SpscFace::create_with`.
    ///
    /// First probes the header through a temporary read-only mapping to
    /// learn the ring geometry, then maps the full region read/write and
    /// opens both wakeup FIFOs.
    ///
    /// # Errors
    /// `ShmError::Io` if the shm object or FIFOs are missing/unopenable;
    /// `InvalidMagic` if the region was not initialized by a face.
    pub fn connect(name: &str) -> Result<Self, ShmError> {
        let shm_name_str = posix_shm_name(name);
        let cname = CString::new(shm_name_str.as_str()).map_err(|_| ShmError::InvalidName)?;

        let (capacity, slot_size) = unsafe {
            let fd = libc::shm_open(cname.as_ptr(), libc::O_RDONLY, 0);
            if fd == -1 {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            // Map only the header — the full region size is not known yet.
            let p = libc::mmap(
                std::ptr::null_mut(),
                HEADER_SIZE,
                libc::PROT_READ,
                libc::MAP_SHARED,
                fd,
                0,
            );
            libc::close(fd);
            if p == libc::MAP_FAILED {
                return Err(ShmError::Io(std::io::Error::last_os_error()));
            }
            let base = p as *const u8;
            let magic = (base as *const u64).read_unaligned();
            if magic != MAGIC {
                libc::munmap(p, HEADER_SIZE);
                return Err(ShmError::InvalidMagic);
            }
            let cap = (base.add(8) as *const u32).read_unaligned();
            let slen = (base.add(12) as *const u32).read_unaligned();
            libc::munmap(p, HEADER_SIZE);
            (cap, slen)
        };

        let size = shm_total_size(capacity, slot_size);
        let shm = ShmRegion::open(&shm_name_str, size)?;
        // Re-validate through the full mapping; duplicates the probe above,
        // but guards against the region changing between the two maps.
        unsafe { shm.read_header()? };

        let a2e_off = a2e_ring_offset();
        let e2a_off = e2a_ring_offset(capacity, slot_size);

        use tokio::io::unix::AsyncFd;

        let a2e_path = a2e_pipe_path(name);
        let e2a_path = e2a_pipe_path(name);
        let a2e_tx = open_fifo_rdwr(&a2e_path)?;

        let e2a_fd = open_fifo_rdwr(&e2a_path)?;
        let e2a_rx = AsyncFd::new(e2a_fd).map_err(ShmError::Io)?;

        Ok(SpscHandle {
            shm,
            capacity,
            slot_size,
            a2e_off,
            e2a_off,
            e2a_rx,
            a2e_tx,
            cancel: tokio_util::sync::CancellationToken::new(),
        })
    }

    /// Replace the default (never-cancelled) token with a caller-managed one.
    pub fn set_cancel(&mut self, cancel: tokio_util::sync::CancellationToken) {
        self.cancel = cancel;
    }

    /// Producer side of the app->engine ring; `false` when full.
    fn try_push_a2e(&self, data: &[u8]) -> bool {
        // SAFETY: geometry was validated against the header in connect();
        // this handle is the sole producer of the a2e ring (SPSC).
        unsafe {
            ring_push(
                self.shm.as_ptr(),
                self.a2e_off,
                OFF_A2E_TAIL,
                OFF_A2E_HEAD,
                self.capacity,
                self.slot_size,
                data,
            )
        }
    }

    /// Consumer side of the engine->app ring.
    fn try_pop_e2a(&self) -> Option<Bytes> {
        // SAFETY: as above; this handle is the sole consumer of the e2a ring.
        unsafe {
            ring_pop(
                self.shm.as_ptr(),
                self.e2a_off,
                OFF_E2A_TAIL,
                OFF_E2A_HEAD,
                self.capacity,
                self.slot_size,
            )
        }
    }

    /// Send one packet to the engine, waking it if it is parked in `recv`.
    ///
    /// Gives up after 5 seconds of the ring staying full.
    /// NOTE(review): both cancellation and the ring-full timeout map to
    /// `ShmError::Closed`; a distinct timeout variant might be clearer —
    /// confirm intended.
    pub async fn send(&self, pkt: Bytes) -> Result<(), ShmError> {
        if self.cancel.is_cancelled() {
            return Err(ShmError::Closed);
        }
        if pkt.len() > self.slot_size as usize {
            return Err(ShmError::PacketTooLarge);
        }
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_A2E_PARKED) as *mut u32) };
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
        loop {
            if self.try_push_a2e(&pkt) {
                break;
            }
            if self.cancel.is_cancelled() {
                return Err(ShmError::Closed);
            }
            if tokio::time::Instant::now() >= deadline {
                return Err(ShmError::Closed);
            }
            tokio::task::yield_now().await;
        }
        // SeqCst load pairs with the engine's SeqCst store of its parked
        // flag; only ring the doorbell when the peer is actually parked.
        if parked.load(Ordering::SeqCst) != 0 {
            pipe_write(&self.a2e_tx);
        }
        Ok(())
    }

    /// Receive the next engine->app packet; `None` on cancellation or when
    /// the wakeup pipe errors out (peer gone).
    ///
    /// Mirrors the park protocol documented on `SpscFace::recv`: SeqCst
    /// store of the parked flag, re-check the ring, then await the doorbell.
    pub async fn recv(&self) -> Option<Bytes> {
        if self.cancel.is_cancelled() {
            return None;
        }
        let parked =
            unsafe { AtomicU32::from_ptr(self.shm.as_ptr().add(OFF_E2A_PARKED) as *mut u32) };
        loop {
            if let Some(pkt) = self.try_pop_e2a() {
                return Some(pkt);
            }
            // Brief spin before paying the FIFO round-trip.
            for _ in 0..SPIN_ITERS {
                std::hint::spin_loop();
                if let Some(pkt) = self.try_pop_e2a() {
                    return Some(pkt);
                }
            }
            parked.store(1, Ordering::SeqCst);
            // Re-check after publishing the flag so a push racing with the
            // store is not missed.
            if let Some(pkt) = self.try_pop_e2a() {
                parked.store(0, Ordering::Relaxed);
                return Some(pkt);
            }

            tokio::select! {
                result = pipe_await(&self.e2a_rx) => {
                    parked.store(0, Ordering::Relaxed);
                    if result.is_err() { return None; }
                }
                _ = self.cancel.cancelled() => {
                    parked.store(0, Ordering::Relaxed);
                    return None;
                }
            }
        }
    }
}
778
#[cfg(test)]
mod tests {
    use super::*;
    use ndn_transport::Face;

    /// Base channel name, unique per test process, so shm objects and FIFOs
    /// from concurrently running test binaries cannot collide.
    fn test_name() -> String {
        format!("test-spsc-{}", std::process::id())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn face_kind_and_id() {
        let name = test_name();
        let face = SpscFace::create(FaceId(7), &name).unwrap();
        assert_eq!(face.id(), FaceId(7));
        assert_eq!(face.kind(), FaceKind::Shm);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn app_to_engine_roundtrip() {
        let name = format!("{}-ae", test_name());
        let engine = SpscFace::create(FaceId(1), &name).unwrap();
        let app = SpscHandle::connect(&name).unwrap();

        let sent = Bytes::from_static(b"\x05\x03\x01\x02\x03");
        app.send(sent.clone()).await.unwrap();

        let got = tokio::time::timeout(std::time::Duration::from_secs(2), engine.recv())
            .await
            .expect("timed out")
            .unwrap();

        assert_eq!(got, sent);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn engine_to_app_roundtrip() {
        let name = format!("{}-ea", test_name());
        let engine = SpscFace::create(FaceId(2), &name).unwrap();
        let app = SpscHandle::connect(&name).unwrap();

        let sent = Bytes::from_static(b"\x06\x03\xAA\xBB\xCC");
        engine.send(sent.clone()).await.unwrap();

        let got = tokio::time::timeout(std::time::Duration::from_secs(2), app.recv())
            .await
            .expect("timed out")
            .unwrap();

        assert_eq!(got, sent);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn multiple_packets_both_directions() {
        let name = format!("{}-bi", test_name());
        let engine = SpscFace::create(FaceId(3), &name).unwrap();
        let app = SpscHandle::connect(&name).unwrap();

        // App -> engine: four distinct payloads, delivered in FIFO order.
        for seq in 0u8..4 {
            app.send(Bytes::from(vec![seq; 64])).await.unwrap();
        }
        for seq in 0u8..4 {
            let pkt = engine.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![seq; 64][..]);
        }

        // Engine -> app: same check in the opposite direction.
        for seq in 0u8..4 {
            engine.send(Bytes::from(vec![seq + 10; 128])).await.unwrap();
        }
        for seq in 0u8..4 {
            let pkt = app.recv().await.unwrap();
            assert_eq!(&pkt[..], &vec![seq + 10; 128][..]);
        }
    }
}