1# Copyright 2023 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Verifies advertised FPS from device as webcam.""" 15 16import errno 17import fcntl 18import glob 19import logging 20import mmap 21import os 22import time 23import v4l2 24 25_DEVICE_NAME = 'android' # TODO b/277159494 26_TEST_DURATION_SECONDS = 10 27_WAIT_MS = 10000 # 10 seconds 28_REQUEST_BUFFER_COUNT = 10 29_VIDEO_DEVICES_PATH = '/dev/video*' 30 31 32def v4l2_fourcc_to_str(fourcc): 33 return ''.join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) 34 35 36def initialize_device_path(): 37 """Returns the device path of the device to be tested. 38 39 Returns: 40 Device path, /dev/video*, of device to be tested 41 """ 42 device_path = '' 43 selected_device = False 44 45 video_devices = glob.glob(_VIDEO_DEVICES_PATH) 46 47 for current_device_path in video_devices: 48 try: 49 video_device = os.open(current_device_path, os.O_RDWR | os.O_NONBLOCK) 50 caps = v4l2.v4l2_capability() 51 ioctl_retry_error(video_device, v4l2.VIDIOC_QUERYCAP, 52 caps, OSError, errno.EBUSY) 53 54 if (_DEVICE_NAME in caps.card.lower().decode('utf-8') and 55 not selected_device and 56 caps.capabilities & v4l2.V4L2_CAP_VIDEO_CAPTURE): 57 # Devices can mount multiple nodes at /dev/video* 58 # Check for one that is used for capturing by finding 59 # if formats can be retrieved from it 60 while True: 61 try: 62 fmtdesc = v4l2.v4l2_fmtdesc() 63 fmtdesc.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE 64 ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FMT, 65 fmtdesc, OSError, errno.EBUSY) 66 except OSError: 67 break 68 else: 69 selected_device = True 70 device_path = current_device_path 71 break 72 73 os.close(video_device) 74 except OSError: 75 pass 76 77 return device_path 78 79 80def initialize_formats_and_resolutions(video_device): 81 """Gets a list of the supported formats, resolutions and frame rates for the device. 82 83 Args: 84 video_device: Device to be checked 85 86 Returns: 87 List of formats, resolutions, and frame rates: 88 [ (Format (fmtdesc), [ (Resolution (frmsize), 89 [ FrameRates (v4l2_frmivalenum) ]) ]) ] 90 """ 91 # [(Format (fmtdesc),[(Resolution(frmsize),[FrameRates(v4l2_frmivalenum)])])] 92 formats_and_resolutions = [] 93 94 # Retrieve supported formats 95 format_index = 0 96 while True: 97 try: 98 fmtdesc = v4l2.v4l2_fmtdesc() 99 fmtdesc.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE 100 fmtdesc.index = format_index 101 ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FMT, 102 fmtdesc, OSError, errno.EBUSY) 103 except OSError: 104 break 105 else: 106 formats_and_resolutions.append((fmtdesc, [])) 107 format_index += 1 108 109 # Use the found formats to retrieve the supported 110 # resolutions per format 111 for index, elem in enumerate(formats_and_resolutions): 112 fmtdesc = elem[0] 113 frmsize_index = 0 114 115 while True: 116 try: 117 frmsize = v4l2.v4l2_frmsizeenum() 118 frmsize.pixel_format = fmtdesc.pixelformat 119 frmsize.index = frmsize_index 120 ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FRAMESIZES, 121 frmsize, OSError, errno.EBUSY) 122 except OSError: 123 break 124 else: 125 if frmsize.type == v4l2.V4L2_FRMSIZE_TYPE_DISCRETE: 126 formats_and_resolutions[index][1].append((frmsize, [])) 127 frmsize_index += 1 128 129 # Get advertised frame rates supported per format and resolution 130 for format_index, elem in enumerate(formats_and_resolutions): 131 fmtdesc = elem[0] 132 frmsize_list = elem[1] 133 134 for frmsize_index, frmsize_elem in enumerate(frmsize_list): 135 curr_frmsize = frmsize_elem[0] 136 frmival_index = 0 137 while True: 138 try: 139 frmivalenum = v4l2.v4l2_frmivalenum() 140 frmivalenum.index = frmival_index 141 frmivalenum.pixel_format = fmtdesc.pixelformat 142 frmivalenum.width = curr_frmsize.discrete.width 143 frmivalenum.height = curr_frmsize.discrete.height 144 ioctl_retry_error(video_device, v4l2.VIDIOC_ENUM_FRAMEINTERVALS, 145 frmivalenum, OSError, errno.EBUSY) 146 except OSError: 147 break 148 else: 149 formats_and_resolutions[format_index][1][ 150 frmsize_index][1].append(frmivalenum) 151 frmival_index += 1 152 153 return formats_and_resolutions 154 155 156def print_formats_and_resolutions(formats_and_resolutions): 157 """Helper function to print out device capabilities for debugging. 158 159 Args: 160 formats_and_resolutions: List to be printed 161 """ 162 for elem in formats_and_resolutions: 163 fmtdesc = elem[0] 164 print(f"""Format - {fmtdesc.description}, 165 {fmtdesc.pixelformat} ({v4l2_fourcc_to_str(fmtdesc.pixelformat)})""") 166 frmsize_list = elem[1] 167 for frmsize_elem in frmsize_list: 168 frmsize = frmsize_elem[0] 169 print(f'-Resolution: {frmsize.discrete.width}x{frmsize.discrete.height}') 170 frmivalenum_list = frmsize_elem[1] 171 for frmivalenum in frmivalenum_list: 172 print(f"""\t{fmtdesc.description} ({fmtdesc.pixelformat}), 173 {frmivalenum.discrete.denominator / frmivalenum.discrete.numerator} 174 fps""") 175 176 177def ioctl_retry_error(video_device, request, arg, error, errno_code): 178 """Adds wait check for specified ioctl call. 179 180 Args: 181 video_device: the device the ioctl call will interface with 182 request: request for the ioctl call 183 arg: arguments for ioctl 184 error: the error to be catched and waited on 185 errno_code: errno code of error to be waited on 186 """ 187 wait_time = _WAIT_MS 188 while True: 189 try: 190 fcntl.ioctl(video_device, request, arg) 191 break 192 except error as e: 193 # if the error is a blocking I/O error, wait a short time and try again 194 if e.errno == errno_code and wait_time >= 0: 195 time.sleep(0.01) # wait for 10 milliseconds 196 wait_time -= 10 197 continue 198 else: 199 raise # otherwise, re-raise the exception 200 201 202def setup_for_test_fps(video_device, formats_and_resolutions): 203 """Sets up and calls fps test for device. 204 205 Args: 206 video_device: device to be tested 207 formats_and_resolutions: device capabilities to be tested 208 209 Returns: 210 List of fps test results with expected fps and actual tested fps 211 [ (Expected, Actual )] 212 """ 213 res = [] 214 for elem in formats_and_resolutions: 215 fmtdesc = elem[0] 216 217 fmt = v4l2.v4l2_format() 218 fmt.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE 219 fmt.fmt.pix.pixelformat = fmtdesc.pixelformat 220 221 frmsize_list = elem[1] 222 for frmsize_elem in frmsize_list: 223 frmsize = frmsize_elem[0] 224 fmt.fmt.pix.width = frmsize.discrete.width 225 fmt.fmt.pix.height = frmsize.discrete.height 226 227 ioctl_retry_error(video_device, v4l2.VIDIOC_S_FMT, fmt, 228 OSError, errno.EBUSY) 229 230 ioctl_retry_error(video_device, v4l2.VIDIOC_G_FMT, fmt, 231 OSError, errno.EBUSY) 232 233 frmivalenum_list = frmsize_elem[1] 234 for frmivalenum_elem in frmivalenum_list: 235 streamparm = v4l2.v4l2_streamparm() 236 streamparm.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE 237 streamparm.parm.capture.timeperframe.numerator = ( 238 frmivalenum_elem.discrete.numerator) 239 streamparm.parm.capture.timeperframe.denominator = ( 240 frmivalenum_elem.discrete.denominator) 241 ioctl_retry_error(video_device, v4l2.VIDIOC_S_PARM, streamparm, 242 OSError, errno.EBUSY) 243 244 res.append((frmivalenum_elem.discrete.denominator, 245 test_fps(video_device, 246 frmivalenum_elem.discrete.denominator))) 247 return res 248 249 250def test_fps(video_device, fps): 251 """Runs fps test. 252 253 Args: 254 video_device: device to be tested 255 fps: fps being tested 256 257 Returns: 258 Actual fps achieved from device 259 """ 260 # Request buffers 261 req = v4l2.v4l2_requestbuffers() 262 req.count = _REQUEST_BUFFER_COUNT 263 req.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE 264 req.memory = v4l2.V4L2_MEMORY_MMAP 265 266 ioctl_retry_error(video_device, v4l2.VIDIOC_REQBUFS, req, 267 OSError, errno.EBUSY) 268 269 buffers = [] 270 for i in range(req.count): 271 buf = v4l2.v4l2_buffer() 272 buf.type = v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE 273 buf.memory = v4l2.V4L2_MEMORY_MMAP 274 buf.index = i 275 276 ioctl_retry_error(video_device, v4l2.VIDIOC_QUERYBUF, buf, 277 OSError, errno.EBUSY) 278 279 buf.buffer = mmap.mmap(video_device, 280 buf.length, mmap.PROT_READ, 281 mmap.MAP_SHARED, offset=buf.m.offset) 282 buffers.append(buf) 283 ioctl_retry_error(video_device, v4l2.VIDIOC_QBUF, buf, 284 OSError, errno.EBUSY) 285 286 # Stream on 287 buf_type = v4l2.v4l2_buf_type(v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE) 288 ioctl_retry_error(video_device, v4l2.VIDIOC_STREAMON, buf_type, 289 OSError, errno.EBUSY) 290 291 # Test FPS 292 num_frames = fps * _TEST_DURATION_SECONDS 293 start_time = time.time() 294 295 for x in range(num_frames): 296 buf = buffers[x % _REQUEST_BUFFER_COUNT] 297 ioctl_retry_error(video_device, v4l2.VIDIOC_DQBUF, buf, 298 BlockingIOError, errno.EWOULDBLOCK) 299 ioctl_retry_error(video_device, v4l2.VIDIOC_QBUF, buf, 300 OSError, errno.EBUSY) 301 302 end_time = time.time() 303 elapsed_time = end_time - start_time 304 fps_res = num_frames / elapsed_time 305 306 # Stream off and clean up 307 ioctl_retry_error(video_device, v4l2.VIDIOC_STREAMOFF, buf_type, 308 OSError, errno.EBUSY) 309 req.count = 0 310 ioctl_retry_error(video_device, v4l2.VIDIOC_REQBUFS, req, 311 OSError, errno.EBUSY) 312 313 for buf in buffers: 314 buf.buffer.close() 315 316 return fps_res 317 318 319def main(): 320 # Open the webcam device 321 device_path = initialize_device_path() 322 if not device_path: 323 logging.error('Supported device not found!') 324 return [] 325 326 try: 327 video_device = os.open(device_path, os.O_RDWR | os.O_NONBLOCK) 328 except Exception as e: 329 print(f'Error: failed to open device {device_path}: error {e}') 330 return [] 331 332 formats_and_resolutions = initialize_formats_and_resolutions(video_device) 333 if not formats_and_resolutions: 334 logging.error('Error retrieving formats and resolutions') 335 return [] 336 337 res = setup_for_test_fps(video_device, formats_and_resolutions) 338 339 os.close(video_device) 340 341 return res 342 343if __name__ == '__main__': 344 main() 345