1# Copyright 2023 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Verifies advertised FPS from device as webcam."""
15
16import errno
17import fcntl
18import glob
19import logging
20import mmap
21import os
22import time
23import v4l2
24
# Substring searched for in the lower-cased V4L2 card name to pick the device
# under test.
_DEVICE_NAME = 'android'  # TODO b/277159494
# Multiplied by the fps under test to get the number of frames captured.
_TEST_DURATION_SECONDS = 10
# Retry budget used by ioctl_retry_error; it is counted down by 10 per retry.
_WAIT_MS = 10000  # 10 seconds
# Number of mmap capture buffers requested from the driver in test_fps.
_REQUEST_BUFFER_COUNT = 10
# Glob pattern used to list candidate video device nodes.
_VIDEO_DEVICES_PATH = '/dev/video*'
30
31
def v4l2_fourcc_to_str(fourcc):
  """Converts a packed fourcc integer into its four-character string.

  Args:
    fourcc: Integer whose four lowest bytes each hold one character code,
      least significant byte first.

  Returns:
    String made of the four characters, lowest byte first.
  """
  chars = []
  for shift in (0, 8, 16, 24):
    chars.append(chr((fourcc >> shift) & 0xFF))
  return ''.join(chars)
34
35
def initialize_device_path():
  """Returns the device path of the device to be tested.

  Walks every node matching _VIDEO_DEVICES_PATH and returns the first one
  whose card name contains _DEVICE_NAME, that advertises video capture, and
  from which a capture format can be enumerated. Every node that is opened is
  closed again before moving on, including when an ioctl fails.

  Returns:
    Device path, /dev/video*, of device to be tested, or an empty string if
    no suitable device is found.
  """
  for current_device_path in glob.glob(_VIDEO_DEVICES_PATH):
    try:
      video_device = os.open(current_device_path, os.O_RDWR | os.O_NONBLOCK)
    except OSError:
      # Node could not be opened; try the next candidate.
      continue

    try:
      caps = v4l2.v4l2_capability()
      ioctl_retry_error(video_device, v4l2.VIDIOC_QUERYCAP,
                        caps, OSError, errno.EBUSY)

      if (_DEVICE_NAME not in caps.card.lower().decode('utf-8') or
          not (caps.capabilities & v4l2.V4L2_CAP_VIDEO_CAPTURE)):
        continue

      # Devices can mount multiple nodes at /dev/video*
      # Check for one that is used for capturing by finding
      # if formats can be retrieved from it
      fmtdesc = v4l2.v4l2_fmtdesc()
      fmtdesc.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
      ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FMT,
                        fmtdesc, OSError, errno.EBUSY)
      # First matching node wins; no need to probe the remaining ones.
      return current_device_path
    except OSError:
      # Query failed, so this node is not usable for capture; keep looking.
      continue
    finally:
      # Always release the descriptor, even when an ioctl raised above.
      try:
        os.close(video_device)
      except OSError:
        # Best effort: a failed close must not abort the device scan.
        pass

  return ''
78
79
def initialize_formats_and_resolutions(video_device):
  """Enumerates the formats, resolutions and frame rates of the device.

  The enumeration is done in three passes: all formats first, then the
  discrete frame sizes of every format, then the frame intervals of every
  format/frame size pair. Each enumeration stops at the first OSError.

  Args:
    video_device: Device to be checked

  Returns:
    List of formats, resolutions, and frame rates:
      [ (Format (fmtdesc), [ (Resolution (frmsize),
          [ FrameRates (v4l2_frmivalenum) ]) ]) ]
  """
  capabilities = []

  # Pass 1: supported formats.
  next_format = 0
  while True:
    fmtdesc = v4l2.v4l2_fmtdesc()
    fmtdesc.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
    fmtdesc.index = next_format
    try:
      ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FMT,
                        fmtdesc, OSError, errno.EBUSY)
    except OSError:
      break
    capabilities.append((fmtdesc, []))
    next_format += 1

  # Pass 2: resolutions per format (only discrete frame sizes are kept).
  for entry in capabilities:
    fmtdesc, resolutions = entry[0], entry[1]
    next_size = 0
    while True:
      frmsize = v4l2.v4l2_frmsizeenum()
      frmsize.pixel_format = fmtdesc.pixelformat
      frmsize.index = next_size
      try:
        ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FRAMESIZES,
                          frmsize, OSError, errno.EBUSY)
      except OSError:
        break
      if frmsize.type == v4l2.V4L2_FRMSIZE_TYPE_DISCRETE:
        resolutions.append((frmsize, []))
      next_size += 1

  # Pass 3: advertised frame rates per format and resolution.
  for entry in capabilities:
    fmtdesc, resolutions = entry[0], entry[1]
    for size_entry in resolutions:
      frmsize, intervals = size_entry[0], size_entry[1]
      next_interval = 0
      while True:
        frmivalenum = v4l2.v4l2_frmivalenum()
        frmivalenum.index = next_interval
        frmivalenum.pixel_format = fmtdesc.pixelformat
        frmivalenum.width = frmsize.discrete.width
        frmivalenum.height = frmsize.discrete.height
        try:
          ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FRAMEINTERVALS,
                            frmivalenum, OSError, errno.EBUSY)
        except OSError:
          break
        intervals.append(frmivalenum)
        next_interval += 1

  return capabilities
154
155
def print_formats_and_resolutions(formats_and_resolutions):
  """Prints the enumerated device capabilities to stdout for debugging.

  Args:
    formats_and_resolutions: List to be printed, shaped as
      [(fmtdesc, [(frmsize, [frmivalenum, ...]), ...]), ...]
  """
  for entry in formats_and_resolutions:
    fmtdesc, resolutions = entry[0], entry[1]
    fourcc_text = v4l2_fourcc_to_str(fmtdesc.pixelformat)
    print(f'Format - {fmtdesc.description},\n'
          f'        {fmtdesc.pixelformat} ({fourcc_text})')
    for size_entry in resolutions:
      frmsize, intervals = size_entry[0], size_entry[1]
      size = frmsize.discrete
      print(f'-Resolution: {size.width}x{size.height}')
      for frmivalenum in intervals:
        interval = frmivalenum.discrete
        # Frame rate is the inverse of the advertised frame interval.
        rate = interval.denominator / interval.numerator
        print(f'\t{fmtdesc.description} ({fmtdesc.pixelformat}),\n'
              f'            {rate}\n'
              '            fps')
175
176
def ioctl_retry_error(video_device, request, arg, error, errno_code):
  """Issues an ioctl, retrying while it fails with one specific error.

  The call is retried every 10 milliseconds for as long as it raises `error`
  with errno equal to `errno_code` and the _WAIT_MS budget is not used up.
  Any other failure, or the same failure once the budget is exhausted, is
  re-raised to the caller.

  Args:
    video_device: the device the ioctl call will interface with
    request: request for the ioctl call
    arg: arguments for ioctl
    error: the error to be caught and waited on
    errno_code: errno code of error to be waited on
  """
  remaining_ms = _WAIT_MS
  while True:
    try:
      fcntl.ioctl(video_device, request, arg)
    except error as exc:
      # Give up on an unexpected errno or once the wait budget has run out.
      if exc.errno != errno_code or remaining_ms < 0:
        raise
      time.sleep(0.01)  # wait for 10 milliseconds
      remaining_ms -= 10
    else:
      return
200
201
def setup_for_test_fps(video_device, formats_and_resolutions):
  """Sets up and calls fps test for device.

  For every format/resolution pair the capture format is set on the device,
  then every advertised frame interval is applied with VIDIOC_S_PARM and
  measured with test_fps.

  Args:
    video_device: device to be tested
    formats_and_resolutions: device capabilities to be tested, shaped as
      [(fmtdesc, [(frmsize, [frmivalenum, ...]), ...]), ...]

  Returns:
    List of fps test results with expected fps and actual tested fps
      [ (Expected, Actual )]
    Expected is the advertised rate, denominator / numerator of the frame
    interval, rounded to the nearest integer.
  """
  res = []
  for elem in formats_and_resolutions:
    fmtdesc, frmsize_list = elem[0], elem[1]

    fmt = v4l2.v4l2_format()
    fmt.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE

    for frmsize_elem in frmsize_list:
      frmsize, frmivalenum_list = frmsize_elem[0], frmsize_elem[1]
      # VIDIOC_G_FMT below overwrites fmt, so re-assert the pixel format
      # together with the size for every resolution.
      fmt.fmt.pix.pixelformat = fmtdesc.pixelformat
      fmt.fmt.pix.width = frmsize.discrete.width
      fmt.fmt.pix.height = frmsize.discrete.height

      ioctl_retry_error(video_device, v4l2.VIDIOC_S_FMT, fmt,
                        OSError, errno.EBUSY)

      ioctl_retry_error(video_device, v4l2.VIDIOC_G_FMT, fmt,
                        OSError, errno.EBUSY)

      for frmivalenum_elem in frmivalenum_list:
        numerator = frmivalenum_elem.discrete.numerator
        denominator = frmivalenum_elem.discrete.denominator

        streamparm = v4l2.v4l2_streamparm()
        streamparm.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
        streamparm.parm.capture.timeperframe.numerator = numerator
        streamparm.parm.capture.timeperframe.denominator = denominator
        ioctl_retry_error(video_device, v4l2.VIDIOC_S_PARM, streamparm,
                          OSError, errno.EBUSY)

        # The frame interval is numerator / denominator seconds, so the rate
        # is its inverse. Rounded because test_fps needs a whole frame count.
        # A zero numerator falls back to the bare denominator.
        if numerator:
          expected_fps = round(denominator / numerator)
        else:
          expected_fps = denominator

        res.append((expected_fps, test_fps(video_device, expected_fps)))
  return res
248
249
def test_fps(video_device, fps):
  """Runs fps test.

  Requests mmap capture buffers, queues them, starts streaming and then
  dequeues/requeues fps * _TEST_DURATION_SECONDS frames while timing the
  loop. Streaming is stopped, the buffers are unmapped and the driver
  buffers are released even when capturing fails part-way.

  Args:
    video_device: device to be tested
    fps: fps being tested

  Returns:
    Actual fps achieved from device

  Raises:
    RuntimeError: if the driver did not provide any capture buffer.
    OSError: if an ioctl keeps failing (see ioctl_retry_error).
  """
  # Request buffers
  req = v4l2.v4l2_requestbuffers()
  req.count = _REQUEST_BUFFER_COUNT
  req.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
  req.memory = v4l2.V4L2_MEMORY_MMAP

  ioctl_retry_error(video_device, v4l2.VIDIOC_REQBUFS, req,
                    OSError, errno.EBUSY)

  buf_type = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
  buffers = []
  streaming = False
  try:
    # The driver may grant a different count than requested; use req.count.
    for i in range(req.count):
      buf = v4l2.v4l2_buffer()
      buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE
      buf.memory = v4l2.V4L2_MEMORY_MMAP
      buf.index = i

      ioctl_retry_error(video_device, v4l2.VIDIOC_QUERYBUF, buf,
                        OSError, errno.EBUSY)

      buf.buffer = mmap.mmap(video_device,
                             buf.length, mmap.PROT_READ,
                             mmap.MAP_SHARED, offset=buf.m.offset)
      buffers.append(buf)
      ioctl_retry_error(video_device, v4l2.VIDIOC_QBUF, buf,
                        OSError, errno.EBUSY)

    if not buffers:
      raise RuntimeError('Driver did not allocate any capture buffers')

    # Stream on
    ioctl_retry_error(video_device, v4l2.VIDIOC_STREAMON, buf_type,
                      OSError, errno.EBUSY)
    streaming = True

    # Test FPS
    num_frames = fps * _TEST_DURATION_SECONDS
    # Monotonic clock: unaffected by wall-clock adjustments during the run.
    start_time = time.monotonic()

    for x in range(num_frames):
      # Cycle over the buffers actually allocated, not the requested count.
      buf = buffers[x % len(buffers)]
      ioctl_retry_error(video_device, v4l2.VIDIOC_DQBUF, buf,
                        BlockingIOError, errno.EWOULDBLOCK)
      ioctl_retry_error(video_device, v4l2.VIDIOC_QBUF, buf,
                        OSError, errno.EBUSY)

    elapsed_time = time.monotonic() - start_time
  finally:
    # Stream off and clean up
    try:
      if streaming:
        ioctl_retry_error(video_device, v4l2.VIDIOC_STREAMOFF, buf_type,
                          OSError, errno.EBUSY)
    finally:
      # Unmap before releasing: buffers still mapped can make the driver
      # refuse to free them.
      for mapped in buffers:
        mapped.buffer.close()
      req.count = 0
      ioctl_retry_error(video_device, v4l2.VIDIOC_REQBUFS, req,
                        OSError, errno.EBUSY)

  return num_frames / elapsed_time if elapsed_time > 0 else 0.0
317
318
def main():
  """Finds the webcam device, enumerates its capabilities and tests its fps.

  Returns:
    List of (expected fps, actual fps) tuples produced by setup_for_test_fps,
    or an empty list when no supported device is found, the device cannot be
    opened, or no formats can be retrieved from it.
  """
  # Open the webcam device
  device_path = initialize_device_path()
  if not device_path:
    logging.error('Supported device not found!')
    return []

  try:
    video_device = os.open(device_path, os.O_RDWR | os.O_NONBLOCK)
  except OSError as e:
    logging.error('Error: failed to open device %s: error %s', device_path, e)
    return []

  # Close the descriptor on every exit path, including early returns and
  # failures raised while testing.
  try:
    formats_and_resolutions = initialize_formats_and_resolutions(video_device)
    if not formats_and_resolutions:
      logging.error('Error retrieving formats and resolutions')
      return []

    return setup_for_test_fps(video_device, formats_and_resolutions)
  finally:
    os.close(video_device)
342
# Run the fps verification when executed as a script; the list returned by
# main() is not used here.
if __name__ == '__main__':
  main()
345