ndn_ipc/
chunked.rs

1use bytes::{Bytes, BytesMut};
2
3use ndn_packet::Name;
4
5/// Default segment size: 8 KiB, well under any NDN MTU.
6pub const NDN_DEFAULT_SEGMENT_SIZE: usize = 8192;
7
8/// Segments a large payload into fixed-size chunks for NDN-style chunked transfer.
9///
10/// Each segment is identified by its zero-based index; the total segment count
11/// is available via `segment_count()` for FinalBlockId encoding.
12pub struct ChunkedProducer {
13    prefix: Name,
14    segments: Vec<Bytes>,
15}
16
17impl ChunkedProducer {
18    /// Segment `payload` into chunks of at most `segment_size` bytes.
19    pub fn new(prefix: Name, payload: Bytes, segment_size: usize) -> Self {
20        let seg_size = segment_size.max(1);
21        let segments = payload
22            .chunks(seg_size)
23            .map(Bytes::copy_from_slice)
24            .collect();
25        Self { prefix, segments }
26    }
27
28    pub fn prefix(&self) -> &Name {
29        &self.prefix
30    }
31
32    pub fn segment_count(&self) -> usize {
33        self.segments.len()
34    }
35
36    /// Return the payload for segment `index`, or `None` if out of range.
37    pub fn segment(&self, index: usize) -> Option<&Bytes> {
38        self.segments.get(index)
39    }
40}
41
42/// Reassembles segments produced by `ChunkedProducer` into the original payload.
43///
44/// Segments may arrive out of order; `receive_segment` inserts each one by
45/// index. `reassemble` returns `Some(Bytes)` once all segments are present.
46pub struct ChunkedConsumer {
47    prefix: Name,
48    segment_count: usize,
49    received: Vec<Option<Bytes>>,
50}
51
52impl ChunkedConsumer {
53    /// Create a consumer expecting exactly `segment_count` segments.
54    pub fn new(prefix: Name, segment_count: usize) -> Self {
55        Self {
56            prefix,
57            segment_count,
58            received: vec![None; segment_count],
59        }
60    }
61
62    pub fn prefix(&self) -> &Name {
63        &self.prefix
64    }
65
66    pub fn segment_count(&self) -> usize {
67        self.segment_count
68    }
69
70    /// Store the payload for `index`.  Out-of-range indices are silently dropped.
71    pub fn receive_segment(&mut self, index: usize, payload: Bytes) {
72        if index < self.segment_count {
73            self.received[index] = Some(payload);
74        }
75    }
76
77    /// Returns `true` when every segment has been received.
78    pub fn is_complete(&self) -> bool {
79        self.received.iter().all(Option::is_some)
80    }
81
82    /// Concatenate all segments in order.  Returns `None` if incomplete.
83    pub fn reassemble(&self) -> Option<Bytes> {
84        if !self.is_complete() {
85            return None;
86        }
87        let total: usize = self
88            .received
89            .iter()
90            .filter_map(Option::as_ref)
91            .map(Bytes::len)
92            .sum();
93        let mut out = BytesMut::with_capacity(total);
94        for seg in &self.received {
95            out.extend_from_slice(seg.as_ref().unwrap());
96        }
97        Some(out.freeze())
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104
105    fn prefix() -> Name {
106        Name::root()
107    }
108
109    #[test]
110    fn producer_single_segment() {
111        let payload = Bytes::from_static(b"hello");
112        let p = ChunkedProducer::new(prefix(), payload.clone(), 8192);
113        assert_eq!(p.segment_count(), 1);
114        assert_eq!(p.segment(0).unwrap(), &payload);
115    }
116
117    #[test]
118    fn producer_multiple_segments() {
119        let payload = Bytes::from(vec![0u8; 100]);
120        let p = ChunkedProducer::new(prefix(), payload, 30);
121        // ceil(100 / 30) = 4
122        assert_eq!(p.segment_count(), 4);
123        assert_eq!(p.segment(0).unwrap().len(), 30);
124        assert_eq!(p.segment(3).unwrap().len(), 10);
125    }
126
127    #[test]
128    fn producer_out_of_range_returns_none() {
129        let p = ChunkedProducer::new(prefix(), Bytes::from_static(b"x"), 8192);
130        assert!(p.segment(1).is_none());
131    }
132
133    #[test]
134    fn consumer_reassembles_in_order() {
135        let payload = Bytes::from_static(b"hello world");
136        let p = ChunkedProducer::new(prefix(), payload.clone(), 5);
137        let mut c = ChunkedConsumer::new(prefix(), p.segment_count());
138        for i in 0..p.segment_count() {
139            c.receive_segment(i, p.segment(i).unwrap().clone());
140        }
141        assert!(c.is_complete());
142        assert_eq!(c.reassemble().unwrap(), payload);
143    }
144
145    #[test]
146    fn consumer_reassembles_out_of_order() {
147        let payload = Bytes::from(b"abcde".repeat(2).to_vec());
148        let p = ChunkedProducer::new(prefix(), payload.clone(), 5);
149        let mut c = ChunkedConsumer::new(prefix(), p.segment_count());
150        // receive segment 1 first
151        c.receive_segment(1, p.segment(1).unwrap().clone());
152        assert!(!c.is_complete());
153        c.receive_segment(0, p.segment(0).unwrap().clone());
154        assert!(c.is_complete());
155        assert_eq!(c.reassemble().unwrap(), payload);
156    }
157
158    #[test]
159    fn consumer_incomplete_reassemble_returns_none() {
160        let mut c = ChunkedConsumer::new(prefix(), 3);
161        c.receive_segment(0, Bytes::from_static(b"a"));
162        assert!(!c.is_complete());
163        assert!(c.reassemble().is_none());
164    }
165
166    #[test]
167    fn consumer_ignores_out_of_range_segment() {
168        let mut c = ChunkedConsumer::new(prefix(), 2);
169        c.receive_segment(99, Bytes::from_static(b"x")); // out of range, no panic
170        assert!(!c.is_complete());
171    }
172}