xref: /aosp_15_r20/external/crosvm/net_util/src/slirp/packet_ring_buffer.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::collections::VecDeque;
6 use std::fmt;
7 use std::fmt::Display;
8 use std::time::Duration;
9 
10 use pcap_file::pcap::PacketHeader;
11 
/// In-memory size of `pcap_file`'s `PacketHeader`. Every packet stored in a
/// `PacketRingBuffer` is charged this many bytes on top of its payload length when the
/// buffer's occupancy is computed.
const PACKET_HEADER_SIZE_IN_BYTES: usize = std::mem::size_of::<PacketHeader>();
13 
/// A wrapper around a ring buffer that stores packet information.
/// This was made so on crosvm, we can write the packet information to a file
/// for debugging purposes.
///
/// The buffer is bounded by a byte budget rather than a packet count: each packet is
/// accounted as its payload length plus `PACKET_HEADER_SIZE_IN_BYTES`.
pub struct PacketRingBuffer {
    /// Stored packets. New packets are pushed at the front and evicted from the back.
    ring_buffer: VecDeque<PacketInfo>,
    /// Upper bound for `current_size_in_bytes`.
    max_size_in_bytes: usize,
    /// Accounted size of the packets added through `add_packet` and not yet evicted
    /// (payload bytes plus one header per packet).
    current_size_in_bytes: usize,
    /// Latest timestamp among the packets evicted so far, or `None` if nothing has been
    /// evicted yet.
    last_popped_packet_timestamp: Option<Duration>,
}
24 
/// A captured packet: an owned copy of its bytes and the time associated with it.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so packets can be logged, copied and
/// compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PacketInfo {
    /// Raw packet bytes.
    pub buf: Vec<u8>,
    /// Timestamp supplied by the caller when the packet was recorded.
    pub timestamp: Duration,
}
29 
/// Errors produced while adding packets to a `PacketRingBuffer`.
#[derive(Eq, PartialEq, Debug)]
pub enum Error {
    /// The packet, including its per-packet header, is larger than the ring buffer allows.
    PacketTooBigError {
        /// Maximum size of the ring buffer, in bytes.
        rb_max_size_in_bytes: usize,
        /// Accounted size of the rejected packet (payload plus header), in bytes.
        packet_size_in_bytes: usize,
    },
}

/// Result type whose error is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

impl Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Error::PacketTooBigError {
                rb_max_size_in_bytes,
                packet_size_in_bytes,
            } => write!(
                f,
                "Packet of size {} bytes can't fit into Ring buffer of size {}",
                // The first placeholder is the packet size and the second is the buffer
                // capacity; the arguments must be passed in that order.
                packet_size_in_bytes, rb_max_size_in_bytes
            ),
        }
    }
}

// `Debug` and `Display` are both available, so the error can be used wherever a
// `std::error::Error` is expected (e.g. `Box<dyn std::error::Error>`).
impl std::error::Error for Error {}
55 
56 impl PacketRingBuffer {
new(max_size_in_bytes: usize) -> PacketRingBuffer57     pub fn new(max_size_in_bytes: usize) -> PacketRingBuffer {
58         PacketRingBuffer {
59             ring_buffer: VecDeque::new(),
60             max_size_in_bytes,
61             current_size_in_bytes: 0,
62             last_popped_packet_timestamp: None,
63         }
64     }
65 
add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()>66     pub fn add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()> {
67         self.prune_from_ring_buffer_if_oversized(buf)?;
68 
69         self.ring_buffer.push_front(PacketInfo {
70             buf: buf.to_vec(),
71             timestamp: packet_timestamp,
72         });
73         self.current_size_in_bytes += buf.len() + PACKET_HEADER_SIZE_IN_BYTES;
74 
75         Ok(())
76     }
77 
78     // While the size of the rb with the new packet is less than the max size of the rb.
prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()>79     fn prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()> {
80         let new_packet_size_in_bytes = buf.len() + PACKET_HEADER_SIZE_IN_BYTES;
81 
82         while self.current_size_in_bytes + new_packet_size_in_bytes > self.max_size_in_bytes {
83             match self.ring_buffer.pop_back() {
84                 Some(val) => {
85                     self.current_size_in_bytes -= val.buf.len() + PACKET_HEADER_SIZE_IN_BYTES;
86                     self.last_popped_packet_timestamp = self
87                         .last_popped_packet_timestamp
88                         .map(|t| std::cmp::max(t, val.timestamp))
89                         .or(Some(val.timestamp))
90                 }
91                 None => {
92                     return Err(Error::PacketTooBigError {
93                         rb_max_size_in_bytes: self.max_size_in_bytes,
94                         packet_size_in_bytes: new_packet_size_in_bytes,
95                     })
96                 }
97             }
98         }
99         Ok(())
100     }
101 
102     /// Aggregates two ring buffers of packets by removing packets prior to the max oldest
103     /// removed packet and sorting them by time.
pop_ring_buffers_and_aggregate<'a>( packet_rb1: &'a mut PacketRingBuffer, packet_rb2: &'a mut PacketRingBuffer, ) -> Vec<&'a PacketInfo>104     pub fn pop_ring_buffers_and_aggregate<'a>(
105         packet_rb1: &'a mut PacketRingBuffer,
106         packet_rb2: &'a mut PacketRingBuffer,
107     ) -> Vec<&'a PacketInfo> {
108         let mut result: Vec<&PacketInfo> = Vec::new();
109         result.extend(packet_rb1.ring_buffer.iter().collect::<Vec<&PacketInfo>>());
110         result.extend(packet_rb2.ring_buffer.iter().collect::<Vec<&PacketInfo>>());
111 
112         // The oldest time we want to keep in the aggregated result.
113         let start_time = std::cmp::max(
114             packet_rb1.last_popped_packet_timestamp,
115             packet_rb2.last_popped_packet_timestamp,
116         );
117 
118         let mut result = if let Some(start_time) = start_time {
119             result
120                 .into_iter()
121                 .filter(|packet| packet.timestamp > start_time)
122                 .collect()
123         } else {
124             result
125         };
126 
127         result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
128         result
129     }
130 }
131 
132 #[cfg(test)]
133 mod tests {
134     use super::*;
135 
136     #[test]
test_add()137     fn test_add() {
138         let mut packet_rb = PacketRingBuffer::new(
139             /* max_size_in_bytes= */ 4 + PACKET_HEADER_SIZE_IN_BYTES,
140         );
141         let buf: &[u8] = &[1, 2, 3, 4];
142         let start_time = Duration::from_nanos(45);
143 
144         assert_eq!(packet_rb.ring_buffer.len(), 0);
145 
146         packet_rb
147             .add_packet(buf, start_time)
148             .expect("Failed to add packet.");
149 
150         let packet = packet_rb.ring_buffer.pop_back().unwrap();
151         assert_eq!(packet.buf, &[1, 2, 3, 4]);
152         assert_eq!(packet.timestamp.as_nanos(), 45);
153         assert_eq!(packet_rb.last_popped_packet_timestamp, None);
154         // Each packet has 16 bytes in it's PacketHeader
155         assert_eq!(
156             packet_rb.current_size_in_bytes,
157             4 + PACKET_HEADER_SIZE_IN_BYTES
158         );
159     }
160 
161     #[test]
test_add_no_space()162     fn test_add_no_space() {
163         // Max size is 3 bytes of buffer data + PACKET_HEADER_SIZE_IN_BYTES
164         let mut packet_rb = PacketRingBuffer::new(
165             /* max_size_in_bytes= */ 3 + PACKET_HEADER_SIZE_IN_BYTES,
166         );
167         let buf: &[u8] = &[1, 2, 3, 4];
168         let start_time = Duration::from_nanos(45);
169 
170         let res = packet_rb.add_packet(buf, start_time);
171 
172         // Should error because rb size is 19 bytes, but packet will take 20 bytes (4 bytes in
173         // buffer + 16 bytes from Packet header)
174         assert!(res.is_err());
175 
176         assert_eq!(
177             res.unwrap_err(),
178             Error::PacketTooBigError {
179                 rb_max_size_in_bytes: packet_rb.max_size_in_bytes,
180                 packet_size_in_bytes: 4 + PACKET_HEADER_SIZE_IN_BYTES
181             }
182         );
183     }
184 
185     #[test]
test_add_exceeds_size_pop_one()186     fn test_add_exceeds_size_pop_one() {
187         let mut packet_rb = PacketRingBuffer::new(
188             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
189         );
190 
191         packet_rb
192             .add_packet(&[1], Duration::from_nanos(1))
193             .expect("Failed to add packet.");
194         packet_rb
195             .add_packet(&[2], Duration::from_nanos(2))
196             .expect("Failed to add packet.");
197         packet_rb
198             .add_packet(&[3], Duration::from_nanos(3))
199             .expect("Failed to add packet.");
200         packet_rb
201             .add_packet(&[4], Duration::from_nanos(4))
202             .expect("Failed to add packet.");
203 
204         assert_eq!(packet_rb.ring_buffer.len(), 3);
205 
206         let packet1 = packet_rb.ring_buffer.pop_back().unwrap();
207         let packet2 = packet_rb.ring_buffer.pop_back().unwrap();
208         let packet3 = packet_rb.ring_buffer.pop_back().unwrap();
209 
210         assert_eq!(packet1.buf, &[2]);
211         assert_eq!(packet1.timestamp.as_nanos(), 2);
212         assert_eq!(packet2.buf, &[3]);
213         assert_eq!(packet2.timestamp.as_nanos(), 3);
214         assert_eq!(packet3.buf, &[4]);
215         assert_eq!(packet3.timestamp.as_nanos(), 4);
216         assert_eq!(
217             packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(),
218             1
219         );
220         assert_eq!(packet_rb.current_size_in_bytes, 3 + 3 * 16);
221     }
222 
223     #[test]
test_add_exceeds_size_pop_multiple()224     fn test_add_exceeds_size_pop_multiple() {
225         let mut packet_rb = PacketRingBuffer::new(
226             /* max_size_in_bytes= */ 2 + PACKET_HEADER_SIZE_IN_BYTES,
227         );
228 
229         packet_rb
230             .add_packet(&[1], Duration::from_nanos(1))
231             .expect("Failed to add packet.");
232         packet_rb
233             .add_packet(&[2], Duration::from_nanos(2))
234             .expect("Failed to add packet.");
235         packet_rb
236             .add_packet(&[3, 4], Duration::from_nanos(3))
237             .expect("Failed to add packet.");
238         packet_rb
239             .add_packet(&[5, 6], Duration::from_nanos(4))
240             .expect("Failed to add packet.");
241 
242         // The first 3 packets should've been popped
243         assert_eq!(packet_rb.ring_buffer.len(), 1);
244 
245         let packet1 = packet_rb.ring_buffer.pop_back().unwrap();
246 
247         assert_eq!(packet1.buf, &[5, 6]);
248         assert_eq!(packet1.timestamp.as_nanos(), 4);
249         assert_eq!(
250             packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(),
251             3
252         );
253         assert_eq!(
254             packet_rb.current_size_in_bytes,
255             2 + PACKET_HEADER_SIZE_IN_BYTES
256         );
257     }
258 
259     #[test]
test_aggregate_one_empty()260     fn test_aggregate_one_empty() {
261         let mut tx_packet_rb = PacketRingBuffer::new(
262             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
263         );
264 
265         let mut rx_packet_rb = PacketRingBuffer::new(
266             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
267         );
268         rx_packet_rb
269             .add_packet(&[4], Duration::from_nanos(6))
270             .expect("Failed to add packet.");
271         rx_packet_rb
272             .add_packet(&[5], Duration::from_nanos(10))
273             .expect("Failed to add packet.");
274 
275         let res =
276             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
277         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
278 
279         assert_eq!(packet_data_list, [&[4], &[5]]);
280     }
281 
282     #[test]
test_aggregate_both_empty()283     fn test_aggregate_both_empty() {
284         let mut tx_packet_rb = PacketRingBuffer::new(
285             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
286         );
287 
288         let mut rx_packet_rb = PacketRingBuffer::new(
289             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
290         );
291 
292         let res =
293             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
294         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
295 
296         assert!(packet_data_list.is_empty());
297     }
298 
299     #[test]
test_aggregate_none_popped()300     fn test_aggregate_none_popped() {
301         let mut tx_packet_rb = PacketRingBuffer::new(
302             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
303         );
304         tx_packet_rb
305             .add_packet(&[1], Duration::from_nanos(2))
306             .expect("Failed to add packet.");
307         tx_packet_rb
308             .add_packet(&[2], Duration::from_nanos(8))
309             .expect("Failed to add packet.");
310         tx_packet_rb
311             .add_packet(&[3], Duration::from_nanos(9))
312             .expect("Failed to add packet.");
313 
314         let mut rx_packet_rb = PacketRingBuffer::new(
315             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
316         );
317         rx_packet_rb
318             .add_packet(&[4], Duration::from_nanos(6))
319             .expect("Failed to add packet.");
320         rx_packet_rb
321             .add_packet(&[5], Duration::from_nanos(10))
322             .expect("Failed to add packet.");
323 
324         let res =
325             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
326 
327         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
328         assert_eq!(packet_data_list, [&[1], &[4], &[2], &[3], &[5]]);
329     }
330 
331     #[test]
test_aggregate_with_one_ring_buffer_popped()332     fn test_aggregate_with_one_ring_buffer_popped() {
333         let mut tx_packet_rb = PacketRingBuffer::new(
334             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
335         );
336         tx_packet_rb
337             .add_packet(&[1], Duration::from_nanos(2))
338             .expect("Failed to add packet.");
339         tx_packet_rb
340             .add_packet(&[2], Duration::from_nanos(8))
341             .expect("Failed to add packet.");
342         tx_packet_rb
343             .add_packet(&[3, 4], Duration::from_nanos(9))
344             .expect("Failed to add packet.");
345         tx_packet_rb
346             .add_packet(&[5], Duration::from_nanos(14))
347             .expect("Failed to add packet.");
348 
349         let mut rx_packet_rb = PacketRingBuffer::new(
350             /* max_size_in_bytes= */ 2 + 2 * PACKET_HEADER_SIZE_IN_BYTES,
351         );
352         rx_packet_rb
353             .add_packet(&[6], Duration::from_nanos(6))
354             .expect("Failed to add packet.");
355         rx_packet_rb
356             .add_packet(&[7], Duration::from_nanos(10))
357             .expect("Failed to add packet.");
358 
359         let res =
360             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
361 
362         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
363         assert_eq!(packet_data_list.len(), 3);
364         assert_eq!(packet_data_list[0], &[3, 4]);
365         assert_eq!(packet_data_list[1], &[7]);
366         assert_eq!(packet_data_list[2], &[5]);
367     }
368 
369     #[test]
test_aggregate_with_both_ring_buffers_popped()370     fn test_aggregate_with_both_ring_buffers_popped() {
371         let mut tx_packet_rb = PacketRingBuffer::new(
372             /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES,
373         );
374         tx_packet_rb
375             .add_packet(&[1], Duration::from_nanos(2))
376             .expect("Failed to add packet.");
377         tx_packet_rb
378             .add_packet(&[2], Duration::from_nanos(8))
379             .expect("Failed to add packet.");
380         tx_packet_rb
381             .add_packet(&[3, 4], Duration::from_nanos(9))
382             .expect("Failed to add packet.");
383         tx_packet_rb
384             .add_packet(&[5], Duration::from_nanos(15))
385             .expect("Failed to add packet.");
386 
387         let mut rx_packet_rb = PacketRingBuffer::new(
388             /* max_size_in_bytes= */ 5 + 2 * PACKET_HEADER_SIZE_IN_BYTES,
389         );
390         rx_packet_rb
391             .add_packet(&[6], Duration::from_nanos(6))
392             .expect("Failed to add packet.");
393         rx_packet_rb
394             .add_packet(&[7, 8], Duration::from_nanos(10))
395             .expect("Failed to add packet.");
396         rx_packet_rb
397             .add_packet(&[9], Duration::from_nanos(12))
398             .expect("Failed to add packet.");
399         rx_packet_rb
400             .add_packet(&[10, 11, 12], Duration::from_nanos(13))
401             .expect("Failed to add packet.");
402         rx_packet_rb
403             .add_packet(&[13, 14], Duration::from_nanos(16))
404             .expect("Failed to add packet.");
405 
406         let res =
407             PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb);
408 
409         let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect();
410         assert_eq!(packet_data_list.len(), 3);
411         assert_eq!(packet_data_list[0], &[10, 11, 12]);
412         assert_eq!(packet_data_list[1], &[5]);
413         assert_eq!(packet_data_list[2], &[13, 14]);
414     }
415 }
416