xref: /aosp_15_r20/external/crosvm/common/audio_streams/src/capture.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2019 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! ```
6 //! use audio_streams::{BoxError, capture::CaptureBuffer, SampleFormat, StreamSource,
7 //!     NoopStreamSource};
8 //! use std::io::Read;
9 //!
10 //! const buffer_size: usize = 120;
11 //! const num_channels: usize = 2;
12 //!
13 //! # fn main() -> std::result::Result<(),BoxError> {
14 //! let mut stream_source = NoopStreamSource::new();
15 //! let sample_format = SampleFormat::S16LE;
16 //! let frame_size = num_channels * sample_format.sample_bytes();
17 //!
18 //! let (_, mut stream) = stream_source
19 //!     .new_capture_stream(num_channels, sample_format, 48000, buffer_size, &[])?;
20 //! // Capture 10 buffers of zeros.
21 //! let mut buf = Vec::new();
22 //! buf.resize(buffer_size * frame_size, 0xa5u8);
23 //! for _ in 0..10 {
24 //!     let mut copy_func = |stream_buffer: &mut CaptureBuffer| {
25 //!         assert_eq!(stream_buffer.read(&mut buf)?, buffer_size * frame_size);
26 //!         Ok(())
27 //!     };
28 //!     stream.read_capture_buffer(&mut copy_func)?;
29 //! }
30 //! # Ok (())
31 //! # }
32 //! ```
33 
34 use std::io;
35 use std::io::Read;
36 use std::io::Write;
37 use std::time::Duration;
38 use std::time::Instant;
39 
40 use async_trait::async_trait;
41 use remain::sorted;
42 use thiserror::Error;
43 
44 use super::async_api::AudioStreamsExecutor;
45 use super::AsyncBufferCommit;
46 use super::AudioBuffer;
47 use super::BoxError;
48 use super::BufferCommit;
49 use super::NoopBufferCommit;
50 use super::SampleFormat;
51 
52 /// `CaptureBufferStream` provides `CaptureBuffer`s to read with audio samples from capture.
53 pub trait CaptureBufferStream: Send {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>54     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
55 
56     /// Call `f` with a `CaptureBuffer`, and trigger the buffer done call back after. `f` can read
57     /// the capture data from the given `CaptureBuffer`.
read_capture_buffer<'b, 's: 'b>( &'s mut self, f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>, ) -> Result<(), BoxError>58     fn read_capture_buffer<'b, 's: 'b>(
59         &'s mut self,
60         f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
61     ) -> Result<(), BoxError> {
62         let mut buf = self.next_capture_buffer()?;
63         f(&mut buf)?;
64         buf.commit();
65         Ok(())
66     }
67 }
68 
69 impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>70     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
71         (**self).next_capture_buffer()
72     }
73 }
74 
75 #[async_trait(?Send)]
76 pub trait AsyncCaptureBufferStream: Send {
next_capture_buffer<'a>( &'a mut self, _ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>77     async fn next_capture_buffer<'a>(
78         &'a mut self,
79         _ex: &dyn AudioStreamsExecutor,
80     ) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
81 }
82 
83 #[async_trait(?Send)]
84 impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>85     async fn next_capture_buffer<'a>(
86         &'a mut self,
87         ex: &dyn AudioStreamsExecutor,
88     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
89         (**self).next_capture_buffer(ex).await
90     }
91 }
92 
93 /// `CaptureBuffer` contains a block of audio samples got from capture stream. It provides
94 /// temporary view to those samples and will notifies capture stream when dropped.
95 /// Note that it'll always send `buffer.len() / frame_size` to drop function when it got destroyed
96 /// since `CaptureBufferStream` assumes that users get all the samples from the buffer.
97 pub struct CaptureBuffer<'a> {
98     buffer: AudioBuffer<'a>,
99     drop: &'a mut dyn BufferCommit,
100 }
101 
102 /// Async version of 'CaptureBuffer`
103 pub struct AsyncCaptureBuffer<'a> {
104     buffer: AudioBuffer<'a>,
105     trigger: &'a mut dyn AsyncBufferCommit,
106 }
107 
108 /// Errors that are possible from a `CaptureBuffer`.
109 #[sorted]
110 #[derive(Error, Debug)]
111 pub enum CaptureBufferError {
112     #[error("Invalid buffer length")]
113     InvalidLength,
114 }
115 
116 impl<'a> CaptureBuffer<'a> {
117     /// Creates a new `CaptureBuffer` that holds a reference to the backing memory specified in
118     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], drop: &'a mut F, ) -> Result<Self, CaptureBufferError> where F: BufferCommit,119     pub fn new<F>(
120         frame_size: usize,
121         buffer: &'a mut [u8],
122         drop: &'a mut F,
123     ) -> Result<Self, CaptureBufferError>
124     where
125         F: BufferCommit,
126     {
127         if buffer.len() % frame_size != 0 {
128             return Err(CaptureBufferError::InvalidLength);
129         }
130 
131         Ok(CaptureBuffer {
132             buffer: AudioBuffer {
133                 buffer,
134                 frame_size,
135                 offset: 0,
136             },
137             drop,
138         })
139     }
140 
141     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize142     pub fn frame_capacity(&self) -> usize {
143         self.buffer.frame_capacity()
144     }
145 
146     /// This triggers the callback of `BufferCommit`. This should be called after the data is read
147     /// from the buffer.
148     ///
149     /// Always sends `frame_capacity`.
commit(&mut self)150     pub fn commit(&mut self) {
151         self.drop.commit(self.frame_capacity());
152     }
153 
latency_bytes(&self) -> u32154     pub fn latency_bytes(&self) -> u32 {
155         self.drop.latency_bytes()
156     }
157 
158     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>159     pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
160         self.buffer.read_copy_cb(size, cb)
161     }
162 }
163 
164 impl<'a> Read for CaptureBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>165     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166         self.buffer.read(buf)
167     }
168 }
169 
170 impl<'a> AsyncCaptureBuffer<'a> {
171     /// Creates a new `AsyncCaptureBuffer` that holds a reference to the backing memory specified in
172     /// `buffer`.
new<F>( frame_size: usize, buffer: &'a mut [u8], trigger: &'a mut F, ) -> Result<Self, CaptureBufferError> where F: AsyncBufferCommit,173     pub fn new<F>(
174         frame_size: usize,
175         buffer: &'a mut [u8],
176         trigger: &'a mut F,
177     ) -> Result<Self, CaptureBufferError>
178     where
179         F: AsyncBufferCommit,
180     {
181         if buffer.len() % frame_size != 0 {
182             return Err(CaptureBufferError::InvalidLength);
183         }
184 
185         Ok(AsyncCaptureBuffer {
186             buffer: AudioBuffer {
187                 buffer,
188                 frame_size,
189                 offset: 0,
190             },
191             trigger,
192         })
193     }
194 
195     /// Returns the number of audio frames that fit in the buffer.
frame_capacity(&self) -> usize196     pub fn frame_capacity(&self) -> usize {
197         self.buffer.frame_capacity()
198     }
199 
200     /// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
201     /// read from the buffer.
202     ///
203     /// Always sends `frame_capacity`.
commit(&mut self)204     pub async fn commit(&mut self) {
205         self.trigger.commit(self.frame_capacity()).await;
206     }
207 
latency_bytes(&self) -> u32208     pub fn latency_bytes(&self) -> u32 {
209         self.trigger.latency_bytes()
210     }
211 
212     /// Reads up to `size` bytes directly from this buffer inside of the given callback function.
copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize>213     pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
214         self.buffer.read_copy_cb(size, cb)
215     }
216 
217     /// Copy data to an io::Write
copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize>218     pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
219         self.buffer.copy_to(writer)
220     }
221 }
222 
223 impl<'a> Read for AsyncCaptureBuffer<'a> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>224     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
225         self.buffer.read(buf)
226     }
227 }
228 
229 /// Stream that provides null capture samples.
230 pub struct NoopCaptureStream {
231     buffer: Vec<u8>,
232     frame_size: usize,
233     interval: Duration,
234     next_frame: Duration,
235     start_time: Option<Instant>,
236     buffer_drop: NoopBufferCommit,
237 }
238 
239 impl NoopCaptureStream {
new( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ) -> Self240     pub fn new(
241         num_channels: usize,
242         format: SampleFormat,
243         frame_rate: u32,
244         buffer_size: usize,
245     ) -> Self {
246         let frame_size = format.sample_bytes() * num_channels;
247         let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
248         NoopCaptureStream {
249             buffer: vec![0; buffer_size * frame_size],
250             frame_size,
251             interval,
252             next_frame: interval,
253             start_time: None,
254             buffer_drop: NoopBufferCommit {
255                 which_buffer: false,
256             },
257         }
258     }
259 }
260 
261 impl CaptureBufferStream for NoopCaptureStream {
next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>262     fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
263         if let Some(start_time) = self.start_time {
264             let elapsed = start_time.elapsed();
265             if elapsed < self.next_frame {
266                 std::thread::sleep(self.next_frame - elapsed);
267             }
268             self.next_frame += self.interval;
269         } else {
270             self.start_time = Some(Instant::now());
271             self.next_frame = self.interval;
272         }
273         Ok(CaptureBuffer::new(
274             self.frame_size,
275             &mut self.buffer,
276             &mut self.buffer_drop,
277         )?)
278     }
279 }
280 
281 #[async_trait(?Send)]
282 impl AsyncCaptureBufferStream for NoopCaptureStream {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>283     async fn next_capture_buffer<'a>(
284         &'a mut self,
285         ex: &dyn AudioStreamsExecutor,
286     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
287         if let Some(start_time) = self.start_time {
288             let elapsed = start_time.elapsed();
289             if elapsed < self.next_frame {
290                 ex.delay(self.next_frame - elapsed).await?;
291             }
292             self.next_frame += self.interval;
293         } else {
294             self.start_time = Some(Instant::now());
295             self.next_frame = self.interval;
296         }
297         Ok(AsyncCaptureBuffer::new(
298             self.frame_size,
299             &mut self.buffer,
300             &mut self.buffer_drop,
301         )?)
302     }
303 }
304 
305 /// Call `f` with a `AsyncCaptureBuffer`, and trigger the buffer done call back after. `f` can read
306 /// the capture data from the given `AsyncCaptureBuffer`.
307 ///
308 /// This cannot be a trait method because trait methods with generic parameters are not object safe.
async_read_capture_buffer<F>( stream: &mut dyn AsyncCaptureBufferStream, f: F, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError> where F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,309 pub async fn async_read_capture_buffer<F>(
310     stream: &mut dyn AsyncCaptureBufferStream,
311     f: F,
312     ex: &dyn AudioStreamsExecutor,
313 ) -> Result<(), BoxError>
314 where
315     F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
316 {
317     let mut buf = stream.next_capture_buffer(ex).await?;
318     f(&mut buf)?;
319     buf.commit().await;
320     Ok(())
321 }
322 
323 #[cfg(test)]
324 mod tests {
325     use futures::FutureExt;
326 
327     use super::super::async_api::test::TestExecutor;
328     use super::super::*;
329     use super::*;
330 
331     #[test]
invalid_buffer_length()332     fn invalid_buffer_length() {
333         // Capture buffers can't be created with a size that isn't divisible by the frame size.
334         let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
335         let mut buffer_drop = NoopBufferCommit {
336             which_buffer: false,
337         };
338         assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
339     }
340 
341     #[test]
commit()342     fn commit() {
343         struct TestCommit {
344             frame_count: usize,
345         }
346         impl BufferCommit for TestCommit {
347             fn commit(&mut self, nwritten: usize) {
348                 self.frame_count += nwritten;
349             }
350         }
351         let mut test_commit = TestCommit { frame_count: 0 };
352         {
353             const FRAME_SIZE: usize = 4;
354             let mut buf = [0u8; 480 * FRAME_SIZE];
355             let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
356             let mut local_buf = [0u8; 240 * FRAME_SIZE];
357             assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
358             cp_buf.commit();
359         }
360         // This should be 480 no matter how many samples are read.
361         assert_eq!(test_commit.frame_count, 480);
362     }
363 
364     #[test]
sixteen_bit_stereo()365     fn sixteen_bit_stereo() {
366         let mut server = NoopStreamSource::new();
367         let (_, mut stream) = server
368             .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
369             .unwrap();
370         let mut copy_func = |b: &mut CaptureBuffer| {
371             assert_eq!(b.buffer.frame_capacity(), 480);
372             let mut pb_buf = [0xa5u8; 480 * 2 * 2];
373             assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
374             Ok(())
375         };
376         stream.read_capture_buffer(&mut copy_func).unwrap();
377     }
378 
379     #[test]
consumption_rate()380     fn consumption_rate() {
381         let mut server = NoopStreamSource::new();
382         let (_, mut stream) = server
383             .new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
384             .unwrap();
385         let start = Instant::now();
386         {
387             let mut copy_func = |b: &mut CaptureBuffer| {
388                 let mut cp_buf = [0xa5u8; 480 * 2 * 2];
389                 assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
390                 for buf in cp_buf.iter() {
391                     assert_eq!(*buf, 0, "Read samples should all be zeros.");
392                 }
393                 Ok(())
394             };
395             stream.read_capture_buffer(&mut copy_func).unwrap();
396         }
397         // The second call should block until the first buffer is consumed.
398         let mut assert_func = |_: &mut CaptureBuffer| {
399             let elapsed = start.elapsed();
400             assert!(
401                 elapsed > Duration::from_millis(10),
402                 "next_capture_buffer didn't block long enough {}",
403                 elapsed.subsec_millis()
404             );
405             Ok(())
406         };
407         stream.read_capture_buffer(&mut assert_func).unwrap();
408     }
409 
410     #[test]
async_commit()411     fn async_commit() {
412         struct TestCommit {
413             frame_count: usize,
414         }
415         #[async_trait(?Send)]
416         impl AsyncBufferCommit for TestCommit {
417             async fn commit(&mut self, nwritten: usize) {
418                 self.frame_count += nwritten;
419             }
420         }
421         async fn this_test() {
422             let mut test_commit = TestCommit { frame_count: 0 };
423             {
424                 const FRAME_SIZE: usize = 4;
425                 let mut buf = [0u8; 480 * FRAME_SIZE];
426                 let mut cp_buf =
427                     AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
428                 let mut local_buf = [0u8; 240 * FRAME_SIZE];
429                 assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
430                 cp_buf.commit().await;
431             }
432             // This should be 480 no matter how many samples are read.
433             assert_eq!(test_commit.frame_count, 480);
434         }
435 
436         this_test().now_or_never();
437     }
438 
439     #[test]
consumption_rate_async()440     fn consumption_rate_async() {
441         async fn this_test(ex: &TestExecutor) {
442             let mut server = NoopStreamSource::new();
443             let (_, mut stream) = server
444                 .new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
445                 .unwrap();
446             let start = Instant::now();
447             {
448                 let copy_func = |buf: &mut AsyncCaptureBuffer| {
449                     let mut cp_buf = [0xa5u8; 480 * 2 * 2];
450                     assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
451                     for buf in cp_buf.iter() {
452                         assert_eq!(*buf, 0, "Read samples should all be zeros.");
453                     }
454                     Ok(())
455                 };
456                 async_read_capture_buffer(&mut *stream, copy_func, ex)
457                     .await
458                     .unwrap();
459             }
460             // The second call should block until the first buffer is consumed.
461             let assert_func = |_: &mut AsyncCaptureBuffer| {
462                 let elapsed = start.elapsed();
463                 assert!(
464                     elapsed > Duration::from_millis(10),
465                     "write_playback_buffer didn't block long enough {}",
466                     elapsed.subsec_millis()
467                 );
468                 Ok(())
469             };
470             async_read_capture_buffer(&mut *stream, assert_func, ex)
471                 .await
472                 .unwrap();
473         }
474 
475         let ex = TestExecutor {};
476         this_test(&ex).now_or_never();
477     }
478 }
479