ndn_faces/net/
multicast.rs

1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use bytes::Bytes;
6use tokio::net::UdpSocket;
7
8use ndn_packet::fragment::{DEFAULT_UDP_MTU, fragment_packet};
9use ndn_transport::{Face, FaceAddr, FaceError, FaceId, FaceKind, LinkType};
10
11/// IANA-assigned NDN IPv4 link-local multicast group.
12pub const NDN_MULTICAST_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 23, 170);
13
14/// NDN standard UDP port.
15pub const NDN_PORT: u16 = 6363;
16
17/// NDN face over IPv4 link-local multicast.
18///
19/// Interests sent on this face reach all NDN-capable nodes on the local link
20/// without requiring prior knowledge of their addresses. Data is returned via
21/// unicast `UdpFace` to the specific responder.
22///
23/// ## Typical usage
24///
25/// 1. Start `MulticastUdpFace` at boot for neighbor discovery and prefix
26///    announcements.
27/// 2. On receiving a multicast Interest, create a unicast `UdpFace` back to
28///    the responder's source address and register it in the FIB.
29/// 3. Subsequent traffic uses the unicast face; the multicast face handles
30///    only discovery and control traffic.
31pub struct MulticastUdpFace {
32    id: FaceId,
33    socket: Arc<UdpSocket>,
34    dest: SocketAddr,
35    mtu: usize,
36    seq: AtomicU64,
37    /// Link type reported by [`Face::link_type`].  Defaults to `MultiAccess`;
38    /// set to `AdHoc` for Wi-Fi IBSS / MANET deployments via [`Self::ad_hoc`].
39    link_type: LinkType,
40}
41
42impl MulticastUdpFace {
43    /// Bind to `port`, join `group` on interface `iface`.
44    /// Use `NDN_MULTICAST_V4` and `NDN_PORT` for standard NDN.
45    pub async fn new(
46        iface: Ipv4Addr,
47        port: u16,
48        group: Ipv4Addr,
49        id: FaceId,
50    ) -> std::io::Result<Self> {
51        let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)).await?;
52        socket.set_multicast_loop_v4(true)?;
53        socket.join_multicast_v4(group, iface)?;
54        Ok(Self {
55            id,
56            socket: Arc::new(socket),
57            dest: SocketAddr::V4(SocketAddrV4::new(group, port)),
58            mtu: DEFAULT_UDP_MTU,
59            seq: AtomicU64::new(0),
60            link_type: LinkType::MultiAccess,
61        })
62    }
63
64    /// Standard NDN multicast (`224.0.23.170:6363`) on `iface`.
65    pub async fn ndn_default(iface: Ipv4Addr, id: FaceId) -> std::io::Result<Self> {
66        Self::new(iface, NDN_PORT, NDN_MULTICAST_V4, id).await
67    }
68
69    /// Wrap a pre-configured socket. The caller is responsible for binding and
70    /// joining the multicast group. Useful when `SO_REUSEADDR` is needed.
71    pub fn with_socket(id: FaceId, socket: UdpSocket, dest: SocketAddr) -> Self {
72        Self {
73            id,
74            socket: Arc::new(socket),
75            dest,
76            mtu: DEFAULT_UDP_MTU,
77            seq: AtomicU64::new(0),
78            link_type: LinkType::MultiAccess,
79        }
80    }
81
82    /// Set link type to `AdHoc` for Wi-Fi IBSS / MANET deployments.
83    ///
84    /// Ad-hoc link type signals to forwarding strategies that not all nodes on
85    /// the link hear every multicast frame, so multi-access Interest suppression
86    /// should be disabled.
87    pub fn ad_hoc(mut self) -> Self {
88        self.link_type = LinkType::AdHoc;
89        self
90    }
91
92    pub fn dest(&self) -> SocketAddr {
93        self.dest
94    }
95}
96
97impl MulticastUdpFace {
98    /// Receive the next NDN packet along with the UDP source address.
99    ///
100    /// Used by the discovery layer to learn the sender's address and create a
101    /// unicast reply face — without requiring the sender to embed their address
102    /// in the Interest payload.
103    pub async fn recv_with_source(&self) -> Result<(Bytes, std::net::SocketAddr), FaceError> {
104        let mut buf = vec![0u8; 9000];
105        let (n, src) = self.socket.recv_from(&mut buf).await?;
106        buf.truncate(n);
107        Ok((Bytes::from(buf), src))
108    }
109}
110
111impl Face for MulticastUdpFace {
112    fn id(&self) -> FaceId {
113        self.id
114    }
115    fn kind(&self) -> FaceKind {
116        FaceKind::Multicast
117    }
118    fn link_type(&self) -> LinkType {
119        self.link_type
120    }
121
122    /// Receive the next NDN packet from any sender on the multicast group.
123    async fn recv(&self) -> Result<Bytes, FaceError> {
124        let (pkt, _src) = self.recv_with_source().await?;
125        Ok(pkt)
126    }
127
128    /// Receive packet and expose the UDP source address to the discovery layer.
129    async fn recv_with_addr(&self) -> Result<(Bytes, Option<FaceAddr>), FaceError> {
130        let (pkt, src) = self.recv_with_source().await?;
131        Ok((pkt, Some(FaceAddr::Udp(src))))
132    }
133
134    /// Broadcast an NDN packet to the multicast group.
135    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
136        if pkt.len() + 4 <= self.mtu {
137            let wire = ndn_packet::lp::encode_lp_packet(&pkt);
138            self.socket.send_to(&wire, self.dest).await?;
139        } else {
140            let seq = self.seq.fetch_add(1, Ordering::Relaxed);
141            let fragments = fragment_packet(&pkt, self.mtu, seq);
142            for frag in &fragments {
143                match self.socket.try_send_to(frag, self.dest) {
144                    Ok(_) => {}
145                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
146                        self.socket.send_to(frag, self.dest).await?;
147                    }
148                    Err(e) => return Err(e.into()),
149                }
150            }
151        }
152        Ok(())
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159
160    #[test]
161    fn ndn_multicast_group_is_multicast() {
162        assert!(NDN_MULTICAST_V4.is_multicast());
163        assert_eq!(NDN_MULTICAST_V4.octets(), [224, 0, 23, 170]);
164    }
165
166    #[test]
167    fn ndn_port_is_6363() {
168        assert_eq!(NDN_PORT, 6363);
169    }
170
171    #[tokio::test]
172    async fn with_socket_metadata() {
173        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
174        let dest: SocketAddr = "224.0.23.170:6363".parse().unwrap();
175        let face = MulticastUdpFace::with_socket(FaceId(3), socket, dest);
176        assert_eq!(face.id(), FaceId(3));
177        assert_eq!(face.kind(), FaceKind::Multicast);
178        assert_eq!(face.dest(), dest);
179    }
180
181    /// Full multicast loopback: may be skipped in sandboxed CI environments
182    /// where joining multicast groups is restricted or loopback is unsupported.
183    #[tokio::test]
184    async fn multicast_loopback_roundtrip() {
185        let group = NDN_MULTICAST_V4;
186        let iface = Ipv4Addr::LOCALHOST;
187
188        // Bind two sockets on OS-chosen ports.
189        let sock_send = UdpSocket::bind("0.0.0.0:0").await.unwrap();
190        let sock_recv = UdpSocket::bind("0.0.0.0:0").await.unwrap();
191        let recv_port = sock_recv.local_addr().unwrap().port();
192
193        if sock_send.set_multicast_loop_v4(true).is_err() {
194            return; // multicast loop unsupported — skip
195        }
196        if sock_recv.join_multicast_v4(group, iface).is_err() {
197            return; // multicast join unsupported — skip
198        }
199
200        let dest = SocketAddr::V4(SocketAddrV4::new(group, recv_port));
201        let sender = MulticastUdpFace::with_socket(FaceId(0), sock_send, dest);
202        let receiver = MulticastUdpFace::with_socket(FaceId(1), sock_recv, dest);
203
204        let pkt = Bytes::from_static(b"\x05\x03ndn");
205        if sender.send(pkt.clone()).await.is_err() {
206            return; // sending to multicast unsupported — skip
207        }
208
209        let expected = ndn_packet::lp::encode_lp_packet(&pkt);
210
211        // Wrap with a timeout — environments that route multicast away from loopback
212        // will block recv() indefinitely without one.
213        match tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await {
214            Ok(Ok(received)) => assert_eq!(received, expected),
215            _ => { /* packet didn't arrive — skip */ }
216        }
217    }
218}