ndn_compute/
compute_face.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use tokio::sync::{Mutex, mpsc};
5use tracing::warn;
6
7use ndn_packet::Interest;
8use ndn_transport::{Face, FaceError, FaceId, FaceKind};
9
10use crate::ComputeRegistry;
11
12/// A synthetic face that routes Interests to registered compute handlers.
13///
14/// The FIB routes Interests matching `/compute/*` (or any registered prefix)
15/// to this face. On receipt via [`Face::send`], `ComputeFace` dispatches to
16/// the appropriate [`ComputeHandler`](crate::ComputeHandler), encodes the
17/// resulting Data, and makes it available through [`Face::recv`] so the
18/// engine pipeline can satisfy the originating PIT entries.
19///
20/// # Wiring
21///
22/// Register the face with the engine, then add a FIB route pointing the
23/// desired prefix at this face's [`FaceId`]. The engine will forward matching
24/// Interests here automatically and pick up computed Data through `recv()`.
25pub struct ComputeFace {
26    id: FaceId,
27    registry: Arc<ComputeRegistry>,
28    /// Sink for computed Data wire bytes injected by `send()`.
29    tx: mpsc::Sender<Bytes>,
30    /// Source consumed by `recv()`. `Mutex` makes `&self` usable from async.
31    rx: Mutex<mpsc::Receiver<Bytes>>,
32}
33
34impl ComputeFace {
35    /// Create a new `ComputeFace` with an internal channel depth of `capacity`
36    /// pending computed responses.
37    pub fn new(id: FaceId, registry: Arc<ComputeRegistry>) -> Self {
38        Self::with_capacity(id, registry, 64)
39    }
40
41    /// Create with an explicit response channel capacity.
42    pub fn with_capacity(id: FaceId, registry: Arc<ComputeRegistry>, capacity: usize) -> Self {
43        let (tx, rx) = mpsc::channel(capacity);
44        Self {
45            id,
46            registry,
47            tx,
48            rx: Mutex::new(rx),
49        }
50    }
51}
52
53impl Face for ComputeFace {
54    fn id(&self) -> FaceId {
55        self.id
56    }
57
58    fn kind(&self) -> FaceKind {
59        FaceKind::Compute
60    }
61
62    /// Receive the next computed Data packet.
63    ///
64    /// Blocks until a `send()` call completes computation and enqueues a
65    /// response, or returns `FaceError::Closed` if all senders are dropped.
66    async fn recv(&self) -> Result<Bytes, FaceError> {
67        self.rx.lock().await.recv().await.ok_or(FaceError::Closed)
68    }
69
70    /// Dispatch an incoming Interest to the matching compute handler.
71    ///
72    /// Decodes the Interest, looks up the handler in the registry via
73    /// longest-prefix match, and spawns a task to run the handler and inject
74    /// the resulting Data wire bytes back through `recv()`.
75    ///
76    /// Returns immediately — computation is async and non-blocking.
77    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
78        let interest = match Interest::decode(pkt) {
79            Ok(i) => i,
80            Err(e) => {
81                warn!("ComputeFace: failed to decode Interest: {e}");
82                return Ok(());
83            }
84        };
85
86        let registry = Arc::clone(&self.registry);
87        let tx = self.tx.clone();
88
89        tokio::spawn(async move {
90            match registry.dispatch(&interest).await {
91                Some(Ok(data)) => {
92                    let wire = data.raw().clone();
93                    if tx.send(wire).await.is_err() {
94                        warn!(
95                            "ComputeFace: pipeline receiver dropped before Data could be injected"
96                        );
97                    }
98                }
99                Some(Err(e)) => {
100                    warn!("ComputeFace: handler error for {:?}: {e}", interest.name);
101                }
102                None => {
103                    warn!("ComputeFace: no handler registered for {:?}", interest.name);
104                }
105            }
106        });
107
108        Ok(())
109    }
110}