1use std::sync::Mutex;
8use std::time::Duration;
9
10use bytes::Bytes;
11use ndn_transport::{Face, FaceError, FaceId, FaceKind};
12use rand::Rng;
13use tokio::sync::mpsc;
14use tracing::trace;
15
16use crate::sim_link::LinkConfig;
17
18pub struct SimFace {
24 id: FaceId,
25 tx: mpsc::Sender<Bytes>,
27 rx: tokio::sync::Mutex<mpsc::Receiver<Bytes>>,
29 config: LinkConfig,
31 next_tx_ready: Mutex<tokio::time::Instant>,
34}
35
36impl SimFace {
37 pub(crate) fn new(
38 id: FaceId,
39 tx: mpsc::Sender<Bytes>,
40 rx: mpsc::Receiver<Bytes>,
41 config: LinkConfig,
42 ) -> Self {
43 Self {
44 id,
45 tx,
46 rx: tokio::sync::Mutex::new(rx),
47 config,
48 next_tx_ready: Mutex::new(tokio::time::Instant::now()),
49 }
50 }
51}
52
53impl Face for SimFace {
54 fn id(&self) -> FaceId {
55 self.id
56 }
57
58 fn kind(&self) -> FaceKind {
59 FaceKind::Internal
60 }
61
62 fn remote_uri(&self) -> Option<String> {
63 Some(format!("sim://face#{}", self.id.0))
64 }
65
66 async fn recv(&self) -> Result<Bytes, FaceError> {
67 self.rx.lock().await.recv().await.ok_or(FaceError::Closed)
68 }
69
70 async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
71 if self.config.loss_rate > 0.0 {
73 let roll: f64 = rand::rng().random();
74 if roll < self.config.loss_rate {
75 trace!(face = %self.id, "SimFace: packet dropped (loss)");
76 return Ok(());
77 }
78 }
79
80 let deliver_delay = if self.config.bandwidth_bps > 0 {
84 let pkt_bits = (pkt.len() as u64) * 8;
85 let tx_duration =
86 Duration::from_nanos(pkt_bits * 1_000_000_000 / self.config.bandwidth_bps);
87
88 let now = tokio::time::Instant::now();
89 let tx_start = {
90 let mut next = self.next_tx_ready.lock().unwrap();
91 if *next < now {
92 *next = now;
93 }
94 let start = *next;
95 *next = start + tx_duration;
96 start
97 };
98
99 let wait_for_tx = tx_start.saturating_duration_since(now);
101 wait_for_tx + self.config.delay + random_jitter(self.config.jitter)
102 } else {
103 self.config.delay + random_jitter(self.config.jitter)
104 };
105
106 if deliver_delay.is_zero() {
108 self.tx.send(pkt).await.map_err(|_| FaceError::Closed)
110 } else {
111 let tx = self.tx.clone();
113 let face_id = self.id;
114 tokio::spawn(async move {
115 tokio::time::sleep(deliver_delay).await;
116 if tx.send(pkt).await.is_err() {
117 trace!(face = %face_id, "SimFace: remote end closed during delayed delivery");
118 }
119 });
120 Ok(())
121 }
122 }
123}
124
125fn random_jitter(max_jitter: Duration) -> Duration {
127 if max_jitter.is_zero() {
128 return Duration::ZERO;
129 }
130 let nanos = rand::rng().random_range(0..=max_jitter.as_nanos() as u64);
131 Duration::from_nanos(nanos)
132}