//! High-level interface for a [V4L2 video //! encoder](https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/dev-encoder.html). use crate::{ device::{ poller::{DeviceEvent, PollError, PollEvent, Poller, Waker}, queue::{ direction::{Capture, Output}, dqbuf::DqBuffer, handles_provider::HandlesProvider, qbuf::{ get_free::{GetFreeBufferError, GetFreeCaptureBuffer, GetFreeOutputBuffer}, get_indexed::GetCaptureBufferByIndex, CaptureQueueable, OutputQueueableProvider, }, BuffersAllocated, CanceledBuffer, CreateQueueError, FormatBuilder, Queue, QueueInit, RequestBuffersError, }, AllocatedQueue, Device, DeviceConfig, DeviceOpenError, Stream, TryDequeue, }, ioctl::{ self, DqBufError, DqBufIoctlError, EncoderCommand, FormatFlags, GFmtError, V4l2BufferFromError, }, memory::{BufferHandles, PrimitiveBufferHandles}, Format, }; use log::warn; use std::{ any::Any, io, path::Path, sync::{atomic::AtomicUsize, Arc}, task::Wake, thread::JoinHandle, }; use thiserror::Error; /// Trait implemented by all states of the encoder. pub trait EncoderState {} pub struct Encoder { // Make sure to keep the device alive as long as we are. device: Arc, state: S, } pub struct AwaitingCaptureFormat { output_queue: Queue, capture_queue: Queue, } impl EncoderState for AwaitingCaptureFormat {} #[derive(Debug, Error)] pub enum EncoderOpenError { #[error("error while opening device")] DeviceOpenError(#[from] DeviceOpenError), #[error("error while creating queue")] CreateQueueError(#[from] CreateQueueError), #[error("specified device is not an encoder")] NotAnEncoder, } impl Encoder { pub fn open(path: &Path) -> Result { let config = DeviceConfig::new().non_blocking_dqbuf(); let device = Arc::new(Device::open(path, config)?); // Check that the device is indeed an encoder. let capture_queue = Queue::get_capture_mplane_queue(device.clone())?; let output_queue = Queue::get_output_mplane_queue(device.clone())?; // On an encoder, the OUTPUT formats are not compressed, but the CAPTURE ones are. // Return an error if our device does not satisfy these conditions. output_queue .format_iter() .find(|fmt| !fmt.flags.contains(FormatFlags::COMPRESSED)) .and( capture_queue .format_iter() .find(|fmt| fmt.flags.contains(FormatFlags::COMPRESSED)), ) .ok_or(EncoderOpenError::NotAnEncoder) .map(|_| ())?; Ok(Encoder { device, state: AwaitingCaptureFormat { output_queue, capture_queue, }, }) } pub fn set_capture_format(mut self, f: F) -> anyhow::Result> where F: FnOnce(FormatBuilder) -> anyhow::Result<()>, { let builder = self.state.capture_queue.change_format()?; f(builder)?; Ok(Encoder { device: self.device, state: AwaitingOutputFormat { output_queue: self.state.output_queue, capture_queue: self.state.capture_queue, }, }) } } pub struct AwaitingOutputFormat { output_queue: Queue, capture_queue: Queue, } impl EncoderState for AwaitingOutputFormat {} impl Encoder { pub fn set_output_format(mut self, f: F) -> anyhow::Result> where F: FnOnce(FormatBuilder) -> anyhow::Result<()>, { let builder = self.state.output_queue.change_format()?; f(builder)?; Ok(Encoder { device: self.device, state: AwaitingOutputBuffers { output_queue: self.state.output_queue, capture_queue: self.state.capture_queue, }, }) } } pub struct AwaitingOutputBuffers { output_queue: Queue, capture_queue: Queue, } impl EncoderState for AwaitingOutputBuffers {} impl Encoder { pub fn allocate_output_buffers_generic( self, memory_type: OP::SupportedMemoryType, num_output: usize, ) -> Result>, RequestBuffersError> { Ok(Encoder { device: self.device, state: AwaitingCaptureBuffers { output_queue: self .state .output_queue .request_buffers_generic::(memory_type, num_output as u32)?, capture_queue: self.state.capture_queue, }, }) } pub fn allocate_output_buffers( self, num_output: usize, ) -> Result>, RequestBuffersError> { self.allocate_output_buffers_generic(OP::MEMORY_TYPE, num_output) } pub fn get_output_format(&self) -> Result { self.state.output_queue.get_format() } pub fn get_capture_format(&self) -> Result { self.state.capture_queue.get_format() } } pub struct AwaitingCaptureBuffers { output_queue: Queue>, capture_queue: Queue, } impl EncoderState for AwaitingCaptureBuffers {} impl Encoder> { pub fn allocate_capture_buffers_generic( self, memory_type: ::SupportedMemoryType, num_capture: usize, capture_memory_provider: P, ) -> Result>, RequestBuffersError> where for<'a> Queue>: GetFreeCaptureBuffer<'a, P::HandleType>, { Ok(Encoder { device: self.device, state: ReadyToEncode { output_queue: self.state.output_queue, capture_queue: self .state .capture_queue .request_buffers_generic::(memory_type, num_capture as u32)?, capture_memory_provider, poll_wakeups_counter: None, }, }) } pub fn allocate_capture_buffers( self, num_capture: usize, capture_memory_provider: P, ) -> Result>, RequestBuffersError> where P::HandleType: PrimitiveBufferHandles, for<'a> Queue>: GetFreeCaptureBuffer<'a, P::HandleType>, { self.allocate_capture_buffers_generic( P::HandleType::MEMORY_TYPE, num_capture, capture_memory_provider, ) } } pub struct ReadyToEncode { output_queue: Queue>, capture_queue: Queue>, capture_memory_provider: P, poll_wakeups_counter: Option>, } impl EncoderState for ReadyToEncode {} impl Encoder> where for<'a> Queue>: GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>, { pub fn set_poll_counter(mut self, poll_wakeups_counter: Arc) -> Self { self.state.poll_wakeups_counter = Some(poll_wakeups_counter); self } pub fn start( self, input_done_cb: InputDoneCb, output_ready_cb: OutputReadyCb, ) -> io::Result>> where InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send + 'static, { self.state.output_queue.stream_on().unwrap(); self.state.capture_queue.stream_on().unwrap(); let mut output_poller = Poller::new(Arc::clone(&self.device))?; output_poller.enable_event(DeviceEvent::OutputReady)?; let mut encoder_thread = EncoderThread::new( &self.device, self.state.capture_queue, self.state.capture_memory_provider, output_ready_cb, )?; if let Some(counter) = &self.state.poll_wakeups_counter { output_poller.set_poll_counter(Arc::clone(counter)); encoder_thread.set_poll_counter(Arc::clone(counter)); } let handle = std::thread::Builder::new() .name("V4L2 Encoder".into()) .spawn(move || encoder_thread.run())?; Ok(Encoder { device: self.device, state: Encoding { output_queue: self.state.output_queue, input_done_cb, output_poller, handle, }, }) } } pub struct Encoding where P: HandlesProvider, InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send, { output_queue: Queue>, input_done_cb: InputDoneCb, output_poller: Poller, handle: JoinHandle>, } impl EncoderState for Encoding where OP: BufferHandles, P: HandlesProvider, InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send, { } // Safe because all Rcs are internal and never leaked outside of the struct. unsafe impl Send for Encoder {} pub enum CompletedOutputBuffer { Dequeued(DqBuffer), Canceled(CanceledBuffer), } #[derive(Debug, Error)] pub enum GetBufferError { #[error("error while dequeueing buffer")] DequeueError(#[from] DqBufError), #[error("error during poll")] PollError(#[from] PollError), #[error("error while obtaining buffer")] GetFreeBufferError(#[from] GetFreeBufferError), } #[derive(Debug, Error)] pub enum EncoderStopError { #[error("error while sending STOP command")] EncoderCmdError(#[from] ioctl::EncoderCmdError), #[error("thread has panicked")] ThreadPanickedError(Box), #[error("cannot streamoff capture queue")] CaptureQueueStreamoffError(ioctl::StreamOffError), #[error("cannot streamoff output queue")] OutputQueueStreamoffError(ioctl::StreamOffError), } impl Encoder> where OP: BufferHandles, P: HandlesProvider, InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send, { /// Stop the encoder, and returns the encoder ready to be started again. pub fn stop(self) -> Result>, EncoderStopError> { ioctl::encoder_cmd::<_, ()>(&*self.device, &EncoderCommand::Stop(false))?; // The encoder thread should receive the LAST buffer and exit on its own. let encoding_thread = self .state .handle .join() .map_err(EncoderStopError::ThreadPanickedError)?; encoding_thread .capture_queue .stream_off() .map_err(EncoderStopError::CaptureQueueStreamoffError)?; /* Return all canceled buffers to the client */ let canceled_buffers = self .state .output_queue .stream_off() .map_err(EncoderStopError::OutputQueueStreamoffError)?; for buffer in canceled_buffers { (self.state.input_done_cb)(CompletedOutputBuffer::Canceled(buffer)); } Ok(Encoder { device: self.device, state: ReadyToEncode { output_queue: self.state.output_queue, capture_queue: encoding_thread.capture_queue, capture_memory_provider: encoding_thread.capture_memory_provider, poll_wakeups_counter: None, }, }) } /// Attempts to dequeue and release output buffers that the driver is done with. fn dequeue_output_buffers(&self) -> Result<(), DqBufError> { let output_queue = &self.state.output_queue; while output_queue.num_queued_buffers() > 0 { match output_queue.try_dequeue() { Ok(buf) => { (self.state.input_done_cb)(CompletedOutputBuffer::Dequeued(buf)); } Err(DqBufError::IoctlError(DqBufIoctlError::NotReady)) => break, // TODO buffers with the error flag set should not result in // a fatal error! Err(e) => return Err(e), } } Ok(()) } // Make this thread sleep until at least one OUTPUT buffer is ready to be // obtained through `try_get_buffer()`, dequeuing buffers if necessary. fn wait_for_output_buffer(&mut self) -> Result<(), GetBufferError> { for event in self.state.output_poller.poll(None)? { match event { PollEvent::Device(DeviceEvent::OutputReady) => { self.dequeue_output_buffers()?; } _ => panic!("Unexpected return from OUTPUT queue poll!"), } } Ok(()) } } impl<'a, OP, P, InputDoneCb, OutputReadyCb> OutputQueueableProvider<'a, OP> for Encoder> where Queue>: OutputQueueableProvider<'a, OP>, OP: BufferHandles, P: HandlesProvider, InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send, { type Queueable = > as OutputQueueableProvider<'a, OP>>::Queueable; } /// Let the encoder provide the buffers from the OUTPUT queue. impl<'a, OP, P, InputDoneCb, OutputReadyCb> GetFreeOutputBuffer<'a, OP, GetBufferError> for Encoder> where Queue>: GetFreeOutputBuffer<'a, OP>, OP: BufferHandles, P: HandlesProvider, InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send, { /// Returns a V4L2 buffer to be filled with a frame to encode if one /// is available. /// /// This method will return None immediately if all the allocated buffers /// are currently queued. fn try_get_free_buffer(&'a self) -> Result { self.dequeue_output_buffers()?; Ok(self.state.output_queue.try_get_free_buffer()?) } } // If `GetFreeBuffer` is implemented, we can also provide a blocking `get_buffer` // method. impl<'a, OP, P, InputDoneCb, OutputReadyCb> Encoder> where Self: GetFreeOutputBuffer<'a, OP, GetBufferError>, OP: BufferHandles, P: HandlesProvider, InputDoneCb: Fn(CompletedOutputBuffer), OutputReadyCb: FnMut(DqBuffer) + Send, { /// Returns a V4L2 buffer to be filled with a frame to encode, waiting for /// one to be available if needed. /// /// Contrary to `try_get_free_buffer(), this method will wait for a buffer /// to be available if needed. pub fn get_buffer( &'a mut self, ) -> Result<>::Queueable, GetBufferError> { let output_queue = &self.state.output_queue; // If all our buffers are queued, wait until we can dequeue some. if output_queue.num_queued_buffers() == output_queue.num_buffers() { self.wait_for_output_buffer()?; } self.try_get_free_buffer() } } struct EncoderThread where P: HandlesProvider, OutputReadyCb: FnMut(DqBuffer) + Send, { capture_queue: Queue>, capture_memory_provider: P, poller: Poller, waker: Arc, output_ready_cb: OutputReadyCb, } impl EncoderThread where P: HandlesProvider, OutputReadyCb: FnMut(DqBuffer) + Send, for<'a> Queue>: GetFreeCaptureBuffer<'a, P::HandleType> + GetCaptureBufferByIndex<'a, P::HandleType>, { fn new( device: &Arc, capture_queue: Queue>, capture_memory_provider: P, output_ready_cb: OutputReadyCb, ) -> io::Result { let mut poller = Poller::new(Arc::clone(device))?; poller.enable_event(DeviceEvent::CaptureReady)?; let waker = poller.add_waker(0)?; Ok(EncoderThread { capture_queue, capture_memory_provider, poller, waker, output_ready_cb, }) } fn set_poll_counter(&mut self, poll_wakeups_counter: Arc) { self.poller.set_poll_counter(poll_wakeups_counter); } fn run(mut self) -> Self { self.enqueue_capture_buffers(); 'polling: loop { match self.capture_queue.num_queued_buffers() { // If there are no buffers on the CAPTURE queue, poll() will return // immediately with EPOLLERR and we would loop indefinitely. // Prevent this by temporarily disabling polling the device in such // cases. 0 => { self.poller .disable_event(DeviceEvent::CaptureReady) .unwrap(); } // If device polling was disabled and we have buffers queued, we // can reenable it as poll will now wait for a CAPTURE buffer to // be ready for dequeue. _ => { self.poller.enable_event(DeviceEvent::CaptureReady).unwrap(); } } // TODO handle errors - this system call can be interrupted and we // should leave in this case. for event in self.poller.poll(None).unwrap() { match event { // A CAPTURE buffer has been released by the client. PollEvent::Waker(0) => { // Requeue all available CAPTURE buffers. self.enqueue_capture_buffers(); } // A CAPTURE buffer is ready to be dequeued. PollEvent::Device(DeviceEvent::CaptureReady) => { // Get the encoded buffer // TODO Manage errors here, including corrupted buffers! if let Ok(mut cap_buf) = self.capture_queue.try_dequeue() { let is_last = cap_buf.data.is_last(); let is_empty = *cap_buf.data.get_first_plane().bytesused == 0; // Add a drop callback to the dequeued buffer so we // re-queue it as soon as it is dropped. let cap_waker = Arc::clone(&self.waker); cap_buf.add_drop_callback(move |_dqbuf| { cap_waker.wake(); }); // Empty buffers do not need to be passed to the client. if !is_empty { (self.output_ready_cb)(cap_buf); } // Last buffer of the stream? Time for us to terminate. if is_last { break 'polling; } } else { // TODO we should not crash here. panic!("Expected a CAPTURE buffer but none available!"); } } _ => panic!("Unexpected return from CAPTURE queue poll!"), } } } self } fn enqueue_capture_buffers(&mut self) { 'enqueue: while let Some(handles) = self.capture_memory_provider.get_handles(&self.waker) { if let Ok(buffer) = self .capture_memory_provider .get_suitable_buffer_for(&handles, &self.capture_queue) { buffer.queue_with_handles(handles).unwrap(); } else { warn!("Handles potentially lost due to no V4L2 buffer being available"); break 'enqueue; } } } }