xref: /aosp_15_r20/cts/apps/CameraITS/utils/video_processing_utils.py (revision b7c941bb3fa97aba169d73cee0bed2de8ac964bf)
1# Copyright 2022 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utility functions for processing video recordings.
15"""
16# Each item in this list corresponds to quality levels defined per
17# CamcorderProfile. For Video ITS, we will currently test below qualities
18# only if supported by the camera device.
19
20
21import dataclasses
22import logging
23import math
24import os.path
25import re
26import subprocess
27import error_util
28import image_processing_utils
29
30
31COLORSPACE_HDR = 'bt2020'
32HR_TO_SEC = 3600
33INDEX_FIRST_SUBGROUP = 1
34MIN_TO_SEC = 60
35
36ITS_SUPPORTED_QUALITIES = (
37    'HIGH',
38    '2160P',
39    '1080P',
40    '720P',
41    '480P',
42    'CIF',
43    'QCIF',
44    'QVGA',
45    'LOW',
46    'VGA'
47)
48
49LOW_RESOLUTION_SIZES = (
50    '176x144',
51    '192x144',
52    '352x288',
53    '384x288',
54    '320x240',
55)
56
57LOWEST_RES_TESTED_AREA = 640*360
58
59VIDEO_QUALITY_SIZE = {
60    # '480P', '1080P', HIGH' & 'LOW' are not included as they are DUT-dependent
61    '2160P': '3840x2160',
62    '720P': '1280x720',
63    'VGA': '640x480',
64    'CIF': '352x288',
65    'QVGA': '320x240',
66    'QCIF': '176x144',
67}
68
69
70@dataclasses.dataclass
71class CommonPreviewSizeData:
72  """Class to store smallest and largest common sizes of preview and video."""
73  smallest_size: str
74  smallest_quality: str
75  largest_size: str
76  largest_quality: str
77
78
79def get_preview_video_sizes_union(cam, camera_id, min_area=0):
80  """Returns largest and smallest common size and quality of preview and video.
81
82  Args:
83    cam: camera object.
84    camera_id: str; camera ID.
85    min_area: int; Optional filter to eliminate smaller sizes (ex. 640*480).
86
87  Returns:
88    common_size_quality, CommonPreviewSizeData class
89  """
90  supported_preview_sizes = set(cam.get_all_supported_preview_sizes(camera_id))
91  supported_video_qualities = cam.get_supported_video_qualities(camera_id)
92  logging.debug('Supported video profiles & IDs: %s', supported_video_qualities)
93
94  # Make dictionary on video quality and size according to compatibility
95  supported_video_size_to_quality = {}
96  for quality in supported_video_qualities:
97    video_quality = quality.split(':')[0]
98    if video_quality in VIDEO_QUALITY_SIZE:
99      video_size = VIDEO_QUALITY_SIZE[video_quality]
100      supported_video_size_to_quality[video_size] = video_quality
101  logging.debug(
102      'Supported video size to quality: %s', supported_video_size_to_quality
103  )
104  # Find the intersection of supported preview sizes and video sizes
105  common_sizes = supported_preview_sizes.intersection(
106      supported_video_size_to_quality.keys()
107  )
108  if not common_sizes:
109    raise AssertionError('No common size between Preview and Video!')
110  # Filter common sizes based on min_area
111  size_to_area = lambda s: int(s.split('x')[0])*int(s.split('x')[1])
112  common_sizes = (
113      [size for size in common_sizes if size_to_area(size) >= min_area]
114  )
115  if not common_sizes:
116    raise AssertionError(
117        'No common size above min_area between Preview and Video!'
118    )
119  # Use areas of video sizes to find the smallest and largest common size
120  smallest_common_size = min(common_sizes, key=size_to_area)
121  largest_common_size = max(common_sizes, key=size_to_area)
122  logging.debug('Smallest common size: %s', smallest_common_size)
123  logging.debug('Largest common size: %s', largest_common_size)
124  # Find video quality of resolution with resolution as key
125  smallest_common_quality = (
126      supported_video_size_to_quality[smallest_common_size]
127  )
128  logging.debug('Smallest common quality: %s', smallest_common_quality)
129  largest_common_quality = supported_video_size_to_quality[largest_common_size]
130  logging.debug('Largest common quality: %s', largest_common_quality)
131  common_size_quality = CommonPreviewSizeData(
132      smallest_size=smallest_common_size,
133      smallest_quality=smallest_common_quality,
134      largest_size=largest_common_size,
135      largest_quality=largest_common_quality
136  )
137  return common_size_quality
138
139
140def clamp_preview_sizes(preview_sizes, min_area=0, max_area=math.inf):
141  """Returns a list of preview_sizes with areas between min/max_area.
142
143  Args:
144    preview_sizes: list; sizes to be filtered (ex. "1280x720")
145    min_area: int; optional filter to eliminate sizes <= to the specified
146        area (ex. 640*480).
147    max_area: int; optional filter to eliminate sizes >= to the specified
148        area (ex. 3840*2160).
149  Returns:
150    preview_sizes: list; filtered preview sizes clamped by min/max_area.
151  """
152  size_to_area = lambda size: int(size.split('x')[0])*int(size.split('x')[1])
153  filtered_preview_sizes = [
154      size for size in preview_sizes
155      if max_area >= size_to_area(size) >= min_area]
156  logging.debug('Filtered preview sizes: %s', filtered_preview_sizes)
157  if not filtered_preview_sizes:
158    raise AssertionError(f'No preview sizes between {min_area} and {max_area}')
159  return filtered_preview_sizes
160
161
162def log_ffmpeg_version():
163  """Logs the ffmpeg version being used."""
164
165  ffmpeg_version_cmd = ('ffmpeg -version')
166  p = subprocess.Popen(ffmpeg_version_cmd, shell=True, stdout=subprocess.PIPE)
167  output, _ = p.communicate()
168  if p.poll() != 0:
169    raise error_util.CameraItsError('Error running ffmpeg version cmd.')
170  decoded_output = output.decode('utf-8')
171  logging.debug('ffmpeg version: %s', decoded_output.split(' ')[2])
172
173
174def extract_key_frames_from_video(log_path, video_file_name):
175  """Returns a list of extracted key frames.
176
177  Ffmpeg tool is used to extract key frames from the video at path
178  os.path.join(log_path, video_file_name).
179  The extracted key frames will have the name video_file_name with "_key_frame"
180  suffix to identify the frames for video of each quality. Since there can be
181  multiple key frames, each key frame image will be differentiated with its
182  frame index. All the extracted key frames will be available in jpeg format
183  at the same path as the video file.
184
185  The run time flag '-loglevel quiet' hides the information from terminal.
186  In order to see the detailed output of ffmpeg command change the loglevel
187  option to 'info'.
188
189  Args:
190    log_path: path for video file directory.
191    video_file_name: name of the video file.
192  Returns:
193    key_frame_files: a sorted list of files which contains a name per key
194      frame. Ex: VID_20220325_050918_0_preview_1920x1440_key_frame_0001.png
195  """
196  ffmpeg_image_name = f'{os.path.splitext(video_file_name)[0]}_key_frame'
197  ffmpeg_image_file_path = os.path.join(
198      log_path, ffmpeg_image_name + '_%04d.png')
199  cmd = ['ffmpeg',
200         '-skip_frame',
201         'nokey',
202         '-i',
203         os.path.join(log_path, video_file_name),
204         '-vsync',
205         'vfr',
206         '-frame_pts',
207         'true',
208         ffmpeg_image_file_path,
209         '-loglevel',
210         'quiet',
211        ]
212  logging.debug('Extracting key frames from: %s', video_file_name)
213  _ = subprocess.call(cmd,
214                      stdin=subprocess.DEVNULL,
215                      stdout=subprocess.DEVNULL,
216                      stderr=subprocess.DEVNULL)
217  arr = os.listdir(os.path.join(log_path))
218  key_frame_files = []
219  for file in arr:
220    if '.png' in file and not os.path.isdir(file) and ffmpeg_image_name in file:
221      key_frame_files.append(file)
222  key_frame_files.sort()
223  logging.debug('Extracted key frames: %s', key_frame_files)
224  logging.debug('Length of key_frame_files: %d', len(key_frame_files))
225  if not key_frame_files:
226    raise AssertionError('No key frames extracted. Check source video.')
227
228  return key_frame_files
229
230
231def get_key_frame_to_process(key_frame_files):
232  """Returns the key frame file from the list of key_frame_files.
233
234  If the size of the list is 1 then the file in the list will be returned else
235  the file with highest frame_index will be returned for further processing.
236
237  Args:
238    key_frame_files: A list of key frame files.
239  Returns:
240    key_frame_file to be used for further processing.
241  """
242  if not key_frame_files:
243    raise AssertionError('key_frame_files list is empty.')
244  key_frame_files.sort()
245  return key_frame_files[-1]
246
247
248def extract_all_frames_from_video(
249    log_path, video_file_name, img_format, video_fps=None):
250  """Extracts and returns a list of frames from a video using FFmpeg.
251
252  Extract all frames from the video at path <log_path>/<video_file_name>.
253  The extracted frames will have the name video_file_name with "_frame"
254  suffix to identify the frames for video of each size. Each frame image
255  will be differentiated with its frame index. All extracted rames will be
256  available in the provided img_format format at the same path as the video.
257
258  The run time flag '-loglevel quiet' hides the information from terminal.
259  In order to see the detailed output of ffmpeg command change the loglevel
260  option to 'info'.
261
262  Args:
263    log_path: str; directory containing video file.
264    video_file_name: str; name of the video file.
265    img_format: str; desired image format for export frames. ex. 'png'
266    video_fps: str; fps of imported video.
267  Returns:
268    an ordered list of paths to the extracted frame images.
269  """
270  logging.debug('Extracting all frames')
271  ffmpeg_image_name = f"{video_file_name.split('.')[0]}_frame"
272  logging.debug('ffmpeg_image_name: %s', ffmpeg_image_name)
273  ffmpeg_image_file_names = (
274      f'{os.path.join(log_path, ffmpeg_image_name)}_%04d.{img_format}')
275  if video_fps:
276    cmd = [
277        'ffmpeg', '-i', os.path.join(log_path, video_file_name),
278        '-r', video_fps,  # force a constant frame rate for reliability
279        ffmpeg_image_file_names, '-loglevel', 'quiet'
280    ]
281  else:
282    cmd = [
283        'ffmpeg', '-i', os.path.join(log_path, video_file_name),
284        '-vsync', 'passthrough',  # prevents frame drops during decoding
285        ffmpeg_image_file_names, '-loglevel', 'quiet'
286    ]
287  subprocess.call(cmd,
288                  stdin=subprocess.DEVNULL,
289                  stdout=subprocess.DEVNULL,
290                  stderr=subprocess.DEVNULL)
291
292  files = sorted(
293      [file for file in os.listdir(log_path) if
294       (file.endswith(img_format) and ffmpeg_image_name in file)])
295  if not files:
296    raise AssertionError('No frames extracted. Check source video.')
297
298  return files
299
300
301def extract_last_key_frame_from_recording(log_path, file_name):
302  """Extract last key frame from recordings.
303
304  Args:
305    log_path: str; file location
306    file_name: str file name for saved video
307
308  Returns:
309    numpy image of last key frame
310  """
311  key_frame_files = extract_key_frames_from_video(log_path, file_name)
312  logging.debug('key_frame_files: %s', key_frame_files)
313
314  # Get the last_key_frame file to process.
315  last_key_frame_file = get_key_frame_to_process(key_frame_files)
316  logging.debug('last_key_frame: %s', last_key_frame_file)
317
318  # Convert last_key_frame to numpy array
319  np_image = image_processing_utils.convert_image_to_numpy_array(
320      os.path.join(log_path, last_key_frame_file))
321  logging.debug('last key frame image shape: %s', np_image.shape)
322
323  return np_image
324
325
326def get_avg_frame_rate(video_file_name_with_path):
327  """Get average frame rate assuming variable frame rate video.
328
329  Args:
330    video_file_name_with_path: path to the video to be analyzed
331  Returns:
332    Float. average frames per second.
333  """
334
335  cmd = ['ffprobe',
336         '-v',
337         'quiet',
338         '-show_streams',
339         '-select_streams',
340         'v:0',  # first video stream
341         video_file_name_with_path
342        ]
343  logging.debug('Getting frame rate')
344  raw_output = ''
345  try:
346    raw_output = subprocess.check_output(cmd,
347                                         stdin=subprocess.DEVNULL,
348                                         stderr=subprocess.STDOUT)
349  except subprocess.CalledProcessError as e:
350    raise AssertionError(str(e.output)) from e
351  if raw_output:
352    output = str(raw_output.decode('utf-8')).strip()
353    logging.debug('ffprobe command %s output: %s', ' '.join(cmd), output)
354    average_frame_rate_data = (
355        re.search(r'avg_frame_rate=*([0-9]+/[0-9]+)', output)
356        .group(INDEX_FIRST_SUBGROUP)
357    )
358    average_frame_rate = (int(average_frame_rate_data.split('/')[0]) /
359                          int(average_frame_rate_data.split('/')[1]))
360    logging.debug('Average FPS: %.4f', average_frame_rate)
361    return average_frame_rate
362  else:
363    raise AssertionError('ffprobe failed to provide frame rate data')
364
365
366def get_frame_deltas(video_file_name_with_path, timestamp_type='pts'):
367  """Get list of time diffs between frames.
368
369  Args:
370    video_file_name_with_path: path to the video to be analyzed
371    timestamp_type: 'pts' or 'dts'
372  Returns:
373    List of floats. Time diffs between frames in seconds.
374  """
375
376  cmd = ['ffprobe',
377         '-show_entries',
378         f'frame=pkt_{timestamp_type}_time',
379         '-select_streams',
380         'v',
381         video_file_name_with_path
382         ]
383  logging.debug('Getting frame deltas')
384  raw_output = ''
385  try:
386    raw_output = subprocess.check_output(cmd,
387                                         stdin=subprocess.DEVNULL,
388                                         stderr=subprocess.STDOUT)
389  except subprocess.CalledProcessError as e:
390    raise AssertionError(str(e.output)) from e
391  if raw_output:
392    output = str(raw_output.decode('utf-8')).strip().split('\n')
393    deltas = []
394    prev_time = None
395    for line in output:
396      if timestamp_type not in line:
397        continue
398      curr_time = float(re.search(r'time= *([0-9][0-9\.]*)', line)
399                        .group(INDEX_FIRST_SUBGROUP))
400      if prev_time is not None:
401        deltas.append(curr_time - prev_time)
402      prev_time = curr_time
403    logging.debug('Frame deltas: %s', deltas)
404    return deltas
405  else:
406    raise AssertionError('ffprobe failed to provide frame delta data')
407
408
409def get_video_colorspace(log_path, video_file_name):
410  """Get the video colorspace.
411
412  Args:
413    log_path: path for video file directory
414    video_file_name: name of the video file
415  Returns:
416    video colorspace, e.g. BT.2020 or BT.709
417  """
418
419  cmd = ['ffprobe',
420         '-show_streams',
421         '-select_streams',
422         'v:0',
423         '-of',
424         'json',
425         '-i',
426         os.path.join(log_path, video_file_name)
427         ]
428  logging.debug('Get the video colorspace')
429  raw_output = ''
430  try:
431    raw_output = subprocess.check_output(cmd,
432                                         stdin=subprocess.DEVNULL,
433                                         stderr=subprocess.STDOUT)
434  except subprocess.CalledProcessError as e:
435    raise AssertionError(str(e.output)) from e
436
437  logging.debug('raw_output: %s', raw_output)
438  if raw_output:
439    colorspace = ''
440    output = str(raw_output.decode('utf-8')).strip().split('\n')
441    logging.debug('output: %s', output)
442    for line in output:
443      logging.debug('line: %s', line)
444      metadata = re.search(r'"color_space": ("[a-z0-9]*")', line)
445      if metadata:
446        colorspace = metadata.group(INDEX_FIRST_SUBGROUP)
447    logging.debug('Colorspace: %s', colorspace)
448    return colorspace
449  else:
450    raise AssertionError('ffprobe failed to provide color space')
451