1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::collections::VecDeque; 6 use std::fmt; 7 use std::fmt::Display; 8 use std::time::Duration; 9 10 use pcap_file::pcap::PacketHeader; 11 12 const PACKET_HEADER_SIZE_IN_BYTES: usize = std::mem::size_of::<PacketHeader>(); 13 14 /// A wrapper around a ringer buffer that stores packet information. 15 /// This was made so on crosvm, we can write the packet information to a file 16 /// for debugging purposes. 17 18 pub struct PacketRingBuffer { 19 ring_buffer: VecDeque<PacketInfo>, 20 max_size_in_bytes: usize, 21 current_size_in_bytes: usize, 22 last_popped_packet_timestamp: Option<Duration>, 23 } 24 25 pub struct PacketInfo { 26 pub buf: Vec<u8>, 27 pub timestamp: Duration, 28 } 29 30 #[derive(Eq, PartialEq, Debug)] 31 pub enum Error { 32 PacketTooBigError { 33 rb_max_size_in_bytes: usize, 34 packet_size_in_bytes: usize, 35 }, 36 } 37 pub type Result<T> = std::result::Result<T, Error>; 38 39 impl Display for Error { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result40 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 41 use self::Error::*; 42 43 match self { 44 PacketTooBigError { 45 rb_max_size_in_bytes, 46 packet_size_in_bytes, 47 } => write!( 48 f, 49 "Packet of size {} bytes can't fit into Ring buffer of size {}", 50 rb_max_size_in_bytes, packet_size_in_bytes 51 ), 52 } 53 } 54 } 55 56 impl PacketRingBuffer { new(max_size_in_bytes: usize) -> PacketRingBuffer57 pub fn new(max_size_in_bytes: usize) -> PacketRingBuffer { 58 PacketRingBuffer { 59 ring_buffer: VecDeque::new(), 60 max_size_in_bytes, 61 current_size_in_bytes: 0, 62 last_popped_packet_timestamp: None, 63 } 64 } 65 add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()>66 pub fn add_packet(&mut self, buf: &[u8], packet_timestamp: Duration) -> Result<()> { 67 self.prune_from_ring_buffer_if_oversized(buf)?; 68 69 self.ring_buffer.push_front(PacketInfo { 70 buf: buf.to_vec(), 71 timestamp: packet_timestamp, 72 }); 73 self.current_size_in_bytes += buf.len() + PACKET_HEADER_SIZE_IN_BYTES; 74 75 Ok(()) 76 } 77 78 // While the size of the rb with the new packet is less than the max size of the rb. prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()>79 fn prune_from_ring_buffer_if_oversized(&mut self, buf: &[u8]) -> Result<()> { 80 let new_packet_size_in_bytes = buf.len() + PACKET_HEADER_SIZE_IN_BYTES; 81 82 while self.current_size_in_bytes + new_packet_size_in_bytes > self.max_size_in_bytes { 83 match self.ring_buffer.pop_back() { 84 Some(val) => { 85 self.current_size_in_bytes -= val.buf.len() + PACKET_HEADER_SIZE_IN_BYTES; 86 self.last_popped_packet_timestamp = self 87 .last_popped_packet_timestamp 88 .map(|t| std::cmp::max(t, val.timestamp)) 89 .or(Some(val.timestamp)) 90 } 91 None => { 92 return Err(Error::PacketTooBigError { 93 rb_max_size_in_bytes: self.max_size_in_bytes, 94 packet_size_in_bytes: new_packet_size_in_bytes, 95 }) 96 } 97 } 98 } 99 Ok(()) 100 } 101 102 /// Aggregates two ring buffers of packets by removing packets prior to the max oldest 103 /// removed packet and sorting them by time. pop_ring_buffers_and_aggregate<'a>( packet_rb1: &'a mut PacketRingBuffer, packet_rb2: &'a mut PacketRingBuffer, ) -> Vec<&'a PacketInfo>104 pub fn pop_ring_buffers_and_aggregate<'a>( 105 packet_rb1: &'a mut PacketRingBuffer, 106 packet_rb2: &'a mut PacketRingBuffer, 107 ) -> Vec<&'a PacketInfo> { 108 let mut result: Vec<&PacketInfo> = Vec::new(); 109 result.extend(packet_rb1.ring_buffer.iter().collect::<Vec<&PacketInfo>>()); 110 result.extend(packet_rb2.ring_buffer.iter().collect::<Vec<&PacketInfo>>()); 111 112 // The oldest time we want to keep in the aggregated result. 113 let start_time = std::cmp::max( 114 packet_rb1.last_popped_packet_timestamp, 115 packet_rb2.last_popped_packet_timestamp, 116 ); 117 118 let mut result = if let Some(start_time) = start_time { 119 result 120 .into_iter() 121 .filter(|packet| packet.timestamp > start_time) 122 .collect() 123 } else { 124 result 125 }; 126 127 result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); 128 result 129 } 130 } 131 132 #[cfg(test)] 133 mod tests { 134 use super::*; 135 136 #[test] test_add()137 fn test_add() { 138 let mut packet_rb = PacketRingBuffer::new( 139 /* max_size_in_bytes= */ 4 + PACKET_HEADER_SIZE_IN_BYTES, 140 ); 141 let buf: &[u8] = &[1, 2, 3, 4]; 142 let start_time = Duration::from_nanos(45); 143 144 assert_eq!(packet_rb.ring_buffer.len(), 0); 145 146 packet_rb 147 .add_packet(buf, start_time) 148 .expect("Failed to add packet."); 149 150 let packet = packet_rb.ring_buffer.pop_back().unwrap(); 151 assert_eq!(packet.buf, &[1, 2, 3, 4]); 152 assert_eq!(packet.timestamp.as_nanos(), 45); 153 assert_eq!(packet_rb.last_popped_packet_timestamp, None); 154 // Each packet has 16 bytes in it's PacketHeader 155 assert_eq!( 156 packet_rb.current_size_in_bytes, 157 4 + PACKET_HEADER_SIZE_IN_BYTES 158 ); 159 } 160 161 #[test] test_add_no_space()162 fn test_add_no_space() { 163 // Max size is 3 bytes of buffer data + PACKET_HEADER_SIZE_IN_BYTES 164 let mut packet_rb = PacketRingBuffer::new( 165 /* max_size_in_bytes= */ 3 + PACKET_HEADER_SIZE_IN_BYTES, 166 ); 167 let buf: &[u8] = &[1, 2, 3, 4]; 168 let start_time = Duration::from_nanos(45); 169 170 let res = packet_rb.add_packet(buf, start_time); 171 172 // Should error because rb size is 19 bytes, but packet will take 20 bytes (4 bytes in 173 // buffer + 16 bytes from Packet header) 174 assert!(res.is_err()); 175 176 assert_eq!( 177 res.unwrap_err(), 178 Error::PacketTooBigError { 179 rb_max_size_in_bytes: packet_rb.max_size_in_bytes, 180 packet_size_in_bytes: 4 + PACKET_HEADER_SIZE_IN_BYTES 181 } 182 ); 183 } 184 185 #[test] test_add_exceeds_size_pop_one()186 fn test_add_exceeds_size_pop_one() { 187 let mut packet_rb = PacketRingBuffer::new( 188 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 189 ); 190 191 packet_rb 192 .add_packet(&[1], Duration::from_nanos(1)) 193 .expect("Failed to add packet."); 194 packet_rb 195 .add_packet(&[2], Duration::from_nanos(2)) 196 .expect("Failed to add packet."); 197 packet_rb 198 .add_packet(&[3], Duration::from_nanos(3)) 199 .expect("Failed to add packet."); 200 packet_rb 201 .add_packet(&[4], Duration::from_nanos(4)) 202 .expect("Failed to add packet."); 203 204 assert_eq!(packet_rb.ring_buffer.len(), 3); 205 206 let packet1 = packet_rb.ring_buffer.pop_back().unwrap(); 207 let packet2 = packet_rb.ring_buffer.pop_back().unwrap(); 208 let packet3 = packet_rb.ring_buffer.pop_back().unwrap(); 209 210 assert_eq!(packet1.buf, &[2]); 211 assert_eq!(packet1.timestamp.as_nanos(), 2); 212 assert_eq!(packet2.buf, &[3]); 213 assert_eq!(packet2.timestamp.as_nanos(), 3); 214 assert_eq!(packet3.buf, &[4]); 215 assert_eq!(packet3.timestamp.as_nanos(), 4); 216 assert_eq!( 217 packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(), 218 1 219 ); 220 assert_eq!(packet_rb.current_size_in_bytes, 3 + 3 * 16); 221 } 222 223 #[test] test_add_exceeds_size_pop_multiple()224 fn test_add_exceeds_size_pop_multiple() { 225 let mut packet_rb = PacketRingBuffer::new( 226 /* max_size_in_bytes= */ 2 + PACKET_HEADER_SIZE_IN_BYTES, 227 ); 228 229 packet_rb 230 .add_packet(&[1], Duration::from_nanos(1)) 231 .expect("Failed to add packet."); 232 packet_rb 233 .add_packet(&[2], Duration::from_nanos(2)) 234 .expect("Failed to add packet."); 235 packet_rb 236 .add_packet(&[3, 4], Duration::from_nanos(3)) 237 .expect("Failed to add packet."); 238 packet_rb 239 .add_packet(&[5, 6], Duration::from_nanos(4)) 240 .expect("Failed to add packet."); 241 242 // The first 3 packets should've been popped 243 assert_eq!(packet_rb.ring_buffer.len(), 1); 244 245 let packet1 = packet_rb.ring_buffer.pop_back().unwrap(); 246 247 assert_eq!(packet1.buf, &[5, 6]); 248 assert_eq!(packet1.timestamp.as_nanos(), 4); 249 assert_eq!( 250 packet_rb.last_popped_packet_timestamp.unwrap().as_nanos(), 251 3 252 ); 253 assert_eq!( 254 packet_rb.current_size_in_bytes, 255 2 + PACKET_HEADER_SIZE_IN_BYTES 256 ); 257 } 258 259 #[test] test_aggregate_one_empty()260 fn test_aggregate_one_empty() { 261 let mut tx_packet_rb = PacketRingBuffer::new( 262 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 263 ); 264 265 let mut rx_packet_rb = PacketRingBuffer::new( 266 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 267 ); 268 rx_packet_rb 269 .add_packet(&[4], Duration::from_nanos(6)) 270 .expect("Failed to add packet."); 271 rx_packet_rb 272 .add_packet(&[5], Duration::from_nanos(10)) 273 .expect("Failed to add packet."); 274 275 let res = 276 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 277 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 278 279 assert_eq!(packet_data_list, [&[4], &[5]]); 280 } 281 282 #[test] test_aggregate_both_empty()283 fn test_aggregate_both_empty() { 284 let mut tx_packet_rb = PacketRingBuffer::new( 285 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 286 ); 287 288 let mut rx_packet_rb = PacketRingBuffer::new( 289 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 290 ); 291 292 let res = 293 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 294 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 295 296 assert!(packet_data_list.is_empty()); 297 } 298 299 #[test] test_aggregate_none_popped()300 fn test_aggregate_none_popped() { 301 let mut tx_packet_rb = PacketRingBuffer::new( 302 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 303 ); 304 tx_packet_rb 305 .add_packet(&[1], Duration::from_nanos(2)) 306 .expect("Failed to add packet."); 307 tx_packet_rb 308 .add_packet(&[2], Duration::from_nanos(8)) 309 .expect("Failed to add packet."); 310 tx_packet_rb 311 .add_packet(&[3], Duration::from_nanos(9)) 312 .expect("Failed to add packet."); 313 314 let mut rx_packet_rb = PacketRingBuffer::new( 315 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 316 ); 317 rx_packet_rb 318 .add_packet(&[4], Duration::from_nanos(6)) 319 .expect("Failed to add packet."); 320 rx_packet_rb 321 .add_packet(&[5], Duration::from_nanos(10)) 322 .expect("Failed to add packet."); 323 324 let res = 325 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 326 327 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 328 assert_eq!(packet_data_list, [&[1], &[4], &[2], &[3], &[5]]); 329 } 330 331 #[test] test_aggregate_with_one_ring_buffer_popped()332 fn test_aggregate_with_one_ring_buffer_popped() { 333 let mut tx_packet_rb = PacketRingBuffer::new( 334 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 335 ); 336 tx_packet_rb 337 .add_packet(&[1], Duration::from_nanos(2)) 338 .expect("Failed to add packet."); 339 tx_packet_rb 340 .add_packet(&[2], Duration::from_nanos(8)) 341 .expect("Failed to add packet."); 342 tx_packet_rb 343 .add_packet(&[3, 4], Duration::from_nanos(9)) 344 .expect("Failed to add packet."); 345 tx_packet_rb 346 .add_packet(&[5], Duration::from_nanos(14)) 347 .expect("Failed to add packet."); 348 349 let mut rx_packet_rb = PacketRingBuffer::new( 350 /* max_size_in_bytes= */ 2 + 2 * PACKET_HEADER_SIZE_IN_BYTES, 351 ); 352 rx_packet_rb 353 .add_packet(&[6], Duration::from_nanos(6)) 354 .expect("Failed to add packet."); 355 rx_packet_rb 356 .add_packet(&[7], Duration::from_nanos(10)) 357 .expect("Failed to add packet."); 358 359 let res = 360 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 361 362 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 363 assert_eq!(packet_data_list.len(), 3); 364 assert_eq!(packet_data_list[0], &[3, 4]); 365 assert_eq!(packet_data_list[1], &[7]); 366 assert_eq!(packet_data_list[2], &[5]); 367 } 368 369 #[test] test_aggregate_with_both_ring_buffers_popped()370 fn test_aggregate_with_both_ring_buffers_popped() { 371 let mut tx_packet_rb = PacketRingBuffer::new( 372 /* max_size_in_bytes= */ 3 + 3 * PACKET_HEADER_SIZE_IN_BYTES, 373 ); 374 tx_packet_rb 375 .add_packet(&[1], Duration::from_nanos(2)) 376 .expect("Failed to add packet."); 377 tx_packet_rb 378 .add_packet(&[2], Duration::from_nanos(8)) 379 .expect("Failed to add packet."); 380 tx_packet_rb 381 .add_packet(&[3, 4], Duration::from_nanos(9)) 382 .expect("Failed to add packet."); 383 tx_packet_rb 384 .add_packet(&[5], Duration::from_nanos(15)) 385 .expect("Failed to add packet."); 386 387 let mut rx_packet_rb = PacketRingBuffer::new( 388 /* max_size_in_bytes= */ 5 + 2 * PACKET_HEADER_SIZE_IN_BYTES, 389 ); 390 rx_packet_rb 391 .add_packet(&[6], Duration::from_nanos(6)) 392 .expect("Failed to add packet."); 393 rx_packet_rb 394 .add_packet(&[7, 8], Duration::from_nanos(10)) 395 .expect("Failed to add packet."); 396 rx_packet_rb 397 .add_packet(&[9], Duration::from_nanos(12)) 398 .expect("Failed to add packet."); 399 rx_packet_rb 400 .add_packet(&[10, 11, 12], Duration::from_nanos(13)) 401 .expect("Failed to add packet."); 402 rx_packet_rb 403 .add_packet(&[13, 14], Duration::from_nanos(16)) 404 .expect("Failed to add packet."); 405 406 let res = 407 PacketRingBuffer::pop_ring_buffers_and_aggregate(&mut tx_packet_rb, &mut rx_packet_rb); 408 409 let packet_data_list: Vec<&[u8]> = res.iter().map(|packet| packet.buf.as_ref()).collect(); 410 assert_eq!(packet_data_list.len(), 3); 411 assert_eq!(packet_data_list[0], &[10, 11, 12]); 412 assert_eq!(packet_data_list[1], &[5]); 413 assert_eq!(packet_data_list[2], &[13, 14]); 414 } 415 } 416