ndn_face_net/
multicast.rs

1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use bytes::Bytes;
6use tokio::net::UdpSocket;
7
8use ndn_packet::fragment::{DEFAULT_UDP_MTU, fragment_packet};
9use ndn_transport::{Face, FaceAddr, FaceError, FaceId, FaceKind};
10
11/// IANA-assigned NDN IPv4 link-local multicast group.
12pub const NDN_MULTICAST_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 23, 170);
13
14/// NDN standard UDP port.
15pub const NDN_PORT: u16 = 6363;
16
17/// NDN face over IPv4 link-local multicast.
18///
19/// Interests sent on this face reach all NDN-capable nodes on the local link
20/// without requiring prior knowledge of their addresses. Data is returned via
21/// unicast `UdpFace` to the specific responder.
22///
23/// ## Typical usage
24///
25/// 1. Start `MulticastUdpFace` at boot for neighbor discovery and prefix
26///    announcements.
27/// 2. On receiving a multicast Interest, create a unicast `UdpFace` back to
28///    the responder's source address and register it in the FIB.
29/// 3. Subsequent traffic uses the unicast face; the multicast face handles
30///    only discovery and control traffic.
31pub struct MulticastUdpFace {
32    id: FaceId,
33    socket: Arc<UdpSocket>,
34    dest: SocketAddr,
35    mtu: usize,
36    seq: AtomicU64,
37}
38
39impl MulticastUdpFace {
40    /// Bind to `port`, join `group` on interface `iface`.
41    /// Use `NDN_MULTICAST_V4` and `NDN_PORT` for standard NDN.
42    pub async fn new(
43        iface: Ipv4Addr,
44        port: u16,
45        group: Ipv4Addr,
46        id: FaceId,
47    ) -> std::io::Result<Self> {
48        let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)).await?;
49        socket.set_multicast_loop_v4(true)?;
50        socket.join_multicast_v4(group, iface)?;
51        Ok(Self {
52            id,
53            socket: Arc::new(socket),
54            dest: SocketAddr::V4(SocketAddrV4::new(group, port)),
55            mtu: DEFAULT_UDP_MTU,
56            seq: AtomicU64::new(0),
57        })
58    }
59
60    /// Standard NDN multicast (`224.0.23.170:6363`) on `iface`.
61    pub async fn ndn_default(iface: Ipv4Addr, id: FaceId) -> std::io::Result<Self> {
62        Self::new(iface, NDN_PORT, NDN_MULTICAST_V4, id).await
63    }
64
65    /// Wrap a pre-configured socket. The caller is responsible for binding and
66    /// joining the multicast group. Useful when `SO_REUSEADDR` is needed.
67    pub fn with_socket(id: FaceId, socket: UdpSocket, dest: SocketAddr) -> Self {
68        Self {
69            id,
70            socket: Arc::new(socket),
71            dest,
72            mtu: DEFAULT_UDP_MTU,
73            seq: AtomicU64::new(0),
74        }
75    }
76
77    pub fn dest(&self) -> SocketAddr {
78        self.dest
79    }
80}
81
82impl MulticastUdpFace {
83    /// Receive the next NDN packet along with the UDP source address.
84    ///
85    /// Used by the discovery layer to learn the sender's address and create a
86    /// unicast reply face — without requiring the sender to embed their address
87    /// in the Interest payload.
88    pub async fn recv_with_source(&self) -> Result<(Bytes, std::net::SocketAddr), FaceError> {
89        let mut buf = vec![0u8; 9000];
90        let (n, src) = self.socket.recv_from(&mut buf).await?;
91        buf.truncate(n);
92        Ok((Bytes::from(buf), src))
93    }
94}
95
96impl Face for MulticastUdpFace {
97    fn id(&self) -> FaceId {
98        self.id
99    }
100    fn kind(&self) -> FaceKind {
101        FaceKind::Multicast
102    }
103
104    /// Receive the next NDN packet from any sender on the multicast group.
105    async fn recv(&self) -> Result<Bytes, FaceError> {
106        let (pkt, _src) = self.recv_with_source().await?;
107        Ok(pkt)
108    }
109
110    /// Receive packet and expose the UDP source address to the discovery layer.
111    async fn recv_with_addr(&self) -> Result<(Bytes, Option<FaceAddr>), FaceError> {
112        let (pkt, src) = self.recv_with_source().await?;
113        Ok((pkt, Some(FaceAddr::Udp(src))))
114    }
115
116    /// Broadcast an NDN packet to the multicast group.
117    async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
118        if pkt.len() + 4 <= self.mtu {
119            let wire = ndn_packet::lp::encode_lp_packet(&pkt);
120            self.socket.send_to(&wire, self.dest).await?;
121        } else {
122            let seq = self.seq.fetch_add(1, Ordering::Relaxed);
123            let fragments = fragment_packet(&pkt, self.mtu, seq);
124            for frag in &fragments {
125                match self.socket.try_send_to(frag, self.dest) {
126                    Ok(_) => {}
127                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
128                        self.socket.send_to(frag, self.dest).await?;
129                    }
130                    Err(e) => return Err(e.into()),
131                }
132            }
133        }
134        Ok(())
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141
142    #[test]
143    fn ndn_multicast_group_is_multicast() {
144        assert!(NDN_MULTICAST_V4.is_multicast());
145        assert_eq!(NDN_MULTICAST_V4.octets(), [224, 0, 23, 170]);
146    }
147
148    #[test]
149    fn ndn_port_is_6363() {
150        assert_eq!(NDN_PORT, 6363);
151    }
152
153    #[tokio::test]
154    async fn with_socket_metadata() {
155        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
156        let dest: SocketAddr = "224.0.23.170:6363".parse().unwrap();
157        let face = MulticastUdpFace::with_socket(FaceId(3), socket, dest);
158        assert_eq!(face.id(), FaceId(3));
159        assert_eq!(face.kind(), FaceKind::Multicast);
160        assert_eq!(face.dest(), dest);
161    }
162
163    /// Full multicast loopback: may be skipped in sandboxed CI environments
164    /// where joining multicast groups is restricted or loopback is unsupported.
165    #[tokio::test]
166    async fn multicast_loopback_roundtrip() {
167        let group = NDN_MULTICAST_V4;
168        let iface = Ipv4Addr::LOCALHOST;
169
170        // Bind two sockets on OS-chosen ports.
171        let sock_send = UdpSocket::bind("0.0.0.0:0").await.unwrap();
172        let sock_recv = UdpSocket::bind("0.0.0.0:0").await.unwrap();
173        let recv_port = sock_recv.local_addr().unwrap().port();
174
175        if sock_send.set_multicast_loop_v4(true).is_err() {
176            return; // multicast loop unsupported — skip
177        }
178        if sock_recv.join_multicast_v4(group, iface).is_err() {
179            return; // multicast join unsupported — skip
180        }
181
182        let dest = SocketAddr::V4(SocketAddrV4::new(group, recv_port));
183        let sender = MulticastUdpFace::with_socket(FaceId(0), sock_send, dest);
184        let receiver = MulticastUdpFace::with_socket(FaceId(1), sock_recv, dest);
185
186        let pkt = Bytes::from_static(b"\x05\x03ndn");
187        if sender.send(pkt.clone()).await.is_err() {
188            return; // sending to multicast unsupported — skip
189        }
190
191        let expected = ndn_packet::lp::encode_lp_packet(&pkt);
192
193        // Wrap with a timeout — environments that route multicast away from loopback
194        // will block recv() indefinitely without one.
195        match tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await {
196            Ok(Ok(received)) => assert_eq!(received, expected),
197            _ => { /* packet didn't arrive — skip */ }
198        }
199    }
200}