ndn_compute/compute_face.rs
1use std::sync::Arc;
2
3use bytes::Bytes;
4use tokio::sync::{Mutex, mpsc};
5use tracing::warn;
6
7use ndn_packet::Interest;
8use ndn_transport::{Face, FaceError, FaceId, FaceKind};
9
10use crate::ComputeRegistry;
11
12/// A synthetic face that routes Interests to registered compute handlers.
13///
14/// The FIB routes Interests matching `/compute/*` (or any registered prefix)
15/// to this face. On receipt via [`Face::send`], `ComputeFace` dispatches to
16/// the appropriate [`ComputeHandler`](crate::ComputeHandler), encodes the
17/// resulting Data, and makes it available through [`Face::recv`] so the
18/// engine pipeline can satisfy the originating PIT entries.
19///
20/// # Wiring
21///
22/// Register the face with the engine, then add a FIB route pointing the
23/// desired prefix at this face's [`FaceId`]. The engine will forward matching
24/// Interests here automatically and pick up computed Data through `recv()`.
25pub struct ComputeFace {
26 id: FaceId,
27 registry: Arc<ComputeRegistry>,
28 /// Sink for computed Data wire bytes injected by `send()`.
29 tx: mpsc::Sender<Bytes>,
30 /// Source consumed by `recv()`. `Mutex` makes `&self` usable from async.
31 rx: Mutex<mpsc::Receiver<Bytes>>,
32}
33
34impl ComputeFace {
35 /// Create a new `ComputeFace` with an internal channel depth of `capacity`
36 /// pending computed responses.
37 pub fn new(id: FaceId, registry: Arc<ComputeRegistry>) -> Self {
38 Self::with_capacity(id, registry, 64)
39 }
40
41 /// Create with an explicit response channel capacity.
42 pub fn with_capacity(id: FaceId, registry: Arc<ComputeRegistry>, capacity: usize) -> Self {
43 let (tx, rx) = mpsc::channel(capacity);
44 Self {
45 id,
46 registry,
47 tx,
48 rx: Mutex::new(rx),
49 }
50 }
51}
52
53impl Face for ComputeFace {
54 fn id(&self) -> FaceId {
55 self.id
56 }
57
58 fn kind(&self) -> FaceKind {
59 FaceKind::Compute
60 }
61
62 /// Receive the next computed Data packet.
63 ///
64 /// Blocks until a `send()` call completes computation and enqueues a
65 /// response, or returns `FaceError::Closed` if all senders are dropped.
66 async fn recv(&self) -> Result<Bytes, FaceError> {
67 self.rx.lock().await.recv().await.ok_or(FaceError::Closed)
68 }
69
70 /// Dispatch an incoming Interest to the matching compute handler.
71 ///
72 /// Decodes the Interest, looks up the handler in the registry via
73 /// longest-prefix match, and spawns a task to run the handler and inject
74 /// the resulting Data wire bytes back through `recv()`.
75 ///
76 /// Returns immediately — computation is async and non-blocking.
77 async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
78 let interest = match Interest::decode(pkt) {
79 Ok(i) => i,
80 Err(e) => {
81 warn!("ComputeFace: failed to decode Interest: {e}");
82 return Ok(());
83 }
84 };
85
86 let registry = Arc::clone(&self.registry);
87 let tx = self.tx.clone();
88
89 tokio::spawn(async move {
90 match registry.dispatch(&interest).await {
91 Some(Ok(data)) => {
92 let wire = data.raw().clone();
93 if tx.send(wire).await.is_err() {
94 warn!(
95 "ComputeFace: pipeline receiver dropped before Data could be injected"
96 );
97 }
98 }
99 Some(Err(e)) => {
100 warn!("ComputeFace: handler error for {:?}: {e}", interest.name);
101 }
102 None => {
103 warn!("ComputeFace: no handler registered for {:?}", interest.name);
104 }
105 }
106 });
107
108 Ok(())
109 }
110}