1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::{ 6 collections::VecDeque, 7 os::fd::{AsFd, BorrowedFd}, 8 }; 9 10 use nix::sys::eventfd::EventFd; 11 12 /// Manages a pollable queue of events to be sent to the decoder or encoder. 13 pub struct EventQueue<T> { 14 /// Pipe used to signal available events. 15 event: EventFd, 16 /// FIFO of all pending events. 17 pending_events: VecDeque<T>, 18 } 19 20 impl<T> EventQueue<T> { 21 /// Create a new event queue. new() -> nix::Result<Self>22 pub fn new() -> nix::Result<Self> { 23 EventFd::new().map(|event| Self { 24 event, 25 pending_events: Default::default(), 26 }) 27 } 28 29 /// Add `event` to the queue. 30 /// 31 /// Returns an error if the poll FD could not be signaled. queue_event(&mut self, event: T) -> nix::Result<()>32 pub fn queue_event(&mut self, event: T) -> nix::Result<()> { 33 self.pending_events.push_back(event); 34 if self.pending_events.len() == 1 { 35 let _ = self.event.write(1)?; 36 } 37 38 Ok(()) 39 } 40 41 /// Read and return the next event, if any. dequeue_event(&mut self) -> Option<T>42 pub fn dequeue_event(&mut self) -> Option<T> { 43 let event = self.pending_events.pop_front(); 44 45 if event.is_some() && self.pending_events.is_empty() { 46 let _ = self 47 .event 48 .read() 49 .map_err(|e| log::error!("error while reading event queue fd: {:#}", e)); 50 } 51 52 event 53 } 54 55 /// Remove all the posted events for which `predicate` returns `false`. retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)56 pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) { 57 let was_empty = self.pending_events.is_empty(); 58 59 self.pending_events.retain(predicate); 60 61 if !was_empty && self.pending_events.is_empty() { 62 let _ = self 63 .event 64 .read() 65 .map_err(|e| log::error!("error while reading event queue fd: {:#}", e)); 66 } 67 } 68 69 /// Returns the number of events currently pending on this queue, i.e. the number of times 70 /// `dequeue_event` can be called without blocking. 71 #[cfg(test)] len(&self) -> usize72 pub fn len(&self) -> usize { 73 self.pending_events.len() 74 } 75 } 76 77 impl<T> AsFd for EventQueue<T> { as_fd(&self) -> BorrowedFd78 fn as_fd(&self) -> BorrowedFd { 79 self.event.as_fd() 80 } 81 } 82 83 #[cfg(test)] 84 mod tests { 85 use nix::sys::epoll::*; 86 use virtio_media::devices::video_decoder::VideoDecoderBackendEvent; 87 use virtio_media::v4l2r::bindings; 88 89 use super::*; 90 91 /// Test basic queue/dequeue functionality of `EventQueue`. 92 #[test] event_queue()93 fn event_queue() { 94 let mut event_queue = EventQueue::new().unwrap(); 95 96 event_queue 97 .queue_event(VideoDecoderBackendEvent::InputBufferDone(1)) 98 .unwrap(); 99 assert_eq!(event_queue.len(), 1); 100 event_queue 101 .queue_event(VideoDecoderBackendEvent::FrameCompleted { 102 buffer_id: 1, 103 timestamp: bindings::timeval { 104 tv_sec: 10, 105 tv_usec: 42, 106 }, 107 bytes_used: vec![1024], 108 is_last: false, 109 }) 110 .unwrap(); 111 assert_eq!(event_queue.len(), 2); 112 113 assert!(matches!( 114 event_queue.dequeue_event(), 115 Some(VideoDecoderBackendEvent::InputBufferDone(1)) 116 )); 117 assert_eq!(event_queue.len(), 1); 118 assert_eq!( 119 event_queue.dequeue_event(), 120 Some(VideoDecoderBackendEvent::FrameCompleted { 121 buffer_id: 1, 122 timestamp: bindings::timeval { 123 tv_sec: 10, 124 tv_usec: 42 125 }, 126 bytes_used: vec![1024], 127 is_last: false, 128 }) 129 ); 130 assert_eq!(event_queue.len(), 0); 131 } 132 133 /// Test polling of `TestEventQueue`'s `event_pipe`. 134 #[test] decoder_event_queue_polling()135 fn decoder_event_queue_polling() { 136 let mut event_queue = EventQueue::new().unwrap(); 137 let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap(); 138 epoll 139 .add(event_queue.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 1)) 140 .unwrap(); 141 142 // The queue is empty, so `event_pipe` should not signal. 143 let mut events = [EpollEvent::empty()]; 144 let nb_fds = epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(); 145 assert_eq!(nb_fds, 0); 146 assert_eq!(event_queue.dequeue_event(), None); 147 148 // `event_pipe` should signal as long as the queue is not empty. 149 event_queue 150 .queue_event(VideoDecoderBackendEvent::InputBufferDone(1)) 151 .unwrap(); 152 assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 153 event_queue 154 .queue_event(VideoDecoderBackendEvent::InputBufferDone(2)) 155 .unwrap(); 156 assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 157 event_queue 158 .queue_event(VideoDecoderBackendEvent::InputBufferDone(3)) 159 .unwrap(); 160 assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 161 162 assert_eq!( 163 event_queue.dequeue_event(), 164 Some(VideoDecoderBackendEvent::InputBufferDone(1)) 165 ); 166 assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 167 assert_eq!( 168 event_queue.dequeue_event(), 169 Some(VideoDecoderBackendEvent::InputBufferDone(2)) 170 ); 171 assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1); 172 assert_eq!( 173 event_queue.dequeue_event(), 174 Some(VideoDecoderBackendEvent::InputBufferDone(3)) 175 ); 176 177 // The queue is empty again, so `event_pipe` should not signal. 178 assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 0); 179 assert_eq!(event_queue.dequeue_event(), None); 180 } 181 } 182