xref: /aosp_15_r20/external/virtio-media/extras/ffmpeg-decoder/src/event_queue.rs (revision 1b4853f54772485c5dd4001ae33a7a958bcc97a1)
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::{
6     collections::VecDeque,
7     os::fd::{AsFd, BorrowedFd},
8 };
9 
10 use nix::sys::eventfd::EventFd;
11 
12 /// Manages a pollable queue of events to be sent to the decoder or encoder.
13 pub struct EventQueue<T> {
14     /// Pipe used to signal available events.
15     event: EventFd,
16     /// FIFO of all pending events.
17     pending_events: VecDeque<T>,
18 }
19 
20 impl<T> EventQueue<T> {
21     /// Create a new event queue.
new() -> nix::Result<Self>22     pub fn new() -> nix::Result<Self> {
23         EventFd::new().map(|event| Self {
24             event,
25             pending_events: Default::default(),
26         })
27     }
28 
29     /// Add `event` to the queue.
30     ///
31     /// Returns an error if the poll FD could not be signaled.
queue_event(&mut self, event: T) -> nix::Result<()>32     pub fn queue_event(&mut self, event: T) -> nix::Result<()> {
33         self.pending_events.push_back(event);
34         if self.pending_events.len() == 1 {
35             let _ = self.event.write(1)?;
36         }
37 
38         Ok(())
39     }
40 
41     /// Read and return the next event, if any.
dequeue_event(&mut self) -> Option<T>42     pub fn dequeue_event(&mut self) -> Option<T> {
43         let event = self.pending_events.pop_front();
44 
45         if event.is_some() && self.pending_events.is_empty() {
46             let _ = self
47                 .event
48                 .read()
49                 .map_err(|e| log::error!("error while reading event queue fd: {:#}", e));
50         }
51 
52         event
53     }
54 
55     /// Remove all the posted events for which `predicate` returns `false`.
retain<P: FnMut(&T) -> bool>(&mut self, predicate: P)56     pub fn retain<P: FnMut(&T) -> bool>(&mut self, predicate: P) {
57         let was_empty = self.pending_events.is_empty();
58 
59         self.pending_events.retain(predicate);
60 
61         if !was_empty && self.pending_events.is_empty() {
62             let _ = self
63                 .event
64                 .read()
65                 .map_err(|e| log::error!("error while reading event queue fd: {:#}", e));
66         }
67     }
68 
69     /// Returns the number of events currently pending on this queue, i.e. the number of times
70     /// `dequeue_event` can be called without blocking.
71     #[cfg(test)]
len(&self) -> usize72     pub fn len(&self) -> usize {
73         self.pending_events.len()
74     }
75 }
76 
77 impl<T> AsFd for EventQueue<T> {
as_fd(&self) -> BorrowedFd78     fn as_fd(&self) -> BorrowedFd {
79         self.event.as_fd()
80     }
81 }
82 
83 #[cfg(test)]
84 mod tests {
85     use nix::sys::epoll::*;
86     use virtio_media::devices::video_decoder::VideoDecoderBackendEvent;
87     use virtio_media::v4l2r::bindings;
88 
89     use super::*;
90 
91     /// Test basic queue/dequeue functionality of `EventQueue`.
92     #[test]
event_queue()93     fn event_queue() {
94         let mut event_queue = EventQueue::new().unwrap();
95 
96         event_queue
97             .queue_event(VideoDecoderBackendEvent::InputBufferDone(1))
98             .unwrap();
99         assert_eq!(event_queue.len(), 1);
100         event_queue
101             .queue_event(VideoDecoderBackendEvent::FrameCompleted {
102                 buffer_id: 1,
103                 timestamp: bindings::timeval {
104                     tv_sec: 10,
105                     tv_usec: 42,
106                 },
107                 bytes_used: vec![1024],
108                 is_last: false,
109             })
110             .unwrap();
111         assert_eq!(event_queue.len(), 2);
112 
113         assert!(matches!(
114             event_queue.dequeue_event(),
115             Some(VideoDecoderBackendEvent::InputBufferDone(1))
116         ));
117         assert_eq!(event_queue.len(), 1);
118         assert_eq!(
119             event_queue.dequeue_event(),
120             Some(VideoDecoderBackendEvent::FrameCompleted {
121                 buffer_id: 1,
122                 timestamp: bindings::timeval {
123                     tv_sec: 10,
124                     tv_usec: 42
125                 },
126                 bytes_used: vec![1024],
127                 is_last: false,
128             })
129         );
130         assert_eq!(event_queue.len(), 0);
131     }
132 
133     /// Test polling of `TestEventQueue`'s `event_pipe`.
134     #[test]
decoder_event_queue_polling()135     fn decoder_event_queue_polling() {
136         let mut event_queue = EventQueue::new().unwrap();
137         let epoll = Epoll::new(EpollCreateFlags::empty()).unwrap();
138         epoll
139             .add(event_queue.as_fd(), EpollEvent::new(EpollFlags::EPOLLIN, 1))
140             .unwrap();
141 
142         // The queue is empty, so `event_pipe` should not signal.
143         let mut events = [EpollEvent::empty()];
144         let nb_fds = epoll.wait(&mut events, EpollTimeout::ZERO).unwrap();
145         assert_eq!(nb_fds, 0);
146         assert_eq!(event_queue.dequeue_event(), None);
147 
148         // `event_pipe` should signal as long as the queue is not empty.
149         event_queue
150             .queue_event(VideoDecoderBackendEvent::InputBufferDone(1))
151             .unwrap();
152         assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1);
153         event_queue
154             .queue_event(VideoDecoderBackendEvent::InputBufferDone(2))
155             .unwrap();
156         assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1);
157         event_queue
158             .queue_event(VideoDecoderBackendEvent::InputBufferDone(3))
159             .unwrap();
160         assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1);
161 
162         assert_eq!(
163             event_queue.dequeue_event(),
164             Some(VideoDecoderBackendEvent::InputBufferDone(1))
165         );
166         assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1);
167         assert_eq!(
168             event_queue.dequeue_event(),
169             Some(VideoDecoderBackendEvent::InputBufferDone(2))
170         );
171         assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 1);
172         assert_eq!(
173             event_queue.dequeue_event(),
174             Some(VideoDecoderBackendEvent::InputBufferDone(3))
175         );
176 
177         // The queue is empty again, so `event_pipe` should not signal.
178         assert_eq!(epoll.wait(&mut events, EpollTimeout::ZERO).unwrap(), 0);
179         assert_eq!(event_queue.dequeue_event(), None);
180     }
181 }
182