xref: /aosp_15_r20/cts/apps/CameraITS/tests/feature_combination/test_feature_combination.py (revision b7c941bb3fa97aba169d73cee0bed2de8ac964bf)
1# Copyright 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Verify feature combinations for stabilization, 10-bit, and frame rate."""
15
16import concurrent.futures
17from datetime import datetime  # pylint: disable=g-importing-member
18from google.protobuf import text_format
19import logging
20import os
21import time
22
23from mobly import test_runner
24
25import its_base_test
26import camera_properties_utils
27import capture_request_utils
28import its_session_utils
29import preview_processing_utils
30import video_processing_utils
31import feature_combination_info_pb2
32
# Bit flags for the feature mask.
_BIT_HLG10 = 1 << 0  # bit 1 for feature mask
_BIT_STABILIZATION = 1 << 1  # bit 2 for feature mask

# Only target FPS ranges whose maximum is one of these values are tested.
_FPS_30_60 = (30, 60)
# Slack used when comparing a range's max fps with the max achievable fps.
_FPS_SELECTION_ATOL = 1e-2
# Allowed deviation of the measured average frame rate from the target range.
_FPS_ATOL_CODEC = 1.2  # frame rate read from the recorded video clip
_FPS_ATOL_METADATA = 0.8  # frame rate derived from capture result timestamps

# Test name: this file's base name without its extension.
_NAME = os.path.splitext(os.path.basename(__file__))[0]
# Nanoseconds per second.
_SEC_TO_NSEC = 10**9
42
43
44class FeatureCombinationTest(its_base_test.ItsBaseTest):
45  """Tests camera feature combinations.
46
47  The camera features tested in combination by this class are:
48  - Preview stabilization
49  - Target FPS range
50  - HLG 10-bit HDR
51
52  Camera is moved in sensor fusion rig on an arc of 15 degrees.
53  Speed is set to mimic hand movement (and not be too fast).
54  Preview is captured after rotation rig starts moving and the
55  gyroscope data is dumped.
56
57  Preview stabilization:
58  The recorded preview is processed to dump all of the frames to
59  PNG files. Camera movement is extracted from frames by determining
60  max angle of deflection in video movement vs max angle of deflection
61  in gyroscope movement. Test is a PASS if rotation is reduced in video.
62
63  Target FPS range:
64  The recorded preview has the expected fps range. For example,
65  if [60, 60] is set as targetFpsRange, the camera device is expected to
66  produce 60fps preview/video.
67
68  HLG 10-bit HDR:
69  The recorded preview has valid 10-bit HLG outputs.
70  """
71
72  def test_feature_combination(self):
73    # Use a pool of threads to execute calls asynchronously
74    with concurrent.futures.ThreadPoolExecutor() as executor:
75      self._test_feature_combination(executor)
76
77  def _create_feature_combo_proto(self):
78    """Start logging feature combination info for camera in proto."""
79    feature_combo_for_camera = (
80        feature_combination_info_pb2.FeatureCombinationForCamera())
81    feature_combo_for_camera.camera_id = self.camera_id
82
83    return feature_combo_for_camera
84
85  def _add_feature_combo_entry_to_proto(self, feature_combo_for_camera,
86                                        output_surfaces,
87                                        is_supported,
88                                        fps_range,
89                                        stabilization):
90    """Log whether a feature combination is supported."""
91    entry = feature_combination_info_pb2.FeatureCombinationEntry()
92    entry.is_supported = is_supported
93    for surface in output_surfaces:
94      config_entry = feature_combination_info_pb2.OutputConfiguration()
95      config_entry.image_format = surface['format_code']
96      config_entry.size.width = surface['width']
97      config_entry.size.height = surface['height']
98      config_entry.stream_usecase = feature_combination_info_pb2.USECASE_DEFAULT
99      config_entry.dynamic_range_profile = (
100          feature_combination_info_pb2.PROFILE_HLG10 if surface['hlg10']
101          else feature_combination_info_pb2.PROFILE_STANDARD)
102      entry.session_configuration.output_configurations.append(config_entry)
103    entry.session_configuration.stabilization = (
104        feature_combination_info_pb2.STABILIZATION_PREVIEW if stabilization
105        else feature_combination_info_pb2.STABILIZATION_OFF)
106    entry.session_configuration.frame_rate_range.max = fps_range[1]
107    entry.session_configuration.frame_rate_range.min = fps_range[0]
108
109    feature_combo_for_camera.entries.append(entry)
110
111  def _output_feature_combo_proto(self, feature_combo_for_camera):
112    """Finish logging feature combination info and write to ReportLogFiles."""
113    debug_mode = self.debug_mode
114    log_to_file = self.log_feature_combo_support
115    database = feature_combination_info_pb2.FeatureCombinationDatabase()
116    database.build_fingerprint = (
117        its_session_utils.get_build_fingerprint(self.dut.serial))
118    database.timestamp_in_sec = int(time.time())
119    database.feature_combination_for_camera.append(feature_combo_for_camera)
120
121    # Log the feature combination query result and send over to ItsService
122    database_str_oneline = text_format.MessageToString(
123        database, as_one_line=True)
124    print(f'feature_query_proto:{database_str_oneline}')
125
126    if log_to_file:
127      current_time = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
128      proto_file_name = (
129          f'{self.dut.serial}_camera_{self.camera_id}_{current_time}.pb'
130      )
131      logging.debug('proto_file_name %s', proto_file_name)
132
133      with open(proto_file_name, 'wb') as f:
134        f.write(database.SerializeToString())
135
136      if debug_mode:
137        txtpb_file_name = proto_file_name.replace('.pb', '.txtpb')
138        with open(txtpb_file_name, 'w') as tf:
139          database_str = text_format.MessageToString(database)
140          tf.write(database_str)
141
142  def _finish_combination(self, combination_name, is_stabilized, passed,
143                          recording_obj, gyro_events, test_name, log_path,
144                          facing, output_surfaces, fps_range):
145    """Finish verifying a feature combo & preview stabilization if necessary."""
146    result = {'name': combination_name,
147              'output_surfaces': output_surfaces,
148              'fps_range': fps_range,
149              'is_stabilized': is_stabilized,
150              'passed': passed}
151    if is_stabilized:
152      stabilization_result = (
153          preview_processing_utils.verify_preview_stabilization(
154              recording_obj, gyro_events, test_name, log_path, facing
155          )
156      )
157      if stabilization_result['failure']:
158        result['stabilization_failure'] = stabilization_result['failure']
159        result['passed'] = False
160
161    return result
162
163  def _test_feature_combination(self, executor):
164    """Tests features using an injected ThreadPoolExecutor for analysis.
165
166    Args:
167      executor: a ThreadPoolExecutor to analyze recordings asynchronously.
168    """
169    rot_rig = {}
170    log_path = self.log_path
171
172    with its_session_utils.ItsSession(
173        device_id=self.dut.serial,
174        camera_id=self.camera_id) as cam:
175
176      # Skip if the device doesn't support feature combination query
177      props = cam.get_camera_properties()
178      feature_combination_query_version = props.get(
179          'android.info.sessionConfigurationQueryVersion')
180      if not feature_combination_query_version:
181        feature_combination_query_version = (
182            its_session_utils.ANDROID14_API_LEVEL
183        )
184      support_query = (feature_combination_query_version >=
185                       its_session_utils.ANDROID15_API_LEVEL)
186
187      # Log ffmpeg version being used
188      video_processing_utils.log_ffmpeg_version()
189
190      # Raise error if not FRONT or REAR facing camera
191      facing = props['android.lens.facing']
192      camera_properties_utils.check_front_or_rear_camera(props)
193
194      # Initialize rotation rig
195      rot_rig['cntl'] = self.rotator_cntl
196      rot_rig['ch'] = self.rotator_ch
197      if rot_rig['cntl'].lower() != 'arduino':
198        raise AssertionError(
199            f'You must use the arduino controller for {_NAME}.')
200
201      # List of queryable stream combinations
202      combinations_str, combinations = cam.get_queryable_stream_combinations()
203      logging.debug('Queryable stream combinations: %s', combinations_str)
204
205      # Stabilization modes. Make sure to test ON first.
206      stabilization_params = []
207      stabilization_modes = props[
208          'android.control.availableVideoStabilizationModes']
209      if (camera_properties_utils.STABILIZATION_MODE_PREVIEW in
210          stabilization_modes):
211        stabilization_params.append(
212            camera_properties_utils.STABILIZATION_MODE_PREVIEW)
213      stabilization_params.append(
214          camera_properties_utils.STABILIZATION_MODE_OFF
215      )
216      logging.debug('stabilization modes: %s', stabilization_params)
217
218      configs = props['android.scaler.streamConfigurationMap'][
219          'availableStreamConfigurations']
220      fps_ranges = camera_properties_utils.get_ae_target_fps_ranges(props)
221
222      test_failures = []
223      feature_verification_futures = []
224      database = self._create_feature_combo_proto()
225      for stream_combination in combinations:
226        streams_name = stream_combination['name']
227        min_frame_duration = 0
228        configured_streams = []
229        skip = False
230        if (stream_combination['combination'][0]['format'] !=
231            its_session_utils.PRIVATE_FORMAT):
232          raise AssertionError(
233              f'First stream for {streams_name} must be PRIV')
234        preview_size = stream_combination['combination'][0]['size']
235        for stream in stream_combination['combination']:
236          fmt = None
237          size = [int(e) for e in stream['size'].split('x')]
238          if stream['format'] == its_session_utils.PRIVATE_FORMAT:
239            fmt = capture_request_utils.FMT_CODE_PRIV
240          elif stream['format'] == 'jpeg':
241            fmt = capture_request_utils.FMT_CODE_JPEG
242          elif stream['format'] == its_session_utils.JPEG_R_FMT_STR:
243            fmt = capture_request_utils.FMT_CODE_JPEG_R
244          config = [x for x in configs if
245                    x['format'] == fmt and
246                    x['width'] == size[0] and
247                    x['height'] == size[1]]
248          if not config:
249            logging.debug(
250                'stream combination %s not supported. Skip', streams_name)
251            skip = True
252            break
253
254          min_frame_duration = max(
255              config[0]['minFrameDuration'], min_frame_duration)
256          logging.debug(
257              'format is %s, min_frame_duration is %d}',
258              stream['format'], config[0]['minFrameDuration'])
259          configured_streams.append(
260              {'formatStr': stream['format'], 'format': fmt,
261               'width': size[0], 'height': size[1]})
262
263        if skip:
264          continue
265
266        # Fps ranges
267        max_achievable_fps = _SEC_TO_NSEC / min_frame_duration
268        fps_params = [fps for fps in fps_ranges if (
269            fps[1] in _FPS_30_60 and
270            max_achievable_fps >= fps[1] - _FPS_SELECTION_ATOL)]
271
272        for fps_range in fps_params:
273          # HLG10. Make sure to test ON first.
274          hlg10_params = []
275          if cam.is_hlg10_recording_supported_for_size_and_fps(
276              preview_size, fps_range[1]):
277            hlg10_params.append(True)
278          hlg10_params.append(False)
279
280          features_passed = []  # feature combinations already supported
281          for hlg10 in hlg10_params:
282            # Construct output surfaces
283            output_surfaces = []
284            for configured_stream in configured_streams:
285              hlg10_stream = (configured_stream['formatStr'] ==
286                              its_session_utils.PRIVATE_FORMAT and hlg10)
287              output_surfaces.append(
288                  {'format': configured_stream['formatStr'],
289                   'format_code': configured_stream['format'],
290                   'width': configured_stream['width'],
291                   'height': configured_stream['height'],
292                   'hlg10': hlg10_stream}
293              )
294
295            for stabilize in stabilization_params:
296              settings = {
297                  'android.control.videoStabilizationMode': stabilize,
298                  'android.control.aeTargetFpsRange': fps_range,
299              }
300              combination_name = (f'(streams: {streams_name}, hlg10: {hlg10}, '
301                                  f'stabilization: {stabilize}, fps_range: '
302                                  f'[{fps_range[0]}, {fps_range[1]}])')
303              logging.debug('combination name: %s', combination_name)
304
305              if support_query:
306                # Is the feature combination supported?
307                support_claimed = cam.is_stream_combination_supported(
308                    output_surfaces, settings)
309                if not support_claimed:
310                  logging.debug('%s not supported', combination_name)
311
312              passed = True
313              is_stabilized = False
314              if (stabilize ==
315                  camera_properties_utils.STABILIZATION_MODE_PREVIEW):
316                is_stabilized = True
317
318              # If a superset of features are already tested, skip and assuming
319              # the subset of those features are supported.
320              skip_test = its_session_utils.check_features_passed(
321                  features_passed, hlg10, is_stabilized)
322              if skip_test:
323                self._add_feature_combo_entry_to_proto(
324                    database, output_surfaces, passed,
325                    fps_range, is_stabilized)
326                continue
327
328              recording_obj = (
329                  preview_processing_utils.collect_data_with_surfaces(
330                      cam, self.tablet_device, output_surfaces, is_stabilized,
331                      rot_rig=rot_rig, fps_range=fps_range))
332
333              gyro_events = None
334              if is_stabilized:
335                # Get gyro events
336                logging.debug('Reading out inertial sensor events')
337                gyro_events = cam.get_sensor_events()['gyro']
338                logging.debug('Number of gyro samples %d', len(gyro_events))
339
340              # Grab the video from the file location on DUT
341              self.dut.adb.pull([recording_obj['recordedOutputPath'], log_path])
342
343              # Verify FPS by inspecting the video clip
344              preview_file_name = (
345                  recording_obj['recordedOutputPath'].split('/')[-1])
346              preview_file_name_with_path = os.path.join(
347                  self.log_path, preview_file_name)
348              avg_frame_rate_codec = (
349                  video_processing_utils.get_avg_frame_rate(
350                      preview_file_name_with_path))
351              logging.debug('Average codec frame rate for %s is %f',
352                            combination_name, avg_frame_rate_codec)
353              if (avg_frame_rate_codec > fps_range[1] + _FPS_ATOL_CODEC or
354                  avg_frame_rate_codec < fps_range[0] - _FPS_ATOL_CODEC):
355                failure_msg = (
356                    f'{combination_name}: Average video clip frame rate '
357                    f'{avg_frame_rate_codec} exceeding the allowed range of '
358                    f'({fps_range[0]}-{_FPS_ATOL_CODEC}, '
359                    f'{fps_range[1]}+{_FPS_ATOL_CODEC})')
360                test_failures.append(failure_msg)
361                passed = False
362
363              # Verify FPS by inspecting the result metadata
364              capture_results = recording_obj['captureMetadata']
365              if len(capture_results) <= 1:
366                raise AssertionError(
367                    f'{combination_name}: captureMetadata has only '
368                    f'{len(capture_results)} frames')
369              last_t = capture_results[-1]['android.sensor.timestamp']
370              first_t = capture_results[0]['android.sensor.timestamp']
371              avg_frame_duration = (
372                  (last_t - first_t) / (len(capture_results) - 1))
373              avg_frame_rate_metadata = _SEC_TO_NSEC / avg_frame_duration
374              logging.debug('Average metadata frame rate for %s is %f',
375                            combination_name, avg_frame_rate_metadata)
376              if (avg_frame_rate_metadata > fps_range[1] + _FPS_ATOL_METADATA or
377                  avg_frame_rate_metadata < fps_range[0] - _FPS_ATOL_METADATA):
378                failure_msg = (
379                    f'{combination_name}: Average frame rate '
380                    f'{avg_frame_rate_metadata} exceeding the allowed range of '
381                    f'({fps_range[0]}-{_FPS_ATOL_METADATA}, '
382                    f'{fps_range[1]}+{_FPS_ATOL_METADATA})')
383                test_failures.append(failure_msg)
384                passed = False
385
386              # Verify color space
387              color_space = video_processing_utils.get_video_colorspace(
388                  self.log_path, preview_file_name_with_path)
389              if (hlg10 and
390                  video_processing_utils.COLORSPACE_HDR not in color_space):
391                failure_msg = (
392                    f'{combination_name}: video color space {color_space} '
393                    'is missing COLORSPACE_HDR')
394                test_failures.append(failure_msg)
395                passed = False
396
397              if passed:
398                its_session_utils.mark_features_passed(
399                    features_passed, hlg10, is_stabilized)
400
401              # TODO: b/382255298 - Decouple stabilization test.
402              # Schedule finishing up of verification to run asynchronously
403              future = executor.submit(
404                  self._finish_combination, combination_name, is_stabilized,
405                  passed, recording_obj, gyro_events, _NAME, log_path, facing,
406                  output_surfaces, fps_range
407              )
408              feature_verification_futures.append(future)
409
410      # Verify feature combination results
411      for future in feature_verification_futures:
412        result = future.result()
413        logging.debug('Verification result: %s', result)
414        if 'stabilization_failure' in result:
415          failure_msg = f"{result['name']}: {result['stabilization_failure']}"
416          test_failures.append(failure_msg)
417
418        self._add_feature_combo_entry_to_proto(
419            database, result['output_surfaces'], result['passed'],
420            result['fps_range'], result['is_stabilized'])
421
422      # Output the feature combination proto to ItsService and optionally to
423      # file
424      self._output_feature_combo_proto(database)
425
426      # Assert PASS/FAIL criteria
427      if test_failures:
428        logging.debug(test_failures)
429        if support_query:
430          raise AssertionError(test_failures)
431
if __name__ == '__main__':
  # Hand control to the Mobly test runner when executed as a script.
  test_runner.main()
434