1use bytes::{Bytes, BytesMut};
2
3use ndn_packet::Name;
4
/// Default segment size (8192).
///
/// NOTE(review): presumably a byte count meant to be passed as the
/// `segment_size` argument of `ChunkedProducer::new` — confirm against callers.
pub const NDN_DEFAULT_SEGMENT_SIZE: usize = 8192;
7
8pub struct ChunkedProducer {
13 prefix: Name,
14 segments: Vec<Bytes>,
15}
16
17impl ChunkedProducer {
18 pub fn new(prefix: Name, payload: Bytes, segment_size: usize) -> Self {
20 let seg_size = segment_size.max(1);
21 let segments = payload
22 .chunks(seg_size)
23 .map(Bytes::copy_from_slice)
24 .collect();
25 Self { prefix, segments }
26 }
27
28 pub fn prefix(&self) -> &Name {
29 &self.prefix
30 }
31
32 pub fn segment_count(&self) -> usize {
33 self.segments.len()
34 }
35
36 pub fn segment(&self, index: usize) -> Option<&Bytes> {
38 self.segments.get(index)
39 }
40}
41
42pub struct ChunkedConsumer {
47 prefix: Name,
48 segment_count: usize,
49 received: Vec<Option<Bytes>>,
50}
51
52impl ChunkedConsumer {
53 pub fn new(prefix: Name, segment_count: usize) -> Self {
55 Self {
56 prefix,
57 segment_count,
58 received: vec![None; segment_count],
59 }
60 }
61
62 pub fn prefix(&self) -> &Name {
63 &self.prefix
64 }
65
66 pub fn segment_count(&self) -> usize {
67 self.segment_count
68 }
69
70 pub fn receive_segment(&mut self, index: usize, payload: Bytes) {
72 if index < self.segment_count {
73 self.received[index] = Some(payload);
74 }
75 }
76
77 pub fn is_complete(&self) -> bool {
79 self.received.iter().all(Option::is_some)
80 }
81
82 pub fn reassemble(&self) -> Option<Bytes> {
84 if !self.is_complete() {
85 return None;
86 }
87 let total: usize = self
88 .received
89 .iter()
90 .filter_map(Option::as_ref)
91 .map(Bytes::len)
92 .sum();
93 let mut out = BytesMut::with_capacity(total);
94 for seg in &self.received {
95 out.extend_from_slice(seg.as_ref().unwrap());
96 }
97 Some(out.freeze())
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Name used by every test; the tests never inspect its contents.
    fn prefix() -> Name {
        Name::root()
    }

    #[test]
    fn producer_single_segment() {
        let payload = Bytes::from_static(b"hello");
        let p = ChunkedProducer::new(prefix(), payload.clone(), NDN_DEFAULT_SEGMENT_SIZE);
        assert_eq!(p.segment_count(), 1);
        assert_eq!(p.segment(0).unwrap(), &payload);
    }

    #[test]
    fn producer_multiple_segments() {
        let payload = Bytes::from(vec![0u8; 100]);
        let p = ChunkedProducer::new(prefix(), payload, 30);
        // 30 + 30 + 30 + 10
        assert_eq!(p.segment_count(), 4);
        assert_eq!(p.segment(0).unwrap().len(), 30);
        assert_eq!(p.segment(3).unwrap().len(), 10);
    }

    #[test]
    fn producer_out_of_range_returns_none() {
        let p = ChunkedProducer::new(
            prefix(),
            Bytes::from_static(b"x"),
            NDN_DEFAULT_SEGMENT_SIZE,
        );
        assert!(p.segment(1).is_none());
    }

    #[test]
    fn producer_empty_payload_has_no_segments() {
        let p = ChunkedProducer::new(prefix(), Bytes::new(), NDN_DEFAULT_SEGMENT_SIZE);
        assert_eq!(p.segment_count(), 0);
        assert!(p.segment(0).is_none());
    }

    #[test]
    fn producer_zero_segment_size_is_clamped_to_one() {
        let p = ChunkedProducer::new(prefix(), Bytes::from_static(b"abc"), 0);
        assert_eq!(p.segment_count(), 3);
        assert_eq!(p.segment(0).unwrap().len(), 1);
        assert_eq!(p.segment(2).unwrap().len(), 1);
    }

    #[test]
    fn consumer_reassembles_in_order() {
        let payload = Bytes::from_static(b"hello world");
        let p = ChunkedProducer::new(prefix(), payload.clone(), 5);
        let mut c = ChunkedConsumer::new(prefix(), p.segment_count());
        for i in 0..p.segment_count() {
            c.receive_segment(i, p.segment(i).unwrap().clone());
        }
        assert!(c.is_complete());
        assert_eq!(c.reassemble().unwrap(), payload);
    }

    #[test]
    fn consumer_reassembles_out_of_order() {
        let payload = Bytes::from(b"abcde".repeat(2).to_vec());
        let p = ChunkedProducer::new(prefix(), payload.clone(), 5);
        let mut c = ChunkedConsumer::new(prefix(), p.segment_count());
        c.receive_segment(1, p.segment(1).unwrap().clone());
        assert!(!c.is_complete());
        c.receive_segment(0, p.segment(0).unwrap().clone());
        assert!(c.is_complete());
        assert_eq!(c.reassemble().unwrap(), payload);
    }

    #[test]
    fn consumer_reassembles_single_segment() {
        let payload = Bytes::from_static(b"solo");
        let mut c = ChunkedConsumer::new(prefix(), 1);
        c.receive_segment(0, payload.clone());
        assert!(c.is_complete());
        assert_eq!(c.reassemble().unwrap(), payload);
    }

    #[test]
    fn consumer_incomplete_reassemble_returns_none() {
        let mut c = ChunkedConsumer::new(prefix(), 3);
        c.receive_segment(0, Bytes::from_static(b"a"));
        assert!(!c.is_complete());
        assert!(c.reassemble().is_none());
    }

    #[test]
    fn consumer_ignores_out_of_range_segment() {
        let mut c = ChunkedConsumer::new(prefix(), 2);
        c.receive_segment(99, Bytes::from_static(b"x"));
        // Nothing was stored, so the consumer is still waiting on both slots.
        assert!(!c.is_complete());
        assert!(c.reassemble().is_none());
    }

    #[test]
    fn consumer_with_zero_segments_is_complete_and_empty() {
        let c = ChunkedConsumer::new(prefix(), 0);
        assert!(c.is_complete());
        assert!(c.reassemble().unwrap().is_empty());
    }
}