xref: /aosp_15_r20/external/crosvm/common/audio_streams/src/capture.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1*bb4ee6a4SAndroid Build Coastguard Worker // Copyright 2019 The ChromiumOS Authors
2*bb4ee6a4SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*bb4ee6a4SAndroid Build Coastguard Worker // found in the LICENSE file.
4*bb4ee6a4SAndroid Build Coastguard Worker 
5*bb4ee6a4SAndroid Build Coastguard Worker //! ```
6*bb4ee6a4SAndroid Build Coastguard Worker //! use audio_streams::{BoxError, capture::CaptureBuffer, SampleFormat, StreamSource,
7*bb4ee6a4SAndroid Build Coastguard Worker //!     NoopStreamSource};
8*bb4ee6a4SAndroid Build Coastguard Worker //! use std::io::Read;
9*bb4ee6a4SAndroid Build Coastguard Worker //!
10*bb4ee6a4SAndroid Build Coastguard Worker //! const buffer_size: usize = 120;
11*bb4ee6a4SAndroid Build Coastguard Worker //! const num_channels: usize = 2;
12*bb4ee6a4SAndroid Build Coastguard Worker //!
13*bb4ee6a4SAndroid Build Coastguard Worker //! # fn main() -> std::result::Result<(),BoxError> {
14*bb4ee6a4SAndroid Build Coastguard Worker //! let mut stream_source = NoopStreamSource::new();
15*bb4ee6a4SAndroid Build Coastguard Worker //! let sample_format = SampleFormat::S16LE;
16*bb4ee6a4SAndroid Build Coastguard Worker //! let frame_size = num_channels * sample_format.sample_bytes();
17*bb4ee6a4SAndroid Build Coastguard Worker //!
18*bb4ee6a4SAndroid Build Coastguard Worker //! let (_, mut stream) = stream_source
19*bb4ee6a4SAndroid Build Coastguard Worker //!     .new_capture_stream(num_channels, sample_format, 48000, buffer_size, &[])?;
20*bb4ee6a4SAndroid Build Coastguard Worker //! // Capture 10 buffers of zeros.
21*bb4ee6a4SAndroid Build Coastguard Worker //! let mut buf = Vec::new();
22*bb4ee6a4SAndroid Build Coastguard Worker //! buf.resize(buffer_size * frame_size, 0xa5u8);
23*bb4ee6a4SAndroid Build Coastguard Worker //! for _ in 0..10 {
24*bb4ee6a4SAndroid Build Coastguard Worker //!     let mut copy_func = |stream_buffer: &mut CaptureBuffer| {
25*bb4ee6a4SAndroid Build Coastguard Worker //!         assert_eq!(stream_buffer.read(&mut buf)?, buffer_size * frame_size);
26*bb4ee6a4SAndroid Build Coastguard Worker //!         Ok(())
27*bb4ee6a4SAndroid Build Coastguard Worker //!     };
28*bb4ee6a4SAndroid Build Coastguard Worker //!     stream.read_capture_buffer(&mut copy_func)?;
29*bb4ee6a4SAndroid Build Coastguard Worker //! }
30*bb4ee6a4SAndroid Build Coastguard Worker //! # Ok (())
31*bb4ee6a4SAndroid Build Coastguard Worker //! # }
32*bb4ee6a4SAndroid Build Coastguard Worker //! ```
33*bb4ee6a4SAndroid Build Coastguard Worker 
34*bb4ee6a4SAndroid Build Coastguard Worker use std::io;
35*bb4ee6a4SAndroid Build Coastguard Worker use std::io::Read;
36*bb4ee6a4SAndroid Build Coastguard Worker use std::io::Write;
37*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Duration;
38*bb4ee6a4SAndroid Build Coastguard Worker use std::time::Instant;
39*bb4ee6a4SAndroid Build Coastguard Worker 
40*bb4ee6a4SAndroid Build Coastguard Worker use async_trait::async_trait;
41*bb4ee6a4SAndroid Build Coastguard Worker use remain::sorted;
42*bb4ee6a4SAndroid Build Coastguard Worker use thiserror::Error;
43*bb4ee6a4SAndroid Build Coastguard Worker 
44*bb4ee6a4SAndroid Build Coastguard Worker use super::async_api::AudioStreamsExecutor;
45*bb4ee6a4SAndroid Build Coastguard Worker use super::AsyncBufferCommit;
46*bb4ee6a4SAndroid Build Coastguard Worker use super::AudioBuffer;
47*bb4ee6a4SAndroid Build Coastguard Worker use super::BoxError;
48*bb4ee6a4SAndroid Build Coastguard Worker use super::BufferCommit;
49*bb4ee6a4SAndroid Build Coastguard Worker use super::NoopBufferCommit;
50*bb4ee6a4SAndroid Build Coastguard Worker use super::SampleFormat;
51*bb4ee6a4SAndroid Build Coastguard Worker 
52*bb4ee6a4SAndroid Build Coastguard Worker /// `CaptureBufferStream` provides `CaptureBuffer`s to read with audio samples from capture.
53*bb4ee6a4SAndroid Build Coastguard Worker pub trait CaptureBufferStream: Send {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>54*bb4ee6a4SAndroid Build Coastguard Worker     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
55*bb4ee6a4SAndroid Build Coastguard Worker 
56*bb4ee6a4SAndroid Build Coastguard Worker     /// Call `f` with a `CaptureBuffer`, and trigger the buffer done call back after. `f` can read
57*bb4ee6a4SAndroid Build Coastguard Worker     /// the capture data from the given `CaptureBuffer`.
read_capture_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>58*bb4ee6a4SAndroid Build Coastguard Worker     fn read_capture_buffer<'b, 's: 'b>(
59*bb4ee6a4SAndroid Build Coastguard Worker         &'s mut self,
60*bb4ee6a4SAndroid Build Coastguard Worker         f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
61*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<(), BoxError> {
62*bb4ee6a4SAndroid Build Coastguard Worker         let mut buf = self.next_capture_buffer()?;
63*bb4ee6a4SAndroid Build Coastguard Worker         f(&mut buf)?;
64*bb4ee6a4SAndroid Build Coastguard Worker         buf.commit();
65*bb4ee6a4SAndroid Build Coastguard Worker         Ok(())
66*bb4ee6a4SAndroid Build Coastguard Worker     }
67*bb4ee6a4SAndroid Build Coastguard Worker }
68*bb4ee6a4SAndroid Build Coastguard Worker 
69*bb4ee6a4SAndroid Build Coastguard Worker impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>70*bb4ee6a4SAndroid Build Coastguard Worker     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
71*bb4ee6a4SAndroid Build Coastguard Worker         (**self).next_capture_buffer()
72*bb4ee6a4SAndroid Build Coastguard Worker     }
73*bb4ee6a4SAndroid Build Coastguard Worker }
74*bb4ee6a4SAndroid Build Coastguard Worker 
75*bb4ee6a4SAndroid Build Coastguard Worker #[async_trait(?Send)]
76*bb4ee6a4SAndroid Build Coastguard Worker pub trait AsyncCaptureBufferStream: Send {
next_capture_buffer<'a>( &'a mut self, _ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>77*bb4ee6a4SAndroid Build Coastguard Worker     async fn next_capture_buffer<'a>(
78*bb4ee6a4SAndroid Build Coastguard Worker         &'a mut self,
79*bb4ee6a4SAndroid Build Coastguard Worker         _ex: &dyn AudioStreamsExecutor,
80*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
81*bb4ee6a4SAndroid Build Coastguard Worker }
82*bb4ee6a4SAndroid Build Coastguard Worker 
83*bb4ee6a4SAndroid Build Coastguard Worker #[async_trait(?Send)]
84*bb4ee6a4SAndroid Build Coastguard Worker impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>85*bb4ee6a4SAndroid Build Coastguard Worker     async fn next_capture_buffer<'a>(
86*bb4ee6a4SAndroid Build Coastguard Worker         &'a mut self,
87*bb4ee6a4SAndroid Build Coastguard Worker         ex: &dyn AudioStreamsExecutor,
88*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
89*bb4ee6a4SAndroid Build Coastguard Worker         (**self).next_capture_buffer(ex).await
90*bb4ee6a4SAndroid Build Coastguard Worker     }
91*bb4ee6a4SAndroid Build Coastguard Worker }
92*bb4ee6a4SAndroid Build Coastguard Worker 
93*bb4ee6a4SAndroid Build Coastguard Worker /// `CaptureBuffer` contains a block of audio samples got from capture stream. It provides
94*bb4ee6a4SAndroid Build Coastguard Worker /// temporary view to those samples and will notifies capture stream when dropped.
95*bb4ee6a4SAndroid Build Coastguard Worker /// Note that it'll always send `buffer.len() / frame_size` to drop function when it got destroyed
96*bb4ee6a4SAndroid Build Coastguard Worker /// since `CaptureBufferStream` assumes that users get all the samples from the buffer.
97*bb4ee6a4SAndroid Build Coastguard Worker pub struct CaptureBuffer<'a> {
98*bb4ee6a4SAndroid Build Coastguard Worker     buffer: AudioBuffer<'a>,
99*bb4ee6a4SAndroid Build Coastguard Worker     drop: &'a mut dyn BufferCommit,
100*bb4ee6a4SAndroid Build Coastguard Worker }
101*bb4ee6a4SAndroid Build Coastguard Worker 
102*bb4ee6a4SAndroid Build Coastguard Worker /// Async version of 'CaptureBuffer`
103*bb4ee6a4SAndroid Build Coastguard Worker pub struct AsyncCaptureBuffer<'a> {
104*bb4ee6a4SAndroid Build Coastguard Worker     buffer: AudioBuffer<'a>,
105*bb4ee6a4SAndroid Build Coastguard Worker     trigger: &'a mut dyn AsyncBufferCommit,
106*bb4ee6a4SAndroid Build Coastguard Worker }
107*bb4ee6a4SAndroid Build Coastguard Worker 
108*bb4ee6a4SAndroid Build Coastguard Worker /// Errors that are possible from a `CaptureBuffer`.
109*bb4ee6a4SAndroid Build Coastguard Worker #[sorted]
110*bb4ee6a4SAndroid Build Coastguard Worker #[derive(Error, Debug)]
111*bb4ee6a4SAndroid Build Coastguard Worker pub enum CaptureBufferError {
112*bb4ee6a4SAndroid Build Coastguard Worker     #[error("Invalid buffer length")]
113*bb4ee6a4SAndroid Build Coastguard Worker     InvalidLength,
114*bb4ee6a4SAndroid Build Coastguard Worker }
115*bb4ee6a4SAndroid Build Coastguard Worker 
116*bb4ee6a4SAndroid Build Coastguard Worker impl<'a> CaptureBuffer<'a> {
117*bb4ee6a4SAndroid Build Coastguard Worker     /// Creates a new `CaptureBuffer` that holds a reference to the backing memory specified in
118*bb4ee6a4SAndroid Build Coastguard Worker     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, CaptureBufferError> where F: BufferCommit,119*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new<F>(
120*bb4ee6a4SAndroid Build Coastguard Worker         frame_size: usize,
121*bb4ee6a4SAndroid Build Coastguard Worker         buffer: &'a mut [u8],
122*bb4ee6a4SAndroid Build Coastguard Worker         drop: &'a mut F,
123*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<Self, CaptureBufferError>
124*bb4ee6a4SAndroid Build Coastguard Worker     where
125*bb4ee6a4SAndroid Build Coastguard Worker         F: BufferCommit,
126*bb4ee6a4SAndroid Build Coastguard Worker     {
127*bb4ee6a4SAndroid Build Coastguard Worker         if buffer.len() % frame_size != 0 {
128*bb4ee6a4SAndroid Build Coastguard Worker             return Err(CaptureBufferError::InvalidLength);
129*bb4ee6a4SAndroid Build Coastguard Worker         }
130*bb4ee6a4SAndroid Build Coastguard Worker 
131*bb4ee6a4SAndroid Build Coastguard Worker         Ok(CaptureBuffer {
132*bb4ee6a4SAndroid Build Coastguard Worker             buffer: AudioBuffer {
133*bb4ee6a4SAndroid Build Coastguard Worker                 buffer,
134*bb4ee6a4SAndroid Build Coastguard Worker                 frame_size,
135*bb4ee6a4SAndroid Build Coastguard Worker                 offset: 0,
136*bb4ee6a4SAndroid Build Coastguard Worker             },
137*bb4ee6a4SAndroid Build Coastguard Worker             drop,
138*bb4ee6a4SAndroid Build Coastguard Worker         })
139*bb4ee6a4SAndroid Build Coastguard Worker     }
140*bb4ee6a4SAndroid Build Coastguard Worker 
141*bb4ee6a4SAndroid Build Coastguard Worker     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize142*bb4ee6a4SAndroid Build Coastguard Worker     pub fn frame_capacity(&self) -> usize {
143*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.frame_capacity()
144*bb4ee6a4SAndroid Build Coastguard Worker     }
145*bb4ee6a4SAndroid Build Coastguard Worker 
146*bb4ee6a4SAndroid Build Coastguard Worker     /// This triggers the callback of `BufferCommit`. This should be called after the data is read
147*bb4ee6a4SAndroid Build Coastguard Worker     /// from the buffer.
148*bb4ee6a4SAndroid Build Coastguard Worker     ///
149*bb4ee6a4SAndroid Build Coastguard Worker     /// Always sends `frame_capacity`.
commit(&mut self)150*bb4ee6a4SAndroid Build Coastguard Worker     pub fn commit(&mut self) {
151*bb4ee6a4SAndroid Build Coastguard Worker         self.drop.commit(self.frame_capacity());
152*bb4ee6a4SAndroid Build Coastguard Worker     }
153*bb4ee6a4SAndroid Build Coastguard Worker 
latency_bytes(&self) -> u32154*bb4ee6a4SAndroid Build Coastguard Worker     pub fn latency_bytes(&self) -> u32 {
155*bb4ee6a4SAndroid Build Coastguard Worker         self.drop.latency_bytes()
156*bb4ee6a4SAndroid Build Coastguard Worker     }
157*bb4ee6a4SAndroid Build Coastguard Worker 
158*bb4ee6a4SAndroid Build Coastguard Worker     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>159*bb4ee6a4SAndroid Build Coastguard Worker     pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
160*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.read_copy_cb(size, cb)
161*bb4ee6a4SAndroid Build Coastguard Worker     }
162*bb4ee6a4SAndroid Build Coastguard Worker }
163*bb4ee6a4SAndroid Build Coastguard Worker 
164*bb4ee6a4SAndroid Build Coastguard Worker impl<'a> Read for CaptureBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>165*bb4ee6a4SAndroid Build Coastguard Worker     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.read(buf)
167*bb4ee6a4SAndroid Build Coastguard Worker     }
168*bb4ee6a4SAndroid Build Coastguard Worker }
169*bb4ee6a4SAndroid Build Coastguard Worker 
170*bb4ee6a4SAndroid Build Coastguard Worker impl<'a> AsyncCaptureBuffer<'a> {
171*bb4ee6a4SAndroid Build Coastguard Worker     /// Creates a new `AsyncCaptureBuffer` that holds a reference to the backing memory specified in
172*bb4ee6a4SAndroid Build Coastguard Worker     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, CaptureBufferError> where F: AsyncBufferCommit,173*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new<F>(
174*bb4ee6a4SAndroid Build Coastguard Worker         frame_size: usize,
175*bb4ee6a4SAndroid Build Coastguard Worker         buffer: &'a mut [u8],
176*bb4ee6a4SAndroid Build Coastguard Worker         trigger: &'a mut F,
177*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<Self, CaptureBufferError>
178*bb4ee6a4SAndroid Build Coastguard Worker     where
179*bb4ee6a4SAndroid Build Coastguard Worker         F: AsyncBufferCommit,
180*bb4ee6a4SAndroid Build Coastguard Worker     {
181*bb4ee6a4SAndroid Build Coastguard Worker         if buffer.len() % frame_size != 0 {
182*bb4ee6a4SAndroid Build Coastguard Worker             return Err(CaptureBufferError::InvalidLength);
183*bb4ee6a4SAndroid Build Coastguard Worker         }
184*bb4ee6a4SAndroid Build Coastguard Worker 
185*bb4ee6a4SAndroid Build Coastguard Worker         Ok(AsyncCaptureBuffer {
186*bb4ee6a4SAndroid Build Coastguard Worker             buffer: AudioBuffer {
187*bb4ee6a4SAndroid Build Coastguard Worker                 buffer,
188*bb4ee6a4SAndroid Build Coastguard Worker                 frame_size,
189*bb4ee6a4SAndroid Build Coastguard Worker                 offset: 0,
190*bb4ee6a4SAndroid Build Coastguard Worker             },
191*bb4ee6a4SAndroid Build Coastguard Worker             trigger,
192*bb4ee6a4SAndroid Build Coastguard Worker         })
193*bb4ee6a4SAndroid Build Coastguard Worker     }
194*bb4ee6a4SAndroid Build Coastguard Worker 
195*bb4ee6a4SAndroid Build Coastguard Worker     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize196*bb4ee6a4SAndroid Build Coastguard Worker     pub fn frame_capacity(&self) -> usize {
197*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.frame_capacity()
198*bb4ee6a4SAndroid Build Coastguard Worker     }
199*bb4ee6a4SAndroid Build Coastguard Worker 
200*bb4ee6a4SAndroid Build Coastguard Worker     /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
201*bb4ee6a4SAndroid Build Coastguard Worker     /// read from the buffer.
202*bb4ee6a4SAndroid Build Coastguard Worker     ///
203*bb4ee6a4SAndroid Build Coastguard Worker     /// Always sends `frame_capacity`.
commit(&mut self)204*bb4ee6a4SAndroid Build Coastguard Worker     pub async fn commit(&mut self) {
205*bb4ee6a4SAndroid Build Coastguard Worker         self.trigger.commit(self.frame_capacity()).await;
206*bb4ee6a4SAndroid Build Coastguard Worker     }
207*bb4ee6a4SAndroid Build Coastguard Worker 
latency_bytes(&self) -> u32208*bb4ee6a4SAndroid Build Coastguard Worker     pub fn latency_bytes(&self) -> u32 {
209*bb4ee6a4SAndroid Build Coastguard Worker         self.trigger.latency_bytes()
210*bb4ee6a4SAndroid Build Coastguard Worker     }
211*bb4ee6a4SAndroid Build Coastguard Worker 
212*bb4ee6a4SAndroid Build Coastguard Worker     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>213*bb4ee6a4SAndroid Build Coastguard Worker     pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
214*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.read_copy_cb(size, cb)
215*bb4ee6a4SAndroid Build Coastguard Worker     }
216*bb4ee6a4SAndroid Build Coastguard Worker 
217*bb4ee6a4SAndroid Build Coastguard Worker     /// Copy data to an io::Write
copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize>218*bb4ee6a4SAndroid Build Coastguard Worker     pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
219*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.copy_to(writer)
220*bb4ee6a4SAndroid Build Coastguard Worker     }
221*bb4ee6a4SAndroid Build Coastguard Worker }
222*bb4ee6a4SAndroid Build Coastguard Worker 
223*bb4ee6a4SAndroid Build Coastguard Worker impl<'a> Read for AsyncCaptureBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>224*bb4ee6a4SAndroid Build Coastguard Worker     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
225*bb4ee6a4SAndroid Build Coastguard Worker         self.buffer.read(buf)
226*bb4ee6a4SAndroid Build Coastguard Worker     }
227*bb4ee6a4SAndroid Build Coastguard Worker }
228*bb4ee6a4SAndroid Build Coastguard Worker 
229*bb4ee6a4SAndroid Build Coastguard Worker /// Stream that provides null capture samples.
230*bb4ee6a4SAndroid Build Coastguard Worker pub struct NoopCaptureStream {
231*bb4ee6a4SAndroid Build Coastguard Worker     buffer: Vec<u8>,
232*bb4ee6a4SAndroid Build Coastguard Worker     frame_size: usize,
233*bb4ee6a4SAndroid Build Coastguard Worker     interval: Duration,
234*bb4ee6a4SAndroid Build Coastguard Worker     next_frame: Duration,
235*bb4ee6a4SAndroid Build Coastguard Worker     start_time: Option<Instant>,
236*bb4ee6a4SAndroid Build Coastguard Worker     buffer_drop: NoopBufferCommit,
237*bb4ee6a4SAndroid Build Coastguard Worker }
238*bb4ee6a4SAndroid Build Coastguard Worker 
239*bb4ee6a4SAndroid Build Coastguard Worker impl NoopCaptureStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self240*bb4ee6a4SAndroid Build Coastguard Worker     pub fn new(
241*bb4ee6a4SAndroid Build Coastguard Worker         num_channels: usize,
242*bb4ee6a4SAndroid Build Coastguard Worker         format: SampleFormat,
243*bb4ee6a4SAndroid Build Coastguard Worker         frame_rate: u32,
244*bb4ee6a4SAndroid Build Coastguard Worker         buffer_size: usize,
245*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Self {
246*bb4ee6a4SAndroid Build Coastguard Worker         let frame_size = format.sample_bytes() * num_channels;
247*bb4ee6a4SAndroid Build Coastguard Worker         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
248*bb4ee6a4SAndroid Build Coastguard Worker         NoopCaptureStream {
249*bb4ee6a4SAndroid Build Coastguard Worker             buffer: vec![0; buffer_size * frame_size],
250*bb4ee6a4SAndroid Build Coastguard Worker             frame_size,
251*bb4ee6a4SAndroid Build Coastguard Worker             interval,
252*bb4ee6a4SAndroid Build Coastguard Worker             next_frame: interval,
253*bb4ee6a4SAndroid Build Coastguard Worker             start_time: None,
254*bb4ee6a4SAndroid Build Coastguard Worker             buffer_drop: NoopBufferCommit {
255*bb4ee6a4SAndroid Build Coastguard Worker                 which_buffer: false,
256*bb4ee6a4SAndroid Build Coastguard Worker             },
257*bb4ee6a4SAndroid Build Coastguard Worker         }
258*bb4ee6a4SAndroid Build Coastguard Worker     }
259*bb4ee6a4SAndroid Build Coastguard Worker }
260*bb4ee6a4SAndroid Build Coastguard Worker 
261*bb4ee6a4SAndroid Build Coastguard Worker impl CaptureBufferStream for NoopCaptureStream {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>262*bb4ee6a4SAndroid Build Coastguard Worker     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
263*bb4ee6a4SAndroid Build Coastguard Worker         if let Some(start_time) = self.start_time {
264*bb4ee6a4SAndroid Build Coastguard Worker             let elapsed = start_time.elapsed();
265*bb4ee6a4SAndroid Build Coastguard Worker             if elapsed < self.next_frame {
266*bb4ee6a4SAndroid Build Coastguard Worker                 std::thread::sleep(self.next_frame - elapsed);
267*bb4ee6a4SAndroid Build Coastguard Worker             }
268*bb4ee6a4SAndroid Build Coastguard Worker             self.next_frame += self.interval;
269*bb4ee6a4SAndroid Build Coastguard Worker         } else {
270*bb4ee6a4SAndroid Build Coastguard Worker             self.start_time = Some(Instant::now());
271*bb4ee6a4SAndroid Build Coastguard Worker             self.next_frame = self.interval;
272*bb4ee6a4SAndroid Build Coastguard Worker         }
273*bb4ee6a4SAndroid Build Coastguard Worker         Ok(CaptureBuffer::new(
274*bb4ee6a4SAndroid Build Coastguard Worker             self.frame_size,
275*bb4ee6a4SAndroid Build Coastguard Worker             &mut self.buffer,
276*bb4ee6a4SAndroid Build Coastguard Worker             &mut self.buffer_drop,
277*bb4ee6a4SAndroid Build Coastguard Worker         )?)
278*bb4ee6a4SAndroid Build Coastguard Worker     }
279*bb4ee6a4SAndroid Build Coastguard Worker }
280*bb4ee6a4SAndroid Build Coastguard Worker 
281*bb4ee6a4SAndroid Build Coastguard Worker #[async_trait(?Send)]
282*bb4ee6a4SAndroid Build Coastguard Worker impl AsyncCaptureBufferStream for NoopCaptureStream {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>283*bb4ee6a4SAndroid Build Coastguard Worker     async fn next_capture_buffer<'a>(
284*bb4ee6a4SAndroid Build Coastguard Worker         &'a mut self,
285*bb4ee6a4SAndroid Build Coastguard Worker         ex: &dyn AudioStreamsExecutor,
286*bb4ee6a4SAndroid Build Coastguard Worker     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
287*bb4ee6a4SAndroid Build Coastguard Worker         if let Some(start_time) = self.start_time {
288*bb4ee6a4SAndroid Build Coastguard Worker             let elapsed = start_time.elapsed();
289*bb4ee6a4SAndroid Build Coastguard Worker             if elapsed < self.next_frame {
290*bb4ee6a4SAndroid Build Coastguard Worker                 ex.delay(self.next_frame - elapsed).await?;
291*bb4ee6a4SAndroid Build Coastguard Worker             }
292*bb4ee6a4SAndroid Build Coastguard Worker             self.next_frame += self.interval;
293*bb4ee6a4SAndroid Build Coastguard Worker         } else {
294*bb4ee6a4SAndroid Build Coastguard Worker             self.start_time = Some(Instant::now());
295*bb4ee6a4SAndroid Build Coastguard Worker             self.next_frame = self.interval;
296*bb4ee6a4SAndroid Build Coastguard Worker         }
297*bb4ee6a4SAndroid Build Coastguard Worker         Ok(AsyncCaptureBuffer::new(
298*bb4ee6a4SAndroid Build Coastguard Worker             self.frame_size,
299*bb4ee6a4SAndroid Build Coastguard Worker             &mut self.buffer,
300*bb4ee6a4SAndroid Build Coastguard Worker             &mut self.buffer_drop,
301*bb4ee6a4SAndroid Build Coastguard Worker         )?)
302*bb4ee6a4SAndroid Build Coastguard Worker     }
303*bb4ee6a4SAndroid Build Coastguard Worker }
304*bb4ee6a4SAndroid Build Coastguard Worker 
305*bb4ee6a4SAndroid Build Coastguard Worker /// Call `f` with a `AsyncCaptureBuffer`, and trigger the buffer done call back after. `f` can read
306*bb4ee6a4SAndroid Build Coastguard Worker /// the capture data from the given `AsyncCaptureBuffer`.
307*bb4ee6a4SAndroid Build Coastguard Worker ///
308*bb4ee6a4SAndroid Build Coastguard Worker /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_read_capture_buffer<F>( stream: &mut dyn AsyncCaptureBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,309*bb4ee6a4SAndroid Build Coastguard Worker pub async fn async_read_capture_buffer<F>(
310*bb4ee6a4SAndroid Build Coastguard Worker     stream: &mut dyn AsyncCaptureBufferStream,
311*bb4ee6a4SAndroid Build Coastguard Worker     f: F,
312*bb4ee6a4SAndroid Build Coastguard Worker     ex: &dyn AudioStreamsExecutor,
313*bb4ee6a4SAndroid Build Coastguard Worker ) -> Result<(), BoxError>
314*bb4ee6a4SAndroid Build Coastguard Worker where
315*bb4ee6a4SAndroid Build Coastguard Worker     F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
316*bb4ee6a4SAndroid Build Coastguard Worker {
317*bb4ee6a4SAndroid Build Coastguard Worker     let mut buf = stream.next_capture_buffer(ex).await?;
318*bb4ee6a4SAndroid Build Coastguard Worker     f(&mut buf)?;
319*bb4ee6a4SAndroid Build Coastguard Worker     buf.commit().await;
320*bb4ee6a4SAndroid Build Coastguard Worker     Ok(())
321*bb4ee6a4SAndroid Build Coastguard Worker }
322*bb4ee6a4SAndroid Build Coastguard Worker 
323*bb4ee6a4SAndroid Build Coastguard Worker #[cfg(test)]
324*bb4ee6a4SAndroid Build Coastguard Worker mod tests {
325*bb4ee6a4SAndroid Build Coastguard Worker     use futures::FutureExt;
326*bb4ee6a4SAndroid Build Coastguard Worker 
327*bb4ee6a4SAndroid Build Coastguard Worker     use super::super::async_api::test::TestExecutor;
328*bb4ee6a4SAndroid Build Coastguard Worker     use super::super::*;
329*bb4ee6a4SAndroid Build Coastguard Worker     use super::*;
330*bb4ee6a4SAndroid Build Coastguard Worker 
331*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
invalid_buffer_length()332*bb4ee6a4SAndroid Build Coastguard Worker     fn invalid_buffer_length() {
333*bb4ee6a4SAndroid Build Coastguard Worker         // Capture buffers can't be created with a size that isn't divisible by the frame size.
334*bb4ee6a4SAndroid Build Coastguard Worker         let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
335*bb4ee6a4SAndroid Build Coastguard Worker         let mut buffer_drop = NoopBufferCommit {
336*bb4ee6a4SAndroid Build Coastguard Worker             which_buffer: false,
337*bb4ee6a4SAndroid Build Coastguard Worker         };
338*bb4ee6a4SAndroid Build Coastguard Worker         assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
339*bb4ee6a4SAndroid Build Coastguard Worker     }
340*bb4ee6a4SAndroid Build Coastguard Worker 
341*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
commit()342*bb4ee6a4SAndroid Build Coastguard Worker     fn commit() {
343*bb4ee6a4SAndroid Build Coastguard Worker         struct TestCommit {
344*bb4ee6a4SAndroid Build Coastguard Worker             frame_count: usize,
345*bb4ee6a4SAndroid Build Coastguard Worker         }
346*bb4ee6a4SAndroid Build Coastguard Worker         impl BufferCommit for TestCommit {
347*bb4ee6a4SAndroid Build Coastguard Worker             fn commit(&mut self, nwritten: usize) {
348*bb4ee6a4SAndroid Build Coastguard Worker                 self.frame_count += nwritten;
349*bb4ee6a4SAndroid Build Coastguard Worker             }
350*bb4ee6a4SAndroid Build Coastguard Worker         }
351*bb4ee6a4SAndroid Build Coastguard Worker         let mut test_commit = TestCommit { frame_count: 0 };
352*bb4ee6a4SAndroid Build Coastguard Worker         {
353*bb4ee6a4SAndroid Build Coastguard Worker             const FRAME_SIZE: usize = 4;
354*bb4ee6a4SAndroid Build Coastguard Worker             let mut buf = [0u8; 480 * FRAME_SIZE];
355*bb4ee6a4SAndroid Build Coastguard Worker             let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
356*bb4ee6a4SAndroid Build Coastguard Worker             let mut local_buf = [0u8; 240 * FRAME_SIZE];
357*bb4ee6a4SAndroid Build Coastguard Worker             assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
358*bb4ee6a4SAndroid Build Coastguard Worker             cp_buf.commit();
359*bb4ee6a4SAndroid Build Coastguard Worker         }
360*bb4ee6a4SAndroid Build Coastguard Worker         // This should be 480 no matter how many samples are read.
361*bb4ee6a4SAndroid Build Coastguard Worker         assert_eq!(test_commit.frame_count, 480);
362*bb4ee6a4SAndroid Build Coastguard Worker     }
363*bb4ee6a4SAndroid Build Coastguard Worker 
364*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
sixteen_bit_stereo()365*bb4ee6a4SAndroid Build Coastguard Worker     fn sixteen_bit_stereo() {
366*bb4ee6a4SAndroid Build Coastguard Worker         let mut server = NoopStreamSource::new();
367*bb4ee6a4SAndroid Build Coastguard Worker         let (_, mut stream) = server
368*bb4ee6a4SAndroid Build Coastguard Worker             .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
369*bb4ee6a4SAndroid Build Coastguard Worker             .unwrap();
370*bb4ee6a4SAndroid Build Coastguard Worker         let mut copy_func = |b: &mut CaptureBuffer| {
371*bb4ee6a4SAndroid Build Coastguard Worker             assert_eq!(b.buffer.frame_capacity(), 480);
372*bb4ee6a4SAndroid Build Coastguard Worker             let mut pb_buf = [0xa5u8; 480 * 2 * 2];
373*bb4ee6a4SAndroid Build Coastguard Worker             assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
374*bb4ee6a4SAndroid Build Coastguard Worker             Ok(())
375*bb4ee6a4SAndroid Build Coastguard Worker         };
376*bb4ee6a4SAndroid Build Coastguard Worker         stream.read_capture_buffer(&mut copy_func).unwrap();
377*bb4ee6a4SAndroid Build Coastguard Worker     }
378*bb4ee6a4SAndroid Build Coastguard Worker 
379*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
consumption_rate()380*bb4ee6a4SAndroid Build Coastguard Worker     fn consumption_rate() {
381*bb4ee6a4SAndroid Build Coastguard Worker         let mut server = NoopStreamSource::new();
382*bb4ee6a4SAndroid Build Coastguard Worker         let (_, mut stream) = server
383*bb4ee6a4SAndroid Build Coastguard Worker             .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
384*bb4ee6a4SAndroid Build Coastguard Worker             .unwrap();
385*bb4ee6a4SAndroid Build Coastguard Worker         let start = Instant::now();
386*bb4ee6a4SAndroid Build Coastguard Worker         {
387*bb4ee6a4SAndroid Build Coastguard Worker             let mut copy_func = |b: &mut CaptureBuffer| {
388*bb4ee6a4SAndroid Build Coastguard Worker                 let mut cp_buf = [0xa5u8; 480 * 2 * 2];
389*bb4ee6a4SAndroid Build Coastguard Worker                 assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
390*bb4ee6a4SAndroid Build Coastguard Worker                 for buf in cp_buf.iter() {
391*bb4ee6a4SAndroid Build Coastguard Worker                     assert_eq!(*buf, 0, "Read samples should all be zeros.");
392*bb4ee6a4SAndroid Build Coastguard Worker                 }
393*bb4ee6a4SAndroid Build Coastguard Worker                 Ok(())
394*bb4ee6a4SAndroid Build Coastguard Worker             };
395*bb4ee6a4SAndroid Build Coastguard Worker             stream.read_capture_buffer(&mut copy_func).unwrap();
396*bb4ee6a4SAndroid Build Coastguard Worker         }
397*bb4ee6a4SAndroid Build Coastguard Worker         // The second call should block until the first buffer is consumed.
398*bb4ee6a4SAndroid Build Coastguard Worker         let mut assert_func = |_: &mut CaptureBuffer| {
399*bb4ee6a4SAndroid Build Coastguard Worker             let elapsed = start.elapsed();
400*bb4ee6a4SAndroid Build Coastguard Worker             assert!(
401*bb4ee6a4SAndroid Build Coastguard Worker                 elapsed > Duration::from_millis(10),
402*bb4ee6a4SAndroid Build Coastguard Worker                 "next_capture_buffer didn't block long enough {}",
403*bb4ee6a4SAndroid Build Coastguard Worker                 elapsed.subsec_millis()
404*bb4ee6a4SAndroid Build Coastguard Worker             );
405*bb4ee6a4SAndroid Build Coastguard Worker             Ok(())
406*bb4ee6a4SAndroid Build Coastguard Worker         };
407*bb4ee6a4SAndroid Build Coastguard Worker         stream.read_capture_buffer(&mut assert_func).unwrap();
408*bb4ee6a4SAndroid Build Coastguard Worker     }
409*bb4ee6a4SAndroid Build Coastguard Worker 
410*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
async_commit()411*bb4ee6a4SAndroid Build Coastguard Worker     fn async_commit() {
412*bb4ee6a4SAndroid Build Coastguard Worker         struct TestCommit {
413*bb4ee6a4SAndroid Build Coastguard Worker             frame_count: usize,
414*bb4ee6a4SAndroid Build Coastguard Worker         }
415*bb4ee6a4SAndroid Build Coastguard Worker         #[async_trait(?Send)]
416*bb4ee6a4SAndroid Build Coastguard Worker         impl AsyncBufferCommit for TestCommit {
417*bb4ee6a4SAndroid Build Coastguard Worker             async fn commit(&mut self, nwritten: usize) {
418*bb4ee6a4SAndroid Build Coastguard Worker                 self.frame_count += nwritten;
419*bb4ee6a4SAndroid Build Coastguard Worker             }
420*bb4ee6a4SAndroid Build Coastguard Worker         }
421*bb4ee6a4SAndroid Build Coastguard Worker         async fn this_test() {
422*bb4ee6a4SAndroid Build Coastguard Worker             let mut test_commit = TestCommit { frame_count: 0 };
423*bb4ee6a4SAndroid Build Coastguard Worker             {
424*bb4ee6a4SAndroid Build Coastguard Worker                 const FRAME_SIZE: usize = 4;
425*bb4ee6a4SAndroid Build Coastguard Worker                 let mut buf = [0u8; 480 * FRAME_SIZE];
426*bb4ee6a4SAndroid Build Coastguard Worker                 let mut cp_buf =
427*bb4ee6a4SAndroid Build Coastguard Worker                     AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
428*bb4ee6a4SAndroid Build Coastguard Worker                 let mut local_buf = [0u8; 240 * FRAME_SIZE];
429*bb4ee6a4SAndroid Build Coastguard Worker                 assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
430*bb4ee6a4SAndroid Build Coastguard Worker                 cp_buf.commit().await;
431*bb4ee6a4SAndroid Build Coastguard Worker             }
432*bb4ee6a4SAndroid Build Coastguard Worker             // This should be 480 no matter how many samples are read.
433*bb4ee6a4SAndroid Build Coastguard Worker             assert_eq!(test_commit.frame_count, 480);
434*bb4ee6a4SAndroid Build Coastguard Worker         }
435*bb4ee6a4SAndroid Build Coastguard Worker 
436*bb4ee6a4SAndroid Build Coastguard Worker         this_test().now_or_never();
437*bb4ee6a4SAndroid Build Coastguard Worker     }
438*bb4ee6a4SAndroid Build Coastguard Worker 
439*bb4ee6a4SAndroid Build Coastguard Worker     #[test]
consumption_rate_async()440*bb4ee6a4SAndroid Build Coastguard Worker     fn consumption_rate_async() {
441*bb4ee6a4SAndroid Build Coastguard Worker         async fn this_test(ex: &TestExecutor) {
442*bb4ee6a4SAndroid Build Coastguard Worker             let mut server = NoopStreamSource::new();
443*bb4ee6a4SAndroid Build Coastguard Worker             let (_, mut stream) = server
444*bb4ee6a4SAndroid Build Coastguard Worker                 .new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
445*bb4ee6a4SAndroid Build Coastguard Worker                 .unwrap();
446*bb4ee6a4SAndroid Build Coastguard Worker             let start = Instant::now();
447*bb4ee6a4SAndroid Build Coastguard Worker             {
448*bb4ee6a4SAndroid Build Coastguard Worker                 let copy_func = |buf: &mut AsyncCaptureBuffer| {
449*bb4ee6a4SAndroid Build Coastguard Worker                     let mut cp_buf = [0xa5u8; 480 * 2 * 2];
450*bb4ee6a4SAndroid Build Coastguard Worker                     assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
451*bb4ee6a4SAndroid Build Coastguard Worker                     for buf in cp_buf.iter() {
452*bb4ee6a4SAndroid Build Coastguard Worker                         assert_eq!(*buf, 0, "Read samples should all be zeros.");
453*bb4ee6a4SAndroid Build Coastguard Worker                     }
454*bb4ee6a4SAndroid Build Coastguard Worker                     Ok(())
455*bb4ee6a4SAndroid Build Coastguard Worker                 };
456*bb4ee6a4SAndroid Build Coastguard Worker                 async_read_capture_buffer(&mut *stream, copy_func, ex)
457*bb4ee6a4SAndroid Build Coastguard Worker                     .await
458*bb4ee6a4SAndroid Build Coastguard Worker                     .unwrap();
459*bb4ee6a4SAndroid Build Coastguard Worker             }
460*bb4ee6a4SAndroid Build Coastguard Worker             // The second call should block until the first buffer is consumed.
461*bb4ee6a4SAndroid Build Coastguard Worker             let assert_func = |_: &mut AsyncCaptureBuffer| {
462*bb4ee6a4SAndroid Build Coastguard Worker                 let elapsed = start.elapsed();
463*bb4ee6a4SAndroid Build Coastguard Worker                 assert!(
464*bb4ee6a4SAndroid Build Coastguard Worker                     elapsed > Duration::from_millis(10),
465*bb4ee6a4SAndroid Build Coastguard Worker                     "write_playback_buffer didn't block long enough {}",
466*bb4ee6a4SAndroid Build Coastguard Worker                     elapsed.subsec_millis()
467*bb4ee6a4SAndroid Build Coastguard Worker                 );
468*bb4ee6a4SAndroid Build Coastguard Worker                 Ok(())
469*bb4ee6a4SAndroid Build Coastguard Worker             };
470*bb4ee6a4SAndroid Build Coastguard Worker             async_read_capture_buffer(&mut *stream, assert_func, ex)
471*bb4ee6a4SAndroid Build Coastguard Worker                 .await
472*bb4ee6a4SAndroid Build Coastguard Worker                 .unwrap();
473*bb4ee6a4SAndroid Build Coastguard Worker         }
474*bb4ee6a4SAndroid Build Coastguard Worker 
475*bb4ee6a4SAndroid Build Coastguard Worker         let ex = TestExecutor {};
476*bb4ee6a4SAndroid Build Coastguard Worker         this_test(&ex).now_or_never();
477*bb4ee6a4SAndroid Build Coastguard Worker     }
478*bb4ee6a4SAndroid Build Coastguard Worker }
479