ndn_compute/
registry.rs

1use ndn_packet::{Data, Interest, Name};
2use ndn_store::NameTrie;
3use std::sync::Arc;
4
5/// A handler function registered for a name prefix.
6///
7/// Called when an Interest arrives at `ComputeFace` matching the prefix.
8/// The result is returned as a Data packet and cached in the engine CS.
9pub trait ComputeHandler: Send + Sync + 'static {
10    fn compute(
11        &self,
12        interest: &Interest,
13    ) -> impl std::future::Future<Output = Result<Data, ComputeError>> + Send;
14}
15
16#[derive(Debug)]
17pub enum ComputeError {
18    NotFound,
19    ComputeFailed(String),
20}
21
22impl std::fmt::Display for ComputeError {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            ComputeError::NotFound => write!(f, "no compute handler for this name"),
26            ComputeError::ComputeFailed(e) => write!(f, "compute failed: {e}"),
27        }
28    }
29}
30
31impl std::error::Error for ComputeError {}
32
33/// Registry mapping name prefixes to `ComputeHandler` implementations.
34pub struct ComputeRegistry {
35    handlers: NameTrie<Arc<dyn ErasedHandler>>,
36}
37
38trait ErasedHandler: Send + Sync + 'static {
39    fn compute_erased<'a>(
40        &'a self,
41        interest: &'a Interest,
42    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Data, ComputeError>> + Send + 'a>>;
43}
44
45impl<H: ComputeHandler> ErasedHandler for H {
46    fn compute_erased<'a>(
47        &'a self,
48        interest: &'a Interest,
49    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Data, ComputeError>> + Send + 'a>>
50    {
51        Box::pin(self.compute(interest))
52    }
53}
54
55impl ComputeRegistry {
56    pub fn new() -> Self {
57        Self {
58            handlers: NameTrie::new(),
59        }
60    }
61
62    pub fn register<H: ComputeHandler>(&self, prefix: &Name, handler: H) {
63        self.handlers.insert(prefix, Arc::new(handler));
64    }
65
66    pub async fn dispatch(&self, interest: &Interest) -> Option<Result<Data, ComputeError>> {
67        let handler = self.handlers.lpm(&interest.name)?;
68        Some(handler.compute_erased(interest).await)
69    }
70}
71
72impl Default for ComputeRegistry {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81    use bytes::Bytes;
82    use ndn_packet::NameComponent;
83    use ndn_tlv::TlvWriter;
84
85    fn minimal_data() -> Data {
86        // DATA > NAME > NAMECOMP("test")
87        let nc = {
88            let mut w = TlvWriter::new();
89            w.write_tlv(0x08, b"test");
90            w.finish()
91        };
92        let name = {
93            let mut w = TlvWriter::new();
94            w.write_tlv(0x07, &nc);
95            w.finish()
96        };
97        let pkt = {
98            let mut w = TlvWriter::new();
99            w.write_tlv(0x06, &name);
100            w.finish()
101        };
102        Data::decode(pkt).unwrap()
103    }
104
105    struct EchoHandler;
106
107    impl ComputeHandler for EchoHandler {
108        async fn compute(&self, _interest: &Interest) -> Result<Data, ComputeError> {
109            Ok(minimal_data())
110        }
111    }
112
113    struct FailHandler;
114
115    impl ComputeHandler for FailHandler {
116        async fn compute(&self, _interest: &Interest) -> Result<Data, ComputeError> {
117            Err(ComputeError::ComputeFailed("intentional failure".into()))
118        }
119    }
120
121    fn make_interest(comp: &'static str) -> Interest {
122        let name =
123            Name::from_components([NameComponent::generic(Bytes::from_static(comp.as_bytes()))]);
124        Interest::new(name)
125    }
126
127    #[tokio::test]
128    async fn dispatch_to_registered_handler() {
129        let registry = ComputeRegistry::new();
130        let prefix =
131            Name::from_components([NameComponent::generic(Bytes::from_static(b"compute"))]);
132        registry.register(&prefix, EchoHandler);
133        let interest = make_interest("compute");
134        let result = registry.dispatch(&interest).await;
135        assert!(result.is_some());
136        assert!(result.unwrap().is_ok());
137    }
138
139    #[tokio::test]
140    async fn dispatch_no_match_returns_none() {
141        let registry = ComputeRegistry::new();
142        let interest = make_interest("unknown");
143        assert!(registry.dispatch(&interest).await.is_none());
144    }
145
146    #[tokio::test]
147    async fn dispatch_handler_error_propagates() {
148        let registry = ComputeRegistry::new();
149        let prefix = Name::from_components([NameComponent::generic(Bytes::from_static(b"fail"))]);
150        registry.register(&prefix, FailHandler);
151        let interest = make_interest("fail");
152        let result = registry.dispatch(&interest).await.unwrap();
153        assert!(matches!(result, Err(ComputeError::ComputeFailed(_))));
154    }
155
156    #[test]
157    fn compute_error_display() {
158        assert!(!ComputeError::NotFound.to_string().is_empty());
159        assert!(
160            !ComputeError::ComputeFailed("x".into())
161                .to_string()
162                .is_empty()
163        );
164    }
165}