ndn_sim/
sim_face.rs

1//! `SimFace` — a simulated face with configurable link properties.
2//!
3//! Each `SimFace` is one endpoint of a [`SimLink`](crate::SimLink). Packets
4//! sent through a `SimFace` are subject to delay, jitter, loss, and bandwidth
5//! constraints before arriving at the remote end.
6
7use std::sync::Mutex;
8use std::time::Duration;
9
10use bytes::Bytes;
11use ndn_transport::{Face, FaceError, FaceId, FaceKind};
12use rand::Rng;
13use tokio::sync::mpsc;
14use tracing::trace;
15
16use crate::sim_link::LinkConfig;
17
18/// A simulated face implementing the [`Face`] trait.
19///
20/// Created in pairs by [`SimLink::pair`](crate::SimLink::pair). Internally
21/// backed by Tokio MPSC channels with link-property emulation applied in the
22/// send path.
23pub struct SimFace {
24    id: FaceId,
25    /// Channel to deliver packets to the remote face's recv.
26    tx: mpsc::Sender<Bytes>,
27    /// Channel to receive packets from the remote face's send.
28    rx: tokio::sync::Mutex<mpsc::Receiver<Bytes>>,
29    /// Link properties applied when sending.
30    config: LinkConfig,
31    /// Bandwidth state: earliest time the next byte can start transmitting.
32    /// Protected by a std Mutex since we only hold it briefly for arithmetic.
33    next_tx_ready: Mutex<tokio::time::Instant>,
34}
35
36impl SimFace {
37    pub(crate) fn new(
38        id: FaceId,
39        tx: mpsc::Sender<Bytes>,
40        rx: mpsc::Receiver<Bytes>,
41        config: LinkConfig,
42    ) -> Self {
43        Self {
44            id,
45            tx,
46            rx: tokio::sync::Mutex::new(rx),
47            config,
48            next_tx_ready: Mutex::new(tokio::time::Instant::now()),
49        }
50    }
51}
52
53impl Face for SimFace {
54    fn id(&self) -> FaceId {
55        self.id
56    }
57
58    fn kind(&self) -> FaceKind {
59        FaceKind::Internal
60    }
61
62    fn remote_uri(&self) -> Option<String> {
63        Some(format!("sim://face#{}", self.id.0))
64    }
65
66    async fn recv(&self) -> Result<Bytes, FaceError> {
67        self.rx.lock().await.recv().await.ok_or(FaceError::Closed)
68    }
69
70    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
71        // ── Loss ─────────────────────────────────────────────────────────────
72        if self.config.loss_rate > 0.0 {
73            let roll: f64 = rand::rng().random();
74            if roll < self.config.loss_rate {
75                trace!(face = %self.id, "SimFace: packet dropped (loss)");
76                return Ok(());
77            }
78        }
79
80        // ── Bandwidth shaping ────────────────────────────────────────────────
81        // Calculate when this packet can start transmitting and when it
82        // finishes. The "next_tx_ready" cursor serialises transmissions.
83        let deliver_delay = if self.config.bandwidth_bps > 0 {
84            let pkt_bits = (pkt.len() as u64) * 8;
85            let tx_duration =
86                Duration::from_nanos(pkt_bits * 1_000_000_000 / self.config.bandwidth_bps);
87
88            let now = tokio::time::Instant::now();
89            let tx_start = {
90                let mut next = self.next_tx_ready.lock().unwrap();
91                if *next < now {
92                    *next = now;
93                }
94                let start = *next;
95                *next = start + tx_duration;
96                start
97            };
98
99            // The packet "arrives" at: tx_start + propagation_delay + jitter
100            let wait_for_tx = tx_start.saturating_duration_since(now);
101            wait_for_tx + self.config.delay + random_jitter(self.config.jitter)
102        } else {
103            self.config.delay + random_jitter(self.config.jitter)
104        };
105
106        // ── Deliver with delay ───────────────────────────────────────────────
107        if deliver_delay.is_zero() {
108            // Fast path: no delay, send directly.
109            self.tx.send(pkt).await.map_err(|_| FaceError::Closed)
110        } else {
111            // Spawn a background task so send() returns immediately.
112            let tx = self.tx.clone();
113            let face_id = self.id;
114            tokio::spawn(async move {
115                tokio::time::sleep(deliver_delay).await;
116                if tx.send(pkt).await.is_err() {
117                    trace!(face = %face_id, "SimFace: remote end closed during delayed delivery");
118                }
119            });
120            Ok(())
121        }
122    }
123}
124
125/// Generate a random jitter in `[0, max_jitter]`.
126fn random_jitter(max_jitter: Duration) -> Duration {
127    if max_jitter.is_zero() {
128        return Duration::ZERO;
129    }
130    let nanos = rand::rng().random_range(0..=max_jitter.as_nanos() as u64);
131    Duration::from_nanos(nanos)
132}