# Lint as: python2, python3
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides functions to record input events."""

from __future__ import division
from __future__ import print_function

import logging
import re
import select
import subprocess
import threading
import time

from autotest_lib.client.bin.input.linux_input import (
        EV_MSC, EV_SYN, MSC_SCAN, SYN_REPORT)


# Define extra misc events below as they are not defined in linux_input.
MSC_SCAN_BTN_LEFT = 90001
MSC_SCAN_BTN_RIGHT = 90002
MSC_SCAN_BTN_MIDDLE = 90003


class InputEventRecorderError(Exception):
    """An exception class for input_event_recorder module."""
    pass


class Event(object):
    """An event class based on evtest constructed from an evtest event.

    An ordinary event looks like:
    Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316

    A SYN_REPORT event looks like:
    Event: time 10788.289613, -------------- SYN_REPORT ------------

    """

    # Building blocks of the evtest line patterns. They are compiled once
    # here rather than on every from_string() call, which runs per line.
    _PATTERN_SEP = r',\s*'
    _PATTERN_TIME = r'Event:\s*time\s*(\d+\.\d+)'
    _PATTERN_TYPE = r'type\s*(\d+)\s*\(\w+\)'
    _PATTERN_CODE = r'code\s*(\d+)\s*\(\w+\)'
    _PATTERN_VALUE = r'value\s*(-?\d+)'
    _PATTERN_TYPE_SYN_REPORT = r'-+\s*SYN_REPORT\s-+'

    # The pattern of an ordinary event.
    # Groups: 1 = time, 2 = type, 3 = code, 4 = value.
    _EVENT_PATTERN = re.compile(
            _PATTERN_SEP.join([_PATTERN_TIME,
                               _PATTERN_TYPE,
                               _PATTERN_CODE,
                               _PATTERN_VALUE]),
            re.I)

    # The pattern of the SYN_REPORT event.
    _SYN_REPORT_PATTERN = re.compile(
            _PATTERN_SEP.join([_PATTERN_TIME, _PATTERN_TYPE_SYN_REPORT]),
            re.I)

    def __init__(self, type=0, code=0, value=0):
        """Construction of an input event.

        @param type: the event type
        @param code: the event code
        @param value: the event value

        """
        self.type = type
        self.code = code
        self.value = value


    @staticmethod
    def from_string(ev_string):
        """Convert an event string to an event object.

        Note that the type and code names must consist of word characters
        to be matched; a line whose name is printed as "(?)" is not
        recognized and yields None.

        @param ev_string: an event string.

        @returns: an event object if the event string conforms to
                  event pattern format. None otherwise.

        """
        # Check if it is a SYN event.
        if Event._SYN_REPORT_PATTERN.search(ev_string):
            return Event(EV_SYN, SYN_REPORT, 0)

        # Check if it is a regular event.
        result = Event._EVENT_PATTERN.search(ev_string)
        if result is None:
            return None
        return Event(int(result.group(2)),
                     int(result.group(3)),
                     int(result.group(4)))


    def is_syn(self):
        """Determine if the event is a SYN report event.

        @returns: True if it is a SYN report event. False otherwise.

        """
        return self.type == EV_SYN and self.code == SYN_REPORT


    def value_tuple(self):
        """A tuple of the event type, code, and value.

        @returns: the tuple of the event type, code, and value.

        """
        return (self.type, self.code, self.value)


    def __eq__(self, other):
        """Determine if two events are equal.

        @param other: the object to compare with this event.

        @returns: True if the type, code, and value of the two events are
                  equal. False otherwise. NotImplemented if other does not
                  look like an event.

        """
        try:
            return (self.type == other.type and
                    self.code == other.code and
                    self.value == other.value)
        except AttributeError:
            # Let python fall back to its default comparison instead of
            # raising when compared against a non-event object.
            return NotImplemented


    def __ne__(self, other):
        """Determine if two events are not equal.

        Python 2 does not derive != from __eq__, so define it explicitly.

        @param other: the object to compare with this event.

        @returns: the negation of __eq__, or NotImplemented.

        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result


    def __hash__(self):
        """A hash consistent with __eq__.

        @returns: the hash of the (type, code, value) tuple.

        """
        return hash(self.value_tuple())


    def __str__(self):
        """A string representation of the event.

        @returns: a string representation of the event.

        """
        return '%d %d %d' % (self.type, self.code, self.value)


class InputEventRecorder(object):
    """An input event recorder.

    Refer to recording_example() below about how to record input events.

    """

    INPUT_DEVICE_INFO_FILE = '/proc/bus/input/devices'
    SELECT_TIMEOUT_SECS = 1

    def __init__(self, device_name, uniq=None):
        """Construction of input event recorder.

        @param device_name: the device name of the input device node to record.
        @param uniq: Unique address of input device (None if not used)

        @raises InputEventRecorderError: if the device node is not found.

        """
        self.device_name = device_name
        self.uniq = uniq
        self.device_node = self.get_device_node_by_name(device_name, uniq)
        if self.device_node is None:
            err_msg = 'Failed to find the device node of %s' % device_name
            raise InputEventRecorderError(err_msg)
        self._recording_thread = None
        self._stop_recording_thread_event = threading.Event()
        self.tmp_file = '/tmp/evtest.dat'
        self.events = []


    def get_device_node_by_name(self, device_name, uniq):
        """Get the input device node by name.

        Example of a RN-42 emulated mouse device information looks like

        I: Bus=0005 Vendor=0000 Product=0000 Version=0000
        N: Name="RNBT-A96F"
        P: Phys=6c:29:95:1a:b8:18
        S: Sysfs=/devices/pci0000:00/0000:00:14.0/usb1/1-8/1-8:1.0/bluetooth/hci0/hci0:512:29/0005:0000:0000.0004/input/input15
        U: Uniq=00:06:66:75:a9:6f
        H: Handlers=event12
        B: PROP=0
        B: EV=17
        B: KEY=70000 0 0 0 0
        B: REL=103
        B: MSC=10

        Each group of input devices is separated by an empty line.

        @param device_name: the device name of the target input device node.
                It is interpolated into a regular expression as is and
                matched case-insensitively anywhere in the name line.
        @param uniq: Unique address of the device. None if unused.

        @returns: the corresponding device node of the device.
                  None if no matching device is found.

        """
        device_found = None
        event_number = None
        uniq_found = None
        uniq = uniq.lower() if uniq else None

        entry_pattern = re.compile(r'^[A-Z]: ')
        device_pattern = re.compile(r'N: Name=.*%s' % device_name, re.I)
        # Require at least one digit so that int() below cannot fail.
        event_number_pattern = re.compile(r'H: Handlers=.*event(\d+)', re.I)
        uniq_pattern = re.compile(r'U: Uniq=([a-zA-Z0-9:]+)')

        with open(self.INPUT_DEVICE_INFO_FILE) as info:
            for line in info:
                line = line.rstrip('\n')

                if not entry_pattern.search(line):
                    # A non-entry line ends the current device group.
                    device_found = None
                    event_number = None
                    uniq_found = None
                elif device_found:
                    # Check if this is an event line
                    find_event = event_number_pattern.search(line)
                    if find_event:
                        event_number = int(find_event.group(1))

                    # Check if this a uniq line
                    find_uniq = uniq_pattern.search(line)
                    if find_uniq:
                        uniq_found = find_uniq.group(1).lower()

                    # If uniq matches expectations, we found the device node.
                    # Compare against None explicitly: event0 is a valid
                    # node but 0 is falsy.
                    if (event_number is not None and
                            (not uniq or uniq_found == uniq)):
                        return '/dev/input/event%d' % event_number
                else:
                    device_found = device_pattern.search(line)

        return None


    def record(self):
        """Record input events.

        Runs evtest on the device node, copies its output to self.tmp_file,
        and appends the (type, code, value) tuple of every recognized event
        to self.events. Returns when the stop event is set and no more
        output is pending, or when evtest closes its output.

        """
        logging.info('Recording input events of %s.', self.device_node)
        # Run evtest directly rather than through a shell so that
        # terminate() signals evtest itself. bufsize=0 keeps the pipe
        # unbuffered so that select() reflects all pending data.
        recorder = subprocess.Popen(['evtest', self.device_node],
                                    bufsize=0,
                                    stdout=subprocess.PIPE)
        try:
            with open(self.tmp_file, 'w') as output_f:
                while True:
                    read_list, _, _ = select.select(
                            [recorder.stdout], [], [],
                            self.SELECT_TIMEOUT_SECS)
                    if read_list:
                        line = recorder.stdout.readline().decode()
                        if not line:
                            # EOF: evtest has exited. The pipe would stay
                            # readable forever, so stop instead of spinning.
                            logging.warning('evtest on %s closed its output.',
                                            self.device_node)
                            break
                        output_f.write(line)
                        ev = Event.from_string(line)
                        if ev is not None:
                            self.events.append(ev.value_tuple())
                    elif self._stop_recording_thread_event.is_set():
                        # Only stop once the pipe is idle so that pending
                        # events are not lost.
                        self._stop_recording_thread_event.clear()
                        break
        finally:
            # Always stop and reap the child, even if the loop raised.
            if recorder.poll() is None:
                recorder.terminate()
            recorder.wait()
            recorder.stdout.close()


    def start(self):
        """Start the recording thread."""
        logging.info('Start recording thread.')
        self._recording_thread = threading.Thread(target=self.record)
        self._recording_thread.start()


    def stop(self):
        """Stop the recording thread."""
        logging.info('Stop recording thread.')
        if self._recording_thread is None:
            logging.warning('The recording thread is not running.')
            return
        self._stop_recording_thread_event.set()
        self._recording_thread.join()
        self._recording_thread = None
        # The thread may have exited on its own without consuming the stop
        # request. Clear it so that the next recording is not cut short.
        self._stop_recording_thread_event.clear()


    def clear_events(self):
        """Clear the event list."""
        self.events = []


    def get_events(self):
        """Get the event list.

        @returns: the event list.
        """
        return self.events


SYN_EVENT = Event(EV_SYN, SYN_REPORT, 0)
MSC_SCAN_BTN_EVENT = {'LEFT': Event(EV_MSC, MSC_SCAN, MSC_SCAN_BTN_LEFT),
                      'RIGHT': Event(EV_MSC, MSC_SCAN, MSC_SCAN_BTN_RIGHT),
                      'MIDDLE': Event(EV_MSC, MSC_SCAN, MSC_SCAN_BTN_MIDDLE)}


def recording_example():
    """Example code for capturing input events on a Samus.

    For a quick swipe, it outputs events in numeric format:

    (3, 57, 9)
    (3, 53, 641)
    (3, 54, 268)
    (3, 58, 60)
    (3, 48, 88)
    (1, 330, 1)
    (1, 325, 1)
    (3, 0, 641)
    (3, 1, 268)
    (3, 24, 60)
    (0, 0, 0)
    (3, 53, 595)
    (3, 54, 288)
    (3, 0, 595)
    (3, 1, 288)
    (0, 0, 0)
    (3, 57, -1)
    (1, 330, 0)
    (1, 325, 0)
    (3, 24, 0)
    (0, 0, 0)

    The above events in corresponding evtest text format are:

    Event: time .782950, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value 9
    Event: time .782950, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 641
    Event: time .782950, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 268
    Event: time .782950, type 3 (EV_ABS), code 58 (ABS_MT_PRESSURE), value 60
    Event: time .782950, type 3 (EV_ABS), code 59 (?), value 0
    Event: time .782950, type 3 (EV_ABS), code 48 (ABS_MT_TOUCH_MAJOR), value 88
    Event: time .782950, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 1
    Event: time .782950, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 1
    Event: time .782950, type 3 (EV_ABS), code 0 (ABS_X), value 641
    Event: time .782950, type 3 (EV_ABS), code 1 (ABS_Y), value 268
    Event: time .782950, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 60
    Event: time .782950, -------------- SYN_REPORT ------------
    Event: time .798273, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 595
    Event: time .798273, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 288
    Event: time .798273, type 3 (EV_ABS), code 0 (ABS_X), value 595
    Event: time .798273, type 3 (EV_ABS), code 1 (ABS_Y), value 288
    Event: time .798273, -------------- SYN_REPORT ------------
    Event: time .821437, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value -1
    Event: time .821437, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 0
    Event: time .821437, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 0
    Event: time .821437, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 0
    Event: time .821437, -------------- SYN_REPORT ------------

    The event with code 59 is absent from the numeric output because its
    code name is printed as "(?)", which Event.from_string() does not match.
    """
    device_name = 'Atmel maXTouch Touchpad'
    # The touchpad is looked up by name only, so no unique address is given.
    recorder = InputEventRecorder(device_name, None)
    print('Samus touchpad device name:', recorder.device_name)
    print('Samus touchpad device node:', recorder.device_node)
    print('Please make gestures on the touchpad for up to 5 seconds.')
    recorder.clear_events()
    recorder.start()
    time.sleep(5)
    recorder.stop()
    for e in recorder.get_events():
        print(e)


if __name__ == '__main__':
    recording_example()