ndn_face_net/
multicast.rs1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use bytes::Bytes;
6use tokio::net::UdpSocket;
7
8use ndn_packet::fragment::{DEFAULT_UDP_MTU, fragment_packet};
9use ndn_transport::{Face, FaceAddr, FaceError, FaceId, FaceKind};
10
/// IPv4 multicast group that [`MulticastUdpFace::ndn_default`] joins and sends to.
pub const NDN_MULTICAST_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 23, 170);
13
/// UDP port that [`MulticastUdpFace::ndn_default`] binds to and sends to.
pub const NDN_PORT: u16 = 6363;
16
17pub struct MulticastUdpFace {
32 id: FaceId,
33 socket: Arc<UdpSocket>,
34 dest: SocketAddr,
35 mtu: usize,
36 seq: AtomicU64,
37}
38
39impl MulticastUdpFace {
40 pub async fn new(
43 iface: Ipv4Addr,
44 port: u16,
45 group: Ipv4Addr,
46 id: FaceId,
47 ) -> std::io::Result<Self> {
48 let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)).await?;
49 socket.set_multicast_loop_v4(true)?;
50 socket.join_multicast_v4(group, iface)?;
51 Ok(Self {
52 id,
53 socket: Arc::new(socket),
54 dest: SocketAddr::V4(SocketAddrV4::new(group, port)),
55 mtu: DEFAULT_UDP_MTU,
56 seq: AtomicU64::new(0),
57 })
58 }
59
60 pub async fn ndn_default(iface: Ipv4Addr, id: FaceId) -> std::io::Result<Self> {
62 Self::new(iface, NDN_PORT, NDN_MULTICAST_V4, id).await
63 }
64
65 pub fn with_socket(id: FaceId, socket: UdpSocket, dest: SocketAddr) -> Self {
68 Self {
69 id,
70 socket: Arc::new(socket),
71 dest,
72 mtu: DEFAULT_UDP_MTU,
73 seq: AtomicU64::new(0),
74 }
75 }
76
77 pub fn dest(&self) -> SocketAddr {
78 self.dest
79 }
80}
81
82impl MulticastUdpFace {
83 pub async fn recv_with_source(&self) -> Result<(Bytes, std::net::SocketAddr), FaceError> {
89 let mut buf = vec![0u8; 9000];
90 let (n, src) = self.socket.recv_from(&mut buf).await?;
91 buf.truncate(n);
92 Ok((Bytes::from(buf), src))
93 }
94}
95
96impl Face for MulticastUdpFace {
97 fn id(&self) -> FaceId {
98 self.id
99 }
100 fn kind(&self) -> FaceKind {
101 FaceKind::Multicast
102 }
103
104 async fn recv(&self) -> Result<Bytes, FaceError> {
106 let (pkt, _src) = self.recv_with_source().await?;
107 Ok(pkt)
108 }
109
110 async fn recv_with_addr(&self) -> Result<(Bytes, Option<FaceAddr>), FaceError> {
112 let (pkt, src) = self.recv_with_source().await?;
113 Ok((pkt, Some(FaceAddr::Udp(src))))
114 }
115
116 async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
118 if pkt.len() + 4 <= self.mtu {
119 let wire = ndn_packet::lp::encode_lp_packet(&pkt);
120 self.socket.send_to(&wire, self.dest).await?;
121 } else {
122 let seq = self.seq.fetch_add(1, Ordering::Relaxed);
123 let fragments = fragment_packet(&pkt, self.mtu, seq);
124 for frag in &fragments {
125 match self.socket.try_send_to(frag, self.dest) {
126 Ok(_) => {}
127 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
128 self.socket.send_to(frag, self.dest).await?;
129 }
130 Err(e) => return Err(e.into()),
131 }
132 }
133 }
134 Ok(())
135 }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// The default group constant has the expected octets and is a multicast address.
    #[test]
    fn ndn_multicast_group_is_multicast() {
        assert_eq!(NDN_MULTICAST_V4.octets(), [224, 0, 23, 170]);
        assert!(NDN_MULTICAST_V4.is_multicast());
    }

    /// The default port constant is 6363.
    #[test]
    fn ndn_port_is_6363() {
        assert_eq!(NDN_PORT, 6363);
    }

    /// `with_socket` stores the id and destination it is given and reports the
    /// multicast face kind.
    #[tokio::test]
    async fn with_socket_metadata() {
        let dest: SocketAddr = "224.0.23.170:6363".parse().unwrap();
        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();

        let face = MulticastUdpFace::with_socket(FaceId(3), socket, dest);

        assert_eq!(face.dest(), dest);
        assert_eq!(face.kind(), FaceKind::Multicast);
        assert_eq!(face.id(), FaceId(3));
    }

    /// Sends one small packet to the group and, if it arrives within two
    /// seconds, checks that the receiver sees the LP-encoded form.
    ///
    /// Every setup or delivery failure ends the test without failing it —
    /// presumably because multicast may be unavailable where the tests run.
    #[tokio::test]
    async fn multicast_loopback_roundtrip() {
        let tx_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
        let rx_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
        let rx_port = rx_socket.local_addr().unwrap().port();

        if tx_socket.set_multicast_loop_v4(true).is_err() {
            return;
        }
        if rx_socket
            .join_multicast_v4(NDN_MULTICAST_V4, Ipv4Addr::LOCALHOST)
            .is_err()
        {
            return;
        }

        // Both faces target the receiver's ephemeral port on the group address.
        let dest = SocketAddr::V4(SocketAddrV4::new(NDN_MULTICAST_V4, rx_port));
        let sender = MulticastUdpFace::with_socket(FaceId(0), tx_socket, dest);
        let receiver = MulticastUdpFace::with_socket(FaceId(1), rx_socket, dest);

        let pkt = Bytes::from_static(b"\x05\x03ndn");
        if sender.send(pkt.clone()).await.is_err() {
            return;
        }

        let expected = ndn_packet::lp::encode_lp_packet(&pkt);

        let outcome =
            tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await;
        // A timeout or receive error is tolerated; only a delivered packet is checked.
        if let Ok(Ok(received)) = outcome {
            assert_eq!(received, expected);
        }
    }
}