ndn_transport/
face_table.rs

1#[cfg(not(target_arch = "wasm32"))]
2use dashmap::DashMap;
3use std::sync::{Arc, Mutex};
4
5use crate::{Face, FaceId};
6
7/// Result type for [`ErasedFace::recv_bytes_with_addr`].
8type RecvWithAddrResult =
9    Result<(bytes::Bytes, Option<crate::face::FaceAddr>), crate::face::FaceError>;
10
11/// Concurrent map from `FaceId` to a type-erased face handle.
12///
13/// Pipeline stages clone the `Arc<dyn ErasedFace>` out of the table and
14/// release the table reference before calling `send()`, so no lock is held
15/// during I/O.
16///
17/// Face IDs are recycled: when a face is removed its ID is returned to a free
18/// list and reused by the next `alloc_id()` call.  Reserved IDs
19/// (`>= 0xFFFF_0000`) are never allocated by `alloc_id()` and are used for
20/// internal engine faces (e.g. the management `AppFace`).
21pub struct FaceTable {
22    #[cfg(not(target_arch = "wasm32"))]
23    faces: DashMap<FaceId, Arc<dyn ErasedFace>>,
24    #[cfg(target_arch = "wasm32")]
25    faces: Mutex<std::collections::HashMap<FaceId, Arc<dyn ErasedFace>>>,
26    next_id: std::sync::atomic::AtomicU32,
27    free: Mutex<Vec<u32>>,
28}
29
30/// Object-safe wrapper around the `Face` trait so it can be stored in the face table.
31pub trait ErasedFace: Send + Sync + 'static {
32    fn id(&self) -> FaceId;
33    fn kind(&self) -> crate::face::FaceKind;
34    fn remote_uri(&self) -> Option<String>;
35    fn local_uri(&self) -> Option<String>;
36    fn send_bytes(
37        &self,
38        pkt: bytes::Bytes,
39    ) -> std::pin::Pin<
40        Box<dyn std::future::Future<Output = Result<(), crate::face::FaceError>> + Send + '_>,
41    >;
42    fn recv_bytes(
43        &self,
44    ) -> std::pin::Pin<
45        Box<
46            dyn std::future::Future<Output = Result<bytes::Bytes, crate::face::FaceError>>
47                + Send
48                + '_,
49        >,
50    >;
51
52    /// Object-safe version of [`Face::recv_with_addr`].
53    ///
54    /// Returns the raw packet together with the link-layer sender address
55    /// when the face type exposes it (e.g. multicast UDP). Returns `None`
56    /// for faces that receive from a single known peer.
57    fn recv_bytes_with_addr(
58        &self,
59    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = RecvWithAddrResult> + Send + '_>>;
60}
61
62impl<F: Face> ErasedFace for F {
63    fn id(&self) -> FaceId {
64        Face::id(self)
65    }
66
67    fn kind(&self) -> crate::face::FaceKind {
68        Face::kind(self)
69    }
70
71    fn remote_uri(&self) -> Option<String> {
72        Face::remote_uri(self)
73    }
74
75    fn local_uri(&self) -> Option<String> {
76        Face::local_uri(self)
77    }
78
79    fn send_bytes(
80        &self,
81        pkt: bytes::Bytes,
82    ) -> std::pin::Pin<
83        Box<dyn std::future::Future<Output = Result<(), crate::face::FaceError>> + Send + '_>,
84    > {
85        Box::pin(Face::send(self, pkt))
86    }
87
88    fn recv_bytes(
89        &self,
90    ) -> std::pin::Pin<
91        Box<
92            dyn std::future::Future<Output = Result<bytes::Bytes, crate::face::FaceError>>
93                + Send
94                + '_,
95        >,
96    > {
97        Box::pin(Face::recv(self))
98    }
99
100    fn recv_bytes_with_addr(
101        &self,
102    ) -> std::pin::Pin<
103        Box<
104            dyn std::future::Future<
105                    Output = Result<
106                        (bytes::Bytes, Option<crate::face::FaceAddr>),
107                        crate::face::FaceError,
108                    >,
109                > + Send
110                + '_,
111        >,
112    > {
113        Box::pin(Face::recv_with_addr(self))
114    }
115}
116
117/// Snapshot of a face's metadata for reporting/display.
118#[derive(Debug, Clone)]
119pub struct FaceInfo {
120    pub id: FaceId,
121    pub kind: crate::face::FaceKind,
122    pub remote_uri: Option<String>,
123    pub local_uri: Option<String>,
124}
125
126/// Reserved face ID range used for internal engine faces (management AppFace, etc.).
127/// IDs in this range are never allocated by `alloc_id()`.
128pub const RESERVED_FACE_ID_MIN: u32 = 0xFFFF_0000;
129
130impl FaceTable {
131    pub fn new() -> Self {
132        Self {
133            #[cfg(not(target_arch = "wasm32"))]
134            faces: DashMap::new(),
135            #[cfg(target_arch = "wasm32")]
136            faces: Mutex::new(std::collections::HashMap::new()),
137            next_id: std::sync::atomic::AtomicU32::new(1),
138            free: Mutex::new(Vec::new()),
139        }
140    }
141
142    /// Allocate the next available `FaceId`, reusing a recycled ID if possible.
143    ///
144    /// Never returns an ID in the reserved range (`>= RESERVED_FACE_ID_MIN`).
145    pub fn alloc_id(&self) -> FaceId {
146        // Prefer a recycled ID.
147        if let Ok(mut free) = self.free.lock()
148            && let Some(id) = free.pop()
149        {
150            return FaceId(id);
151        }
152        // Otherwise allocate a fresh one, skipping over the reserved range.
153        loop {
154            let id = self
155                .next_id
156                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
157            if id < RESERVED_FACE_ID_MIN {
158                return FaceId(id);
159            }
160            // Wrap back to 1 and retry.
161            let _ = self.next_id.compare_exchange(
162                id.wrapping_add(1),
163                1,
164                std::sync::atomic::Ordering::Relaxed,
165                std::sync::atomic::Ordering::Relaxed,
166            );
167        }
168    }
169
170    /// Register a face. Returns the assigned `FaceId`.
171    pub fn insert<F: Face>(&self, face: F) -> FaceId {
172        let id = face.id();
173        let arc: Arc<dyn ErasedFace> = Arc::new(face);
174        #[cfg(not(target_arch = "wasm32"))]
175        self.faces.insert(id, arc);
176        #[cfg(target_arch = "wasm32")]
177        self.faces.lock().unwrap().insert(id, arc);
178        id
179    }
180
181    /// Register a pre-wrapped erased face (e.g. a face accepted from a listener
182    /// that is already stored in an `Arc`).  Returns the face's `FaceId`.
183    pub fn insert_arc(&self, face: Arc<dyn ErasedFace>) -> FaceId {
184        let id = face.id();
185        #[cfg(not(target_arch = "wasm32"))]
186        self.faces.insert(id, face);
187        #[cfg(target_arch = "wasm32")]
188        self.faces.lock().unwrap().insert(id, face);
189        id
190    }
191
192    /// Look up a face handle. Returns `None` if the face has been removed.
193    pub fn get(&self, id: FaceId) -> Option<Arc<dyn ErasedFace>> {
194        #[cfg(not(target_arch = "wasm32"))]
195        return self.faces.get(&id).map(|r| Arc::clone(&*r));
196        #[cfg(target_arch = "wasm32")]
197        return self.faces.lock().unwrap().get(&id).map(Arc::clone);
198    }
199
200    /// Remove a face from the table, recycling its ID for future `alloc_id()` calls.
201    pub fn remove(&self, id: FaceId) {
202        #[cfg(not(target_arch = "wasm32"))]
203        self.faces.remove(&id);
204        #[cfg(target_arch = "wasm32")]
205        self.faces.lock().unwrap().remove(&id);
206        // Return dynamic IDs to the free list for reuse.
207        if id.0 < RESERVED_FACE_ID_MIN
208            && let Ok(mut free) = self.free.lock()
209        {
210            free.push(id.0);
211        }
212    }
213
214    /// Number of registered faces.
215    pub fn len(&self) -> usize {
216        #[cfg(not(target_arch = "wasm32"))]
217        return self.faces.len();
218        #[cfg(target_arch = "wasm32")]
219        return self.faces.lock().unwrap().len();
220    }
221
222    pub fn is_empty(&self) -> bool {
223        #[cfg(not(target_arch = "wasm32"))]
224        return self.faces.is_empty();
225        #[cfg(target_arch = "wasm32")]
226        return self.faces.lock().unwrap().is_empty();
227    }
228
229    /// Iterate over all registered face IDs.
230    pub fn face_ids(&self) -> Vec<FaceId> {
231        #[cfg(not(target_arch = "wasm32"))]
232        return self.faces.iter().map(|r| *r.key()).collect();
233        #[cfg(target_arch = "wasm32")]
234        return self.faces.lock().unwrap().keys().copied().collect();
235    }
236
237    /// Return all registered faces as `(FaceId, FaceKind)` pairs.
238    pub fn face_entries(&self) -> Vec<(FaceId, crate::face::FaceKind)> {
239        #[cfg(not(target_arch = "wasm32"))]
240        return self.faces.iter().map(|r| (r.id(), r.kind())).collect();
241        #[cfg(target_arch = "wasm32")]
242        return self
243            .faces
244            .lock()
245            .unwrap()
246            .values()
247            .map(|f| (f.id(), f.kind()))
248            .collect();
249    }
250
251    /// Return detailed info for all registered faces.
252    pub fn face_info(&self) -> Vec<FaceInfo> {
253        #[cfg(not(target_arch = "wasm32"))]
254        return self
255            .faces
256            .iter()
257            .map(|r| FaceInfo {
258                id: r.id(),
259                kind: r.kind(),
260                remote_uri: r.remote_uri(),
261                local_uri: r.local_uri(),
262            })
263            .collect();
264        #[cfg(target_arch = "wasm32")]
265        return self
266            .faces
267            .lock()
268            .unwrap()
269            .values()
270            .map(|f| FaceInfo {
271                id: f.id(),
272                kind: f.kind(),
273                remote_uri: f.remote_uri(),
274                local_uri: f.local_uri(),
275            })
276            .collect();
277    }
278}
279
280impl Default for FaceTable {
281    fn default() -> Self {
282        Self::new()
283    }
284}