# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Verify video is stable during phone movement with JCA."""

import logging
import os
import pathlib
import threading
import time

import camera_properties_utils
import image_processing_utils
import its_base_test
import its_session_utils
from mobly import test_runner
import sensor_fusion_utils
import ui_interaction_utils
import video_processing_utils
_ASPECT_RATIO_16_9 = 16/9  # Determines if video fmt is wider than 16:9.
_IMG_FORMAT = 'png'
_JETPACK_CAMERA_APP_PACKAGE_NAME = 'com.google.jetpackcamera'
_MIN_PHONE_MOVEMENT_ANGLE = 5  # Degrees
_NAME = os.path.splitext(os.path.basename(__file__))[0]
_NUM_ROTATIONS = 24
_START_FRAME = 30  # Skip ~1s (30 frames at 30 fps) to let 3A warm up.
_VIDEO_DELAY_TIME = 3  # Seconds
_VIDEO_DURATION = 5.5  # Seconds
_VIDEO_STABILIZATION_FACTOR = 0.7  # 70% of gyro movement allowed.
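# Mode value 1 corresponds to CONTROL_VIDEO_STABILIZATION_MODE_ON.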
_VIDEO_STABILIZATION_MODE = 1


def _collect_data(cam, dut, lens_facing, log_path,
                  aspect_ratio, rot_rig, servo_speed, serial_port):
  """Capture a new set of data from the device.

  Captures camera frames while the device is being rotated in the prescribed
  manner.

  Args:
    cam: Camera object.
    dut: An Android controller device object.
    lens_facing: str; Facing of camera.
    log_path: str; Log path where video will be saved.
    aspect_ratio: str; Key string for video aspect ratio defined by JCA.
    rot_rig: dict with 'cntl' and 'ch' defined.
    servo_speed: int; Speed of servo motor.
    serial_port: str; Serial port for Arduino controller.

  Returns:
    output_path: str; Output path for the recording.
  """
  logging.debug('Starting sensor event collection for %s', aspect_ratio)
  ui_interaction_utils.do_jca_video_setup(
      dut,
      log_path,
      facing=lens_facing,
      aspect_ratio=aspect_ratio,
  )
  # Start camera movement.
  movement = threading.Thread(
      target=sensor_fusion_utils.rotation_rig,
      args=(
          rot_rig['cntl'],
          rot_rig['ch'],
          _NUM_ROTATIONS,
          sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
          servo_speed,
          sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
          serial_port,
      ),
  )
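  # Start the rotation in a background thread; video capture below runs while
  # the rig is moving.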
  movement.start()
  cam.start_sensor_events()
  logging.debug('Gyro Sensor recording started')
  # Record video with JCA.
  time.sleep(_VIDEO_DELAY_TIME)  # Allow time for rig to start moving.
  recording_path = pathlib.Path(
      cam.do_jca_video_capture(
          dut,
          log_path,
          duration=_VIDEO_DURATION * 1000,  # ms
      )
  )
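  # Rename so each tested aspect ratio gets a distinct file name.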
  prefix_name = str(recording_path).split('.')[0]
  ratio_name = aspect_ratio.replace(' ', '_')
  output_path = f'{prefix_name}_{ratio_name}.mp4'
  os.rename(recording_path, output_path)
  logging.debug('Output path for recording %s : %s', aspect_ratio, output_path)
  # Wait for movement to stop.
  movement.join()

  return output_path


def _extract_frames_from_video(log_path, recording_path):
  """Extract frames from video.

  Extract frames from video and convert to numpy array.

  Args:
    log_path: str; Directory where video is saved.
    recording_path: str; Path to video file.

  Returns:
    frames: List of numpy arrays.
    frame_shape: Tuple of frame shape.
  """
  file_name = str(recording_path).split('/')[-1]
  logging.debug('file_name: %s', file_name)
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, file_name, _IMG_FORMAT)
  frames = []
  logging.debug('Number of frames %d', len(file_list))
  for file in file_list:
    img = image_processing_utils.convert_image_to_numpy_array(
        os.path.join(log_path, file))
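    # Normalize pixel values to [0, 1].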
    frames.append(img/255)
  frame_shape = frames[0].shape
  logging.debug('Frame size %d x %d', frame_shape[1], frame_shape[0])
  return frames, frame_shape


def _extract_camera_gyro_rotations(
    lens_facing, frames, frame_shape, gyro_events, log_path, ratio_tested):
  """Extract camera and gyro rotations from frames and gyro events.

  Args:
    lens_facing: str; Facing of camera.
    frames: List of numpy arrays.
    frame_shape: Tuple of frame shape.
    gyro_events: List of gyro events.
    log_path: str; Directory where video is saved.
    ratio_tested: str; Key string for video aspect ratio defined by JCA.

  Returns:
    max_gyro_angle: float; Max angle of deflection in gyroscope movement.
    max_camera_angle: float; Max angle of deflection in video movement.
    frame_shape: Tuple of frame shape.
    ratio_name: str; Name of ratio tested for logging.
  """
  ratio_name = ratio_tested.replace(' ', '_')
  file_name_stem = f'{os.path.join(log_path, _NAME)}_{ratio_name}'
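  # Skip the first _START_FRAME frames so 3A settling does not skew the
  # camera rotation estimate.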
  cam_rots = sensor_fusion_utils.get_cam_rotations(
      frames[_START_FRAME:], lens_facing, frame_shape[0],
      file_name_stem, _START_FRAME, stabilized_video=True)
  sensor_fusion_utils.plot_camera_rotations(
      cam_rots, _START_FRAME, ratio_name, file_name_stem)
  max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
      cam_rots, 'Camera')

  # Extract gyro rotations.
  sensor_fusion_utils.plot_gyro_events(
      gyro_events, f'{_NAME}_{ratio_name}', log_path)
  gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
      gyro_events, _VIDEO_DELAY_TIME)
  max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
      gyro_rots, 'Gyro')
  logging.debug(
      'Max deflection (degrees) %s: video: %.3f, gyro: %.3f, ratio: %.4f',
      ratio_tested, max_camera_angle, max_gyro_angle,
      max_camera_angle / max_gyro_angle)
  return max_gyro_angle, max_camera_angle, frame_shape, ratio_name


def _initialize_rotation_rig(rot_rig, rotator_cntl, rotator_ch):
  """Initialize rotation rig.

  Args:
    rot_rig: dict with 'cntl' and 'ch' defined.
    rotator_cntl: str; Controller type.
    rotator_ch: str; Controller channel.

  Returns:
    rot_rig: Updated dict with 'cntl' and 'ch' defined.
  """
  rot_rig['cntl'] = rotator_cntl
  rot_rig['ch'] = rotator_ch
  if rot_rig['cntl'].lower() != sensor_fusion_utils.ARDUINO_STRING.lower():
    raise AssertionError(f'You must use an Arduino controller for {_NAME}.')
  logging.debug('video aspect ratios tested: %s', str(
      ui_interaction_utils.RATIO_TO_UI_DESCRIPTION.keys()))
  return rot_rig


def _initialize_servo_controller(tablet_device, rot_rig):
  """Initialize the servo controller.

  Args:
    tablet_device: bool; True if tablet device is connected.
    rot_rig: dict with 'cntl' and 'ch' defined.

  Returns:
    serial_port: str; Serial port for Arduino controller.
    servo_speed: int; Speed of servo motor.
  """
  serial_port = None
  if rot_rig['cntl'].lower() == sensor_fusion_utils.ARDUINO_STRING.lower():
    # Identify port.
    serial_port = sensor_fusion_utils.serial_port_def(
        sensor_fusion_utils.ARDUINO_STRING)
    # Send test cmd to Arduino until cmd returns properly.
    sensor_fusion_utils.establish_serial_comm(serial_port)
  if tablet_device:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
  else:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
  return serial_port, servo_speed


class VideoStabilizationJCATest(its_base_test.UiAutomatorItsBaseTest):
  """Tests if video is stabilized.

  Camera is moved in sensor fusion rig on an arc of 15 degrees.
  Speed is set to mimic hand movement and not be too fast.
  Video is captured after rotation rig starts moving, and the
  gyroscope data is dumped.

  Video is processed to dump all of the frames to PNG files.
  Camera movement is extracted from the frames, and the max angle of
  deflection in video movement is compared with the max angle of
  deflection in gyroscope movement. Test is a PASS if the video's max
  rotation is below _VIDEO_STABILIZATION_FACTOR times the gyro's max
  rotation.
  """

  def setup_class(self):
    super().setup_class()
    self.ui_app = _JETPACK_CAMERA_APP_PACKAGE_NAME
    # Restart CtsVerifier to ensure that correct flags are set.
    ui_interaction_utils.force_stop_app(
        self.dut, its_base_test.CTS_VERIFIER_PKG)
    self.dut.adb.shell(
        'am start -n com.android.cts.verifier/.CtsVerifierActivity')

  def teardown_test(self):
    ui_interaction_utils.force_stop_app(self.dut, self.ui_app)

  def test_video_stabilization_jca(self):
    rot_rig = {}
    log_path = self.log_path

    with its_session_utils.ItsSession(
        device_id=self.dut.serial,
        camera_id=self.camera_id,
        hidden_physical_id=self.hidden_physical_id) as cam:
      props = cam.get_camera_properties()
      props = cam.override_with_hidden_physical_camera_props(props)

      # Close camera after props retrieved so that ItsTestActivity can open it.
      cam.close_camera()

      first_api_level = its_session_utils.get_first_api_level(self.dut.serial)
      supported_stabilization_modes = props[
          'android.control.availableVideoStabilizationModes']

      camera_properties_utils.skip_unless(
          first_api_level >= its_session_utils.ANDROID16_API_LEVEL and
          _VIDEO_STABILIZATION_MODE in supported_stabilization_modes)

      # Log ffmpeg version being used.
      video_processing_utils.log_ffmpeg_version()

      # Raise error if not FRONT or REAR facing camera.
      lens_facing = props['android.lens.facing']
      camera_properties_utils.check_front_or_rear_camera(props)

      # Initialize rotation rig.
      rot_rig = _initialize_rotation_rig(
          rot_rig, self.rotator_cntl, self.rotator_ch)
      # Initialize connection with controller.
      serial_port, servo_speed = _initialize_servo_controller(
          self.tablet_device, rot_rig)
      max_cam_gyro_angles = {}

      for ratio_tested in ui_interaction_utils.RATIO_TO_UI_DESCRIPTION.keys():
        # Record video.
        recording_path = _collect_data(
            cam, self.dut, lens_facing, log_path, ratio_tested,
            rot_rig, servo_speed, serial_port
        )

        # Get gyro events.
        logging.debug('Reading out inertial sensor events')
        gyro_events = cam.get_sensor_events()['gyro']
        logging.debug('Number of gyro samples %d', len(gyro_events))

        # Extract frames from video.
        frames, frame_shape = _extract_frames_from_video(
            log_path, recording_path)
        # Extract camera and gyro rotations.
        max_gyro_angle, max_camera_angle, frame_shape, ratio_name = (
            _extract_camera_gyro_rotations(lens_facing, frames, frame_shape,
                                           gyro_events, log_path, ratio_tested))
        max_cam_gyro_angles[ratio_name] = {'gyro': max_gyro_angle,
                                           'cam': max_camera_angle,
                                           'frame_shape': frame_shape}
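        # Stash per-ratio results for PASS/FAIL evaluation after the loop.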
        # Assert phone is moved enough during test.
        if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
          raise AssertionError(
              f'Phone not moved enough! Movement: {max_gyro_angle}, '
              f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

      # Assert PASS/FAIL criteria.
      test_failures = []
      for ratio_name, max_angles in max_cam_gyro_angles.items():
        aspect_ratio = (max_angles['frame_shape'][1] /
                        max_angles['frame_shape'][0])
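        # Formats wider than 16:9 get a 10% relaxed stabilization threshold.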
        if aspect_ratio > _ASPECT_RATIO_16_9:
          video_stabilization_factor = _VIDEO_STABILIZATION_FACTOR * 1.1
        else:
          video_stabilization_factor = _VIDEO_STABILIZATION_FACTOR
        if max_angles['cam'] >= max_angles['gyro']*video_stabilization_factor:
          test_failures.append(
              f'{ratio_name} video not stabilized enough! '
              f"Max video angle: {max_angles['cam']:.3f}, "
              f"Max gyro angle: {max_angles['gyro']:.3f}, "
              f"ratio: {max_angles['cam']/max_angles['gyro']:.3f} "
              f'THRESH: {video_stabilization_factor}.')
        else:  # Remove frames if PASS.
          its_session_utils.remove_tmp_files(log_path, 'ITS_JCA_*')
      if test_failures:
        raise AssertionError(test_failures)


if __name__ == '__main__':
  test_runner.main()