ndn_ipc/
mgmt_client.rs

1/// Programmatic management client for `ndn-fwd` forwarder control.
2///
3/// `MgmtClient` provides typed methods for every NFD management command,
4/// making it easy for control applications (routing daemons, CLI tools, etc.)
5/// to interact with a running forwarder without hand-building Interest names.
6///
7/// # Example
8///
9/// ```rust,no_run
10/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
11/// use ndn_ipc::MgmtClient;
12///
13/// let mgmt = MgmtClient::connect("/run/nfd/nfd.sock").await?;
14/// mgmt.route_add(&"/ndn".parse()?, Some(1), 10).await?;
15/// let status = mgmt.status().await?;
16/// println!("{} {}", status.status_code, status.status_text);
17/// # Ok(())
18/// # }
19/// ```
20use std::sync::Arc;
21
22use bytes::Bytes;
23use tokio::sync::Mutex;
24
25use ndn_config::{
26    ControlParameters, ControlResponse,
27    nfd_command::{command_name, dataset_name, module, verb},
28};
29use ndn_faces::local::IpcFace;
30use ndn_packet::{Name, encode::InterestBuilder};
31use ndn_transport::{Face, FaceId};
32
33use crate::forwarder_client::ForwarderError;
34
35/// Management client for a running `ndn-fwd` forwarder.
36///
37/// Sends NFD management Interests over an [`IpcFace`] and decodes the
38/// `ControlResponse` from the returned Data packet.
39///
40/// On Unix the transport is a Unix domain socket; on Windows it is a
41/// Named Pipe.  Both are accessed through the same `MgmtClient` API.
42pub struct MgmtClient {
43    face: Arc<IpcFace>,
44    recv_lock: Mutex<()>,
45}
46
47impl MgmtClient {
48    /// Connect to the forwarder's IPC socket.
49    ///
50    /// `face_socket` is a Unix domain socket path on Unix (e.g.
51    /// `/run/nfd/nfd.sock`) or a Named Pipe path on Windows (e.g.
52    /// `\\.\pipe\ndn`).
53    pub async fn connect(face_socket: impl AsRef<str>) -> Result<Self, ForwarderError> {
54        let face =
55            Arc::new(ndn_faces::local::ipc_face_connect(FaceId(0), face_socket.as_ref()).await?);
56        Ok(Self {
57            face,
58            recv_lock: Mutex::new(()),
59        })
60    }
61
62    /// Wrap an existing [`IpcFace`] (e.g. from a `ForwarderClient`).
63    pub fn from_face(face: Arc<IpcFace>) -> Self {
64        Self {
65            face,
66            recv_lock: Mutex::new(()),
67        }
68    }
69
70    // ─── Route management ───────────────────────────────────────────────
71
72    /// Add (or update) a route: `rib/register`.
73    ///
74    /// Pass `face_id: None` to let the router use the requesting face (the
75    /// default NFD behaviour when no FaceId is supplied).  This is the correct
76    /// value to use when connecting over a Unix socket without SHM, because
77    /// there is no separate SHM face ID to reference.
78    pub async fn route_add(
79        &self,
80        prefix: &Name,
81        face_id: Option<u64>,
82        cost: u64,
83    ) -> Result<ControlParameters, ForwarderError> {
84        let params = ControlParameters {
85            name: Some(prefix.clone()),
86            face_id,
87            cost: Some(cost),
88            ..Default::default()
89        };
90        self.command(module::RIB, verb::REGISTER, &params).await
91    }
92
93    /// Remove a route: `rib/unregister`.
94    ///
95    /// Pass `face_id: None` to remove the route on the requesting face.
96    pub async fn route_remove(
97        &self,
98        prefix: &Name,
99        face_id: Option<u64>,
100    ) -> Result<ControlParameters, ForwarderError> {
101        let params = ControlParameters {
102            name: Some(prefix.clone()),
103            face_id,
104            ..Default::default()
105        };
106        self.command(module::RIB, verb::UNREGISTER, &params).await
107    }
108
109    /// List all FIB routes: `fib/list`.
110    ///
111    /// Returns NFD TLV `FibEntry` dataset entries (per-spec wire format).
112    pub async fn route_list(&self) -> Result<Vec<ndn_config::FibEntry>, ForwarderError> {
113        let bytes = self.dataset_raw(module::FIB, verb::LIST).await?;
114        Ok(ndn_config::FibEntry::decode_all(&bytes))
115    }
116
117    /// List all RIB routes: `rib/list`.
118    ///
119    /// Returns NFD TLV `RibEntry` dataset entries (per-spec wire format).
120    pub async fn rib_list(&self) -> Result<Vec<ndn_config::RibEntry>, ForwarderError> {
121        let bytes = self.dataset_raw(module::RIB, verb::LIST).await?;
122        Ok(ndn_config::RibEntry::decode_all(&bytes))
123    }
124
125    // ─── Face management ────────────────────────────────────────────────
126
127    /// Create a face: `faces/create`.
128    pub async fn face_create(&self, uri: &str) -> Result<ControlParameters, ForwarderError> {
129        self.face_create_with_mtu(uri, None).await
130    }
131
132    /// Create a face with an optional `mtu` hint: `faces/create`.
133    ///
134    /// For SHM faces the router uses `mtu` to size the ring slot so
135    /// it can carry Data packets whose content body is up to `mtu`
136    /// bytes. For Unix and network faces `mtu` is currently ignored.
137    pub async fn face_create_with_mtu(
138        &self,
139        uri: &str,
140        mtu: Option<u64>,
141    ) -> Result<ControlParameters, ForwarderError> {
142        let params = ControlParameters {
143            uri: Some(uri.to_owned()),
144            mtu,
145            ..Default::default()
146        };
147        self.command(module::FACES, verb::CREATE, &params).await
148    }
149
150    /// Destroy a face: `faces/destroy`.
151    pub async fn face_destroy(&self, face_id: u64) -> Result<ControlParameters, ForwarderError> {
152        let params = ControlParameters {
153            face_id: Some(face_id),
154            ..Default::default()
155        };
156        self.command(module::FACES, verb::DESTROY, &params).await
157    }
158
159    /// List all faces: `faces/list`.
160    ///
161    /// Returns NFD TLV `FaceStatus` dataset entries (per-spec wire format).
162    pub async fn face_list(&self) -> Result<Vec<ndn_config::FaceStatus>, ForwarderError> {
163        let bytes = self.dataset_raw(module::FACES, verb::LIST).await?;
164        Ok(ndn_config::FaceStatus::decode_all(&bytes))
165    }
166
167    // ─── Strategy management ────────────────────────────────────────────
168
169    /// Set forwarding strategy for a prefix: `strategy-choice/set`.
170    pub async fn strategy_set(
171        &self,
172        prefix: &Name,
173        strategy: &Name,
174    ) -> Result<ControlParameters, ForwarderError> {
175        let params = ControlParameters {
176            name: Some(prefix.clone()),
177            strategy: Some(strategy.clone()),
178            ..Default::default()
179        };
180        self.command(module::STRATEGY, verb::SET, &params).await
181    }
182
183    /// Unset forwarding strategy for a prefix: `strategy-choice/unset`.
184    pub async fn strategy_unset(&self, prefix: &Name) -> Result<ControlParameters, ForwarderError> {
185        let params = ControlParameters {
186            name: Some(prefix.clone()),
187            ..Default::default()
188        };
189        self.command(module::STRATEGY, verb::UNSET, &params).await
190    }
191
192    /// List all strategy choices: `strategy-choice/list`.
193    ///
194    /// Returns NFD TLV `StrategyChoice` dataset entries (per-spec wire format).
195    pub async fn strategy_list(&self) -> Result<Vec<ndn_config::StrategyChoice>, ForwarderError> {
196        let bytes = self.dataset_raw(module::STRATEGY, verb::LIST).await?;
197        Ok(ndn_config::StrategyChoice::decode_all(&bytes))
198    }
199
200    // ─── Content store ──────────────────────────────────────────────────
201
202    /// Content store info: `cs/info`.
203    pub async fn cs_info(&self) -> Result<ControlResponse, ForwarderError> {
204        self.dataset(module::CS, verb::INFO).await
205    }
206
207    /// Configure CS capacity: `cs/config`.
208    ///
209    /// If `capacity` is `Some`, sets the new max capacity in bytes.
210    /// Always returns the current capacity.
211    pub async fn cs_config(
212        &self,
213        capacity: Option<u64>,
214    ) -> Result<ControlParameters, ForwarderError> {
215        let params = ControlParameters {
216            capacity,
217            ..Default::default()
218        };
219        self.command(module::CS, verb::CONFIG, &params).await
220    }
221
222    /// Erase CS entries by prefix: `cs/erase`.
223    ///
224    /// Returns the number of entries erased (in the `count` field of the
225    /// response ControlParameters).
226    pub async fn cs_erase(
227        &self,
228        prefix: &ndn_packet::Name,
229        count: Option<u64>,
230    ) -> Result<ControlParameters, ForwarderError> {
231        let params = ControlParameters {
232            name: Some(prefix.clone()),
233            count,
234            ..Default::default()
235        };
236        self.command(module::CS, verb::ERASE, &params).await
237    }
238
239    // ─── Neighbors ──────────────────────────────────────────────────────
240
241    /// List discovered neighbors: `neighbors/list`.
242    pub async fn neighbors_list(&self) -> Result<ControlResponse, ForwarderError> {
243        self.dataset(module::NEIGHBORS, verb::LIST).await
244    }
245
246    // ─── Service discovery ──────────────────────────────────────────────
247
248    /// List locally announced services: `service/list`.
249    pub async fn service_list(&self) -> Result<ControlResponse, ForwarderError> {
250        self.dataset(module::SERVICE, verb::LIST).await
251    }
252
253    /// Announce a service prefix at runtime: `service/announce`.
254    pub async fn service_announce(
255        &self,
256        prefix: &Name,
257    ) -> Result<ControlParameters, ForwarderError> {
258        let params = ControlParameters {
259            name: Some(prefix.clone()),
260            ..Default::default()
261        };
262        self.command(module::SERVICE, verb::ANNOUNCE, &params).await
263    }
264
265    /// Withdraw a previously announced service prefix: `service/withdraw`.
266    pub async fn service_withdraw(
267        &self,
268        prefix: &Name,
269    ) -> Result<ControlParameters, ForwarderError> {
270        let params = ControlParameters {
271            name: Some(prefix.clone()),
272            ..Default::default()
273        };
274        self.command(module::SERVICE, verb::WITHDRAW, &params).await
275    }
276
277    /// Browse all known service records (local + received from peers): `service/browse`.
278    ///
279    /// When `prefix` is `Some`, the router returns only records whose
280    /// `announced_prefix` has `prefix` as a prefix (server-side filter).
281    pub async fn service_browse(
282        &self,
283        prefix: Option<&Name>,
284    ) -> Result<ControlResponse, ForwarderError> {
285        let name = match prefix {
286            None => dataset_name(module::SERVICE, verb::BROWSE),
287            Some(p) => {
288                let params = ControlParameters {
289                    name: Some(p.clone()),
290                    ..Default::default()
291                };
292                command_name(module::SERVICE, verb::BROWSE, &params)
293            }
294        };
295        self.send_interest(name).await
296    }
297
298    // ─── Status ─────────────────────────────────────────────────────────
299
300    /// General forwarder status: `status/general`.
301    pub async fn status(&self) -> Result<ControlResponse, ForwarderError> {
302        self.dataset(module::STATUS, b"general").await
303    }
304
305    /// Request graceful shutdown: `status/shutdown`.
306    pub async fn shutdown(&self) -> Result<ControlResponse, ForwarderError> {
307        self.dataset(module::STATUS, b"shutdown").await
308    }
309
310    // ─── Config ──────────────────────────────────────────────────────────────
311
312    /// Retrieve the running router configuration as TOML: `config/get`.
313    pub async fn config_get(&self) -> Result<ControlResponse, ForwarderError> {
314        self.dataset(module::CONFIG, verb::GET).await
315    }
316
317    // ─── Faces counters ─────────────────────────────────────────────────
318
319    /// Per-face packet/byte counters: `faces/counters`.
320    pub async fn face_counters(&self) -> Result<ControlResponse, ForwarderError> {
321        self.dataset(module::FACES, verb::COUNTERS).await
322    }
323
324    // ─── Measurements ───────────────────────────────────────────────────
325
326    /// Per-prefix measurements (satisfaction rate, RTTs): `measurements/list`.
327    pub async fn measurements_list(&self) -> Result<ControlResponse, ForwarderError> {
328        self.dataset(module::MEASUREMENTS, verb::LIST).await
329    }
330
331    // ─── Security ────────────────────────────────────────────────────────
332
333    /// List all identity keys in the PIB: `security/identity-list`.
334    pub async fn security_identity_list(&self) -> Result<ControlResponse, ForwarderError> {
335        self.dataset(module::SECURITY, verb::IDENTITY_LIST).await
336    }
337
338    /// Query the active identity status: `security/identity-status`.
339    ///
340    /// Returns a `ControlResponse` whose `status_text` is a space-separated
341    /// key=value line: `identity=<name> is_ephemeral=<bool> pib_path=<path>`.
342    /// Works whether or not a PIB is configured (unlike `identity-list`).
343    pub async fn security_identity_status(&self) -> Result<ControlResponse, ForwarderError> {
344        self.dataset(module::SECURITY, verb::IDENTITY_STATUS).await
345    }
346
347    /// Generate a new Ed25519 identity key: `security/identity-generate`.
348    pub async fn security_identity_generate(
349        &self,
350        name: &Name,
351    ) -> Result<ControlParameters, ForwarderError> {
352        let params = ControlParameters {
353            name: Some(name.clone()),
354            ..Default::default()
355        };
356        self.command(module::SECURITY, verb::IDENTITY_GENERATE, &params)
357            .await
358    }
359
360    /// List all trust anchors in the PIB: `security/anchor-list`.
361    pub async fn security_anchor_list(&self) -> Result<ControlResponse, ForwarderError> {
362        self.dataset(module::SECURITY, verb::ANCHOR_LIST).await
363    }
364
365    /// Delete a key from the PIB: `security/key-delete`.
366    pub async fn security_key_delete(
367        &self,
368        name: &Name,
369    ) -> Result<ControlParameters, ForwarderError> {
370        let params = ControlParameters {
371            name: Some(name.clone()),
372            ..Default::default()
373        };
374        self.command(module::SECURITY, verb::KEY_DELETE, &params)
375            .await
376    }
377
378    /// Get the `did:ndn:` DID for a named identity: `security/identity-did`.
379    ///
380    /// The response `status_text` contains the DID string.
381    pub async fn security_identity_did(
382        &self,
383        name: &Name,
384    ) -> Result<ControlResponse, ForwarderError> {
385        let params = ControlParameters {
386            name: Some(name.clone()),
387            ..Default::default()
388        };
389        let name = command_name(module::SECURITY, verb::IDENTITY_DID, &params);
390        self.send_interest(name).await
391    }
392
393    /// Retrieve the router's NDNCERT CA profile: `security/ca-info`.
394    ///
395    /// Returns `NOT_FOUND` if no `ca_prefix` is configured.
396    pub async fn security_ca_info(&self) -> Result<ControlResponse, ForwarderError> {
397        self.dataset(module::SECURITY, verb::CA_INFO).await
398    }
399
400    /// Initiate NDNCERT enrollment with a CA: `security/ca-enroll`.
401    ///
402    /// Parameters:
403    /// - `ca_prefix` — NDN name of the CA (e.g. `/ndn/edu/example/CA`)
404    /// - `challenge_type` — `"token"`, `"pin"`, `"possession"`, or `"yubikey-hotp"`
405    /// - `challenge_param` — the challenge secret/code
406    ///
407    /// The router starts a background enrollment session and returns immediately
408    /// with `status_text = "started"`.  Poll `security/identity-list` to detect
409    /// when the certificate has been installed.
410    pub async fn security_ca_enroll(
411        &self,
412        ca_prefix: &Name,
413        challenge_type: &str,
414        challenge_param: &str,
415    ) -> Result<ControlParameters, ForwarderError> {
416        let params = ControlParameters {
417            name: Some(ca_prefix.clone()),
418            uri: Some(format!("{challenge_type}:{challenge_param}")),
419            ..Default::default()
420        };
421        self.command(module::SECURITY, verb::CA_ENROLL, &params)
422            .await
423    }
424
425    /// Add a Zero-Touch-Provisioning token to the CA: `security/ca-token-add`.
426    ///
427    /// Returns the generated token in `ControlParameters::uri`.
428    pub async fn security_ca_token_add(
429        &self,
430        description: &str,
431    ) -> Result<ControlParameters, ForwarderError> {
432        let params = ControlParameters {
433            uri: Some(description.to_owned()),
434            ..Default::default()
435        };
436        self.command(module::SECURITY, verb::CA_TOKEN_ADD, &params)
437            .await
438    }
439
440    /// List pending NDNCERT CA enrollment requests: `security/ca-requests`.
441    pub async fn security_ca_requests(&self) -> Result<ControlResponse, ForwarderError> {
442        self.dataset(module::SECURITY, verb::CA_REQUESTS).await
443    }
444
445    /// Detect whether a YubiKey is connected: `security/yubikey-detect`.
446    ///
447    /// Returns `Ok` with `status_text = "present"` if a YubiKey is found,
448    /// or an error if not present or the `yubikey-piv` feature is not compiled in.
449    pub async fn security_yubikey_detect(&self) -> Result<ControlResponse, ForwarderError> {
450        self.dataset(module::SECURITY, verb::YUBIKEY_DETECT).await
451    }
452
453    /// Generate a P-256 key in YubiKey PIV slot 9a: `security/yubikey-generate`.
454    ///
455    /// On success the response `body.uri` contains the base64url-encoded 65-byte
456    /// uncompressed public key.
457    pub async fn security_yubikey_generate(
458        &self,
459        name: &Name,
460    ) -> Result<ControlParameters, ForwarderError> {
461        let params = ControlParameters {
462            name: Some(name.clone()),
463            ..Default::default()
464        };
465        self.command(module::SECURITY, verb::YUBIKEY_GENERATE, &params)
466            .await
467    }
468
469    // ─── Trust schema ───────────────────────────────────────────────────────
470
471    /// List all active trust schema rules: `security/schema-list`.
472    ///
473    /// Returns a human-readable list of rules; one per line in the `status_text`:
474    /// `[0] /data_pattern => /key_pattern`
475    pub async fn security_schema_list(&self) -> Result<ControlResponse, ForwarderError> {
476        self.dataset(module::SECURITY, verb::SCHEMA_LIST).await
477    }
478
479    /// Add a trust schema rule: `security/schema-rule-add`.
480    ///
481    /// `rule` must be in the form `"<data_pattern> => <key_pattern>"`, e.g.:
482    /// `"/sensor/<node>/<type> => /sensor/<node>/KEY/<id>"`.
483    pub async fn security_schema_rule_add(
484        &self,
485        rule: &str,
486    ) -> Result<ControlResponse, ForwarderError> {
487        let params = ControlParameters {
488            uri: Some(rule.to_owned()),
489            ..Default::default()
490        };
491        let name = ndn_config::command_name(module::SECURITY, verb::SCHEMA_RULE_ADD, &params);
492        self.send_interest(name).await
493    }
494
495    /// Remove a trust schema rule by index: `security/schema-rule-remove`.
496    ///
497    /// `index` is the 0-based position from `security_schema_list()`.
498    pub async fn security_schema_rule_remove(
499        &self,
500        index: u64,
501    ) -> Result<ControlResponse, ForwarderError> {
502        let params = ControlParameters {
503            count: Some(index),
504            ..Default::default()
505        };
506        let name = ndn_config::command_name(module::SECURITY, verb::SCHEMA_RULE_REMOVE, &params);
507        self.send_interest(name).await
508    }
509
510    /// Replace the entire trust schema: `security/schema-set`.
511    ///
512    /// `rules` is a newline-separated list of rule strings. Each line must be in
513    /// the form `"<data_pattern> => <key_pattern>"`. Pass an empty string to
514    /// clear all rules (schema rejects everything).
515    pub async fn security_schema_set(
516        &self,
517        rules: &str,
518    ) -> Result<ControlResponse, ForwarderError> {
519        let params = ControlParameters {
520            uri: Some(rules.to_owned()),
521            ..Default::default()
522        };
523        let name = ndn_config::command_name(module::SECURITY, verb::SCHEMA_SET, &params);
524        self.send_interest(name).await
525    }
526
527    // ─── Discovery ──────────────────────────────────────────────────────────
528
529    /// Get discovery protocol status and current config: `discovery/status`.
530    pub async fn discovery_status(&self) -> Result<ControlResponse, ForwarderError> {
531        self.dataset(module::DISCOVERY, b"status").await
532    }
533
534    /// Update runtime-mutable discovery parameters: `discovery/config`.
535    ///
536    /// Pass parameters as a URL query string:
537    /// `"hello_interval_base_ms=5000&liveness_miss_count=3"`.
538    ///
539    /// Supported keys: `hello_interval_base_ms`, `hello_interval_max_ms`,
540    /// `hello_jitter`, `liveness_timeout_ms`, `liveness_miss_count`,
541    /// `probe_timeout_ms`, `swim_indirect_fanout`, `gossip_fanout`,
542    /// `auto_create_faces`.
543    pub async fn discovery_config_set(
544        &self,
545        params: &str,
546    ) -> Result<ControlResponse, ForwarderError> {
547        let cp = ControlParameters {
548            uri: Some(params.to_owned()),
549            ..Default::default()
550        };
551        let name = command_name(module::DISCOVERY, verb::CONFIG, &cp);
552        self.send_interest(name).await
553    }
554
555    /// Get DVR routing protocol status: `routing/dvr-status`.
556    pub async fn routing_dvr_status(&self) -> Result<ControlResponse, ForwarderError> {
557        self.dataset(module::ROUTING, verb::DVR_STATUS).await
558    }
559
560    /// Update runtime-mutable DVR parameters: `routing/dvr-config`.
561    ///
562    /// Pass parameters as a URL query string:
563    /// `"update_interval_ms=30000&route_ttl_ms=90000"`.
564    pub async fn routing_dvr_config_set(
565        &self,
566        params: &str,
567    ) -> Result<ControlResponse, ForwarderError> {
568        let cp = ControlParameters {
569            uri: Some(params.to_owned()),
570            ..Default::default()
571        };
572        let name = command_name(module::ROUTING, verb::DVR_CONFIG, &cp);
573        self.send_interest(name).await
574    }
575
576    // ─── Log filter ─────────────────────────────────────────────────────
577
578    /// Get the current runtime log filter string: `log/get-filter`.
579    pub async fn log_get_filter(&self) -> Result<ControlResponse, ForwarderError> {
580        self.dataset(module::LOG, verb::GET_FILTER).await
581    }
582
583    /// Get new log lines from the router's in-memory ring buffer: `log/get-recent`.
584    ///
585    /// Pass the last sequence number received (0 on first call) in `after_seq`.
586    /// The router returns only entries with a higher sequence number, so repeated
587    /// polls never replay old lines.
588    ///
589    /// Response format: first line is the new max sequence number, followed by
590    /// zero or more log lines.
591    pub async fn log_get_recent(&self, after_seq: u64) -> Result<ControlResponse, ForwarderError> {
592        let params = ControlParameters {
593            count: Some(after_seq),
594            ..Default::default()
595        };
596        let name = command_name(module::LOG, verb::GET_RECENT, &params);
597        self.send_interest(name).await
598    }
599
600    /// Set the runtime log filter: `log/set-filter`.
601    ///
602    /// The `filter` string is an `EnvFilter`-compatible directive
603    /// (e.g. `"info"`, `"debug,ndn_engine=trace"`).
604    pub async fn log_set_filter(&self, filter: &str) -> Result<ControlResponse, ForwarderError> {
605        let params = ControlParameters {
606            uri: Some(filter.to_owned()),
607            ..Default::default()
608        };
609        let name = command_name(module::LOG, verb::SET_FILTER, &params);
610        self.send_interest(name).await
611    }
612
613    // ─── Core transport ─────────────────────────────────────────────────
614
615    /// Send a command Interest with ControlParameters and decode the response.
616    async fn command(
617        &self,
618        module_name: &[u8],
619        verb_name: &[u8],
620        params: &ControlParameters,
621    ) -> Result<ControlParameters, ForwarderError> {
622        let name = command_name(module_name, verb_name, params);
623        let resp = self.send_interest(name).await?;
624
625        if !resp.is_ok() {
626            return Err(ForwarderError::Command {
627                code: resp.status_code,
628                text: resp.status_text,
629            });
630        }
631
632        Ok(resp.body.unwrap_or_default())
633    }
634
635    /// Send a dataset Interest and return raw content bytes.
636    ///
637    /// Used for the four NFD-standard list datasets (`faces/list`, `fib/list`,
638    /// `rib/list`, `strategy-choice/list`) whose content is concatenated TLV
639    /// entries rather than a ControlResponse.
640    async fn dataset_raw(
641        &self,
642        module_name: &[u8],
643        verb_name: &[u8],
644    ) -> Result<Bytes, ForwarderError> {
645        let name = dataset_name(module_name, verb_name);
646        let interest_wire = InterestBuilder::new(name).build();
647        self.send_content_bytes(interest_wire).await
648    }
649
650    /// Send an Interest and return the raw content bytes from the Data reply.
651    async fn send_content_bytes(&self, interest_wire: Bytes) -> Result<Bytes, ForwarderError> {
652        let _guard = self.recv_lock.lock().await;
653
654        self.face
655            .send(ndn_packet::lp::encode_lp_packet(&interest_wire))
656            .await?;
657
658        let data_wire = self
659            .face
660            .recv()
661            .await
662            .map(crate::forwarder_client::strip_lp)?;
663        let data =
664            ndn_packet::Data::decode(data_wire).map_err(|_| ForwarderError::MalformedResponse)?;
665
666        let content = data.content().ok_or(ForwarderError::MalformedResponse)?;
667        Ok(Bytes::copy_from_slice(content))
668    }
669
670    /// Send a dataset Interest (no ControlParameters) and return the full response.
671    ///
672    /// Dataset queries are sent **unsigned** (plain Interest with no
673    /// ApplicationParameters).  NFD and yanfd/ndnd require unsigned Interests
674    /// for dataset queries; ndn-fwd accepts both signed and unsigned.
675    async fn dataset(
676        &self,
677        module_name: &[u8],
678        verb_name: &[u8],
679    ) -> Result<ControlResponse, ForwarderError> {
680        let name = dataset_name(module_name, verb_name);
681        self.send_unsigned_interest(name).await
682    }
683
684    /// Send an unsigned dataset Interest and decode the ControlResponse.
685    ///
686    /// Used for read-only queries (`faces/list`, `fib/list`, `status/general`,
687    /// etc.) where NFD and yanfd reject signed Interests.
688    ///
689    /// The Interest is LP-wrapped before sending so external forwarders accept it.
690    async fn send_unsigned_interest(&self, name: Name) -> Result<ControlResponse, ForwarderError> {
691        let interest_wire = InterestBuilder::new(name).build();
692        self.send_raw(interest_wire).await
693    }
694
695    /// Send a signed command Interest and decode the ControlResponse from the Data reply.
696    ///
697    /// Command Interests (`rib/register`, `faces/create`, etc.) are signed with
698    /// `DigestSha256` so that NFD and ndnd accept them as authenticated commands.
699    ///
700    /// The Interest is LP-wrapped before sending: external forwarders (NFD,
701    /// yanfd/ndnd) require NDNLPv2 framing on their Unix socket faces.
702    async fn send_interest(&self, name: Name) -> Result<ControlResponse, ForwarderError> {
703        let interest_wire = InterestBuilder::new(name).sign_digest_sha256();
704        self.send_raw(interest_wire).await
705    }
706
707    /// Core send+recv: LP-wrap `interest_wire`, send to face, decode response.
708    async fn send_raw(&self, interest_wire: Bytes) -> Result<ControlResponse, ForwarderError> {
709        let interest_wire = interest_wire;
710
711        // Serialise send+recv so concurrent callers don't interleave.
712        let _guard = self.recv_lock.lock().await;
713
714        self.face
715            .send(ndn_packet::lp::encode_lp_packet(&interest_wire))
716            .await?;
717
718        let data_wire = self
719            .face
720            .recv()
721            .await
722            .map(crate::forwarder_client::strip_lp)?;
723        let data =
724            ndn_packet::Data::decode(data_wire).map_err(|_| ForwarderError::MalformedResponse)?;
725
726        let content = data.content().ok_or(ForwarderError::MalformedResponse)?;
727
728        ControlResponse::decode(Bytes::copy_from_slice(content))
729            .map_err(|_| ForwarderError::MalformedResponse)
730    }
731}