ndn_faces/net/
multicast.rs1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use bytes::Bytes;
6use tokio::net::UdpSocket;
7
8use ndn_packet::fragment::{DEFAULT_UDP_MTU, fragment_packet};
9use ndn_transport::{Face, FaceAddr, FaceError, FaceId, FaceKind, LinkType};
10
11pub const NDN_MULTICAST_V4: Ipv4Addr = Ipv4Addr::new(224, 0, 23, 170);
13
14pub const NDN_PORT: u16 = 6363;
16
17pub struct MulticastUdpFace {
32 id: FaceId,
33 socket: Arc<UdpSocket>,
34 dest: SocketAddr,
35 mtu: usize,
36 seq: AtomicU64,
37 link_type: LinkType,
40}
41
42impl MulticastUdpFace {
43 pub async fn new(
46 iface: Ipv4Addr,
47 port: u16,
48 group: Ipv4Addr,
49 id: FaceId,
50 ) -> std::io::Result<Self> {
51 let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)).await?;
52 socket.set_multicast_loop_v4(true)?;
53 socket.join_multicast_v4(group, iface)?;
54 Ok(Self {
55 id,
56 socket: Arc::new(socket),
57 dest: SocketAddr::V4(SocketAddrV4::new(group, port)),
58 mtu: DEFAULT_UDP_MTU,
59 seq: AtomicU64::new(0),
60 link_type: LinkType::MultiAccess,
61 })
62 }
63
64 pub async fn ndn_default(iface: Ipv4Addr, id: FaceId) -> std::io::Result<Self> {
66 Self::new(iface, NDN_PORT, NDN_MULTICAST_V4, id).await
67 }
68
69 pub fn with_socket(id: FaceId, socket: UdpSocket, dest: SocketAddr) -> Self {
72 Self {
73 id,
74 socket: Arc::new(socket),
75 dest,
76 mtu: DEFAULT_UDP_MTU,
77 seq: AtomicU64::new(0),
78 link_type: LinkType::MultiAccess,
79 }
80 }
81
82 pub fn ad_hoc(mut self) -> Self {
88 self.link_type = LinkType::AdHoc;
89 self
90 }
91
92 pub fn dest(&self) -> SocketAddr {
93 self.dest
94 }
95}
96
97impl MulticastUdpFace {
98 pub async fn recv_with_source(&self) -> Result<(Bytes, std::net::SocketAddr), FaceError> {
104 let mut buf = vec![0u8; 9000];
105 let (n, src) = self.socket.recv_from(&mut buf).await?;
106 buf.truncate(n);
107 Ok((Bytes::from(buf), src))
108 }
109}
110
111impl Face for MulticastUdpFace {
112 fn id(&self) -> FaceId {
113 self.id
114 }
115 fn kind(&self) -> FaceKind {
116 FaceKind::Multicast
117 }
118 fn link_type(&self) -> LinkType {
119 self.link_type
120 }
121
122 async fn recv(&self) -> Result<Bytes, FaceError> {
124 let (pkt, _src) = self.recv_with_source().await?;
125 Ok(pkt)
126 }
127
128 async fn recv_with_addr(&self) -> Result<(Bytes, Option<FaceAddr>), FaceError> {
130 let (pkt, src) = self.recv_with_source().await?;
131 Ok((pkt, Some(FaceAddr::Udp(src))))
132 }
133
134 async fn send(&self, pkt: Bytes) -> Result<(), FaceError> {
136 if pkt.len() + 4 <= self.mtu {
137 let wire = ndn_packet::lp::encode_lp_packet(&pkt);
138 self.socket.send_to(&wire, self.dest).await?;
139 } else {
140 let seq = self.seq.fetch_add(1, Ordering::Relaxed);
141 let fragments = fragment_packet(&pkt, self.mtu, seq);
142 for frag in &fragments {
143 match self.socket.try_send_to(frag, self.dest) {
144 Ok(_) => {}
145 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
146 self.socket.send_to(frag, self.dest).await?;
147 }
148 Err(e) => return Err(e.into()),
149 }
150 }
151 }
152 Ok(())
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159
160 #[test]
161 fn ndn_multicast_group_is_multicast() {
162 assert!(NDN_MULTICAST_V4.is_multicast());
163 assert_eq!(NDN_MULTICAST_V4.octets(), [224, 0, 23, 170]);
164 }
165
166 #[test]
167 fn ndn_port_is_6363() {
168 assert_eq!(NDN_PORT, 6363);
169 }
170
171 #[tokio::test]
172 async fn with_socket_metadata() {
173 let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
174 let dest: SocketAddr = "224.0.23.170:6363".parse().unwrap();
175 let face = MulticastUdpFace::with_socket(FaceId(3), socket, dest);
176 assert_eq!(face.id(), FaceId(3));
177 assert_eq!(face.kind(), FaceKind::Multicast);
178 assert_eq!(face.dest(), dest);
179 }
180
181 #[tokio::test]
184 async fn multicast_loopback_roundtrip() {
185 let group = NDN_MULTICAST_V4;
186 let iface = Ipv4Addr::LOCALHOST;
187
188 let sock_send = UdpSocket::bind("0.0.0.0:0").await.unwrap();
190 let sock_recv = UdpSocket::bind("0.0.0.0:0").await.unwrap();
191 let recv_port = sock_recv.local_addr().unwrap().port();
192
193 if sock_send.set_multicast_loop_v4(true).is_err() {
194 return; }
196 if sock_recv.join_multicast_v4(group, iface).is_err() {
197 return; }
199
200 let dest = SocketAddr::V4(SocketAddrV4::new(group, recv_port));
201 let sender = MulticastUdpFace::with_socket(FaceId(0), sock_send, dest);
202 let receiver = MulticastUdpFace::with_socket(FaceId(1), sock_recv, dest);
203
204 let pkt = Bytes::from_static(b"\x05\x03ndn");
205 if sender.send(pkt.clone()).await.is_err() {
206 return; }
208
209 let expected = ndn_packet::lp::encode_lp_packet(&pkt);
210
211 match tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv()).await {
214 Ok(Ok(received)) => assert_eq!(received, expected),
215 _ => { }
216 }
217 }
218}