1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import logging 7import re 8import time 9from xml.parsers import expat 10 11from autotest_lib.client.common_lib import error 12from autotest_lib.client.cros import ec 13from autotest_lib.server.cros.servo import servo 14 15# Hostevent codes, copied from: 16# ec/include/ec_commands.h 17HOSTEVENT_LID_CLOSED = 0x00000001 18HOSTEVENT_LID_OPEN = 0x00000002 19HOSTEVENT_POWER_BUTTON = 0x00000004 20HOSTEVENT_AC_CONNECTED = 0x00000008 21HOSTEVENT_AC_DISCONNECTED = 0x00000010 22HOSTEVENT_BATTERY_LOW = 0x00000020 23HOSTEVENT_BATTERY_CRITICAL = 0x00000040 24HOSTEVENT_BATTERY = 0x00000080 25HOSTEVENT_THERMAL_THRESHOLD = 0x00000100 26HOSTEVENT_THERMAL_OVERLOAD = 0x00000200 27HOSTEVENT_THERMAL = 0x00000400 28HOSTEVENT_USB_CHARGER = 0x00000800 29HOSTEVENT_KEY_PRESSED = 0x00001000 30HOSTEVENT_INTERFACE_READY = 0x00002000 31# Keyboard recovery combo has been pressed 32HOSTEVENT_KEYBOARD_RECOVERY = 0x00004000 33# Shutdown due to thermal overload 34HOSTEVENT_THERMAL_SHUTDOWN = 0x00008000 35# Shutdown due to battery level too low 36HOSTEVENT_BATTERY_SHUTDOWN = 0x00010000 37HOSTEVENT_INVALID = 0x80000000 38 39# Time to wait after sending keypress commands. 40KEYPRESS_RECOVERY_TIME = 0.5 41 42# Wakemask types, copied from: 43# ec/include/ec_commands.h 44EC_HOST_EVENT_MAIN = 0 45EC_HOST_EVENT_B = 1 46EC_HOST_EVENT_SCI_MASK = 2 47EC_HOST_EVENT_SMI_MASK = 3 48EC_HOST_EVENT_ALWAYS_REPORT_MASK = 4 49EC_HOST_EVENT_ACTIVE_WAKE_MASK = 5 50EC_HOST_EVENT_LAZY_WAKE_MASK_S0IX = 6 51EC_HOST_EVENT_LAZY_WAKE_MASK_S3 = 7 52EC_HOST_EVENT_LAZY_WAKE_MASK_S5 = 8 53 54 55class ChromeConsole(object): 56 """Manages control of a Chrome console. 57 58 We control the Chrome console via the UART of a Servo board. Chrome console 59 provides many interfaces to set and get its behavior via console commands. 60 This class is to abstract these interfaces. 61 """ 62 63 CMD = "_cmd" 64 REGEXP = "_regexp" 65 MULTICMD = "_multicmd" 66 67 # EC Features 68 # Quoted from 'enum ec_feature_code' in platform/ec/include/ec_commands.h. 69 EC_FEATURE = { 70 'EC_FEATURE_LIMITED' : 0, 71 'EC_FEATURE_FLASH' : 1, 72 'EC_FEATURE_PWM_FAN' : 2, 73 'EC_FEATURE_PWM_KEYB' : 3, 74 'EC_FEATURE_LIGHTBAR' : 4, 75 'EC_FEATURE_LED' : 5, 76 'EC_FEATURE_MOTION_SENSE' : 6, 77 'EC_FEATURE_KEYB' : 7, 78 'EC_FEATURE_PSTORE' : 8, 79 'EC_FEATURE_PORT80' : 9, 80 'EC_FEATURE_THERMAL' : 10, 81 'EC_FEATURE_BKLIGHT_SWITCH' : 11, 82 'EC_FEATURE_WIFI_SWITCH' : 12, 83 'EC_FEATURE_HOST_EVENTS' : 13, 84 'EC_FEATURE_GPIO' : 14, 85 'EC_FEATURE_I2C' : 15, 86 'EC_FEATURE_CHARGER' : 16, 87 'EC_FEATURE_BATTERY' : 17, 88 'EC_FEATURE_SMART_BATTERY' : 18, 89 'EC_FEATURE_HANG_DETECT' : 19, 90 'EC_FEATURE_PMU' : 20, 91 'EC_FEATURE_SUB_MCU' : 21, 92 'EC_FEATURE_USB_PD' : 22, 93 'EC_FEATURE_USB_MUX' : 23, 94 'EC_FEATURE_MOTION_SENSE_FIFO' : 24, 95 'EC_FEATURE_VSTORE' : 25, 96 'EC_FEATURE_USBC_SS_MUX_VIRTUAL' : 26, 97 'EC_FEATURE_RTC' : 27, 98 'EC_FEATURE_FINGERPRINT' : 28, 99 'EC_FEATURE_TOUCHPAD' : 29, 100 'EC_FEATURE_RWSIG' : 30, 101 'EC_FEATURE_DEVICE_EVENT' : 31, 102 'EC_FEATURE_UNIFIED_WAKE_MASKS' : 32, 103 'EC_FEATURE_HOST_EVENT64' : 33, 104 'EC_FEATURE_EXEC_IN_RAM' : 34, 105 'EC_FEATURE_CEC' : 35, 106 'EC_FEATURE_MOTION_SENSE_TIGHT_TIMESTAMPS' : 36, 107 'EC_FEATURE_REFINED_TABLET_MODE_HYSTERESIS' : 37, 108 'EC_FEATURE_EFS2' : 38, 109 'EC_FEATURE_SCP' : 39, 110 'EC_FEATURE_ISH' : 40, 111 } 112 113 def __init__(self, servo, name): 114 """Initialize and keep the servo object. 115 116 Args: 117 servo: A Servo object. 118 name: The console name. 119 """ 120 self.name = name 121 self.uart_cmd = self.name + self.CMD 122 self.uart_regexp = self.name + self.REGEXP 123 self.uart_multicmd = self.name + self.MULTICMD 124 125 self._servo = servo 126 127 def __repr__(self): 128 """Return a string representation: <ChromeConsole 'foo_uart'>""" 129 return "<%s %r>" % (self.__class__.__name__, self.name) 130 131 def set_uart_regexp(self, regexp): 132 self._servo.set(self.uart_regexp, regexp) 133 134 def clear_uart_regex(self): 135 """Clear uart_regexp""" 136 self.set_uart_regexp('None') 137 138 def send_command(self, commands): 139 """Send command through UART. 140 141 This function opens UART pty when called, and then command is sent 142 through UART. 143 144 Args: 145 commands: The commands to send, either a list or a string. 146 """ 147 self.clear_uart_regex() 148 if isinstance(commands, list): 149 try: 150 self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands)) 151 except servo.ControlUnavailableError: 152 logging.warning('The servod is too old that uart_multicmd not ' 153 'supported. Use uart_cmd instead.') 154 for command in commands: 155 self._servo.set_nocheck(self.uart_cmd, command) 156 else: 157 self._servo.set_nocheck(self.uart_cmd, commands) 158 self.clear_uart_regex() 159 160 def has_command(self, command): 161 """Check whether EC console supports |command|. 162 163 Args: 164 command: Command to look for. 165 166 Returns: 167 True: If the |command| exist on the EC image of the device. 168 False: If the |command| does not exist on the EC image of the device. 169 """ 170 result = None 171 try: 172 # Throws error.TestFail (on timeout) if it cannot find a line with 173 # 'command' in the output. Thus return False in that case. 174 result = self.send_command_get_output('help', [command]) 175 except error.TestFail: 176 return False 177 return result is not None 178 179 def send_command_get_output(self, command, regexp_list, retries=1): 180 """Send command through UART and wait for response. 181 182 This function waits for response message matching regular expressions. 183 184 Args: 185 command: The command sent. 186 regexp_list: List of regular expressions used to match response 187 message. Note, list must be ordered. 188 189 Returns: 190 List of tuples, each of which contains the entire matched string and 191 all the subgroups of the match. None if not matched. 192 For example: 193 response of the given command: 194 High temp: 37.2 195 Low temp: 36.4 196 regexp_list: 197 ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)'] 198 returns: 199 [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')] 200 201 Raises: 202 error.TestError: An error when the given regexp_list is not valid. 203 """ 204 if not isinstance(regexp_list, list): 205 raise error.TestError('Arugment regexp_list is not a list: %s' % 206 str(regexp_list)) 207 208 while retries > 0: 209 retries -= 1 210 try: 211 self.set_uart_regexp(str(regexp_list)) 212 self._servo.set_nocheck(self.uart_cmd, command) 213 return ast.literal_eval(self._servo.get(self.uart_cmd)) 214 except (servo.UnresponsiveConsoleError, 215 servo.ResponsiveConsoleError, expat.ExpatError) as e: 216 if retries <= 0: 217 raise 218 logging.warning('Failed to send EC cmd. %s', e) 219 finally: 220 self.clear_uart_regex() 221 222 223 def is_dfp(self, port=0): 224 """This function checks if EC is DFP 225 226 Args: 227 port: Port of EC to check 228 229 Returns: 230 True: if EC is DFP 231 False: if EC is not DFP 232 """ 233 is_dfp = None 234 ret = None 235 try: 236 ret = self.send_command_get_output("pd %d state" % port, 237 ["DFP.*Flag"]) 238 is_dfp = True 239 except Exception as e: 240 is_dfp = False 241 242 # For TCPMv1, after disconnecting a device the data state remains 243 # the same, so even when pd state shows DPF, make sure the device is 244 # not disconnected 245 if is_dfp: 246 if "DRP_AUTO_TOGGLE" in ret[0] or "DISCONNECTED" in ret[0]: 247 is_dfp = False 248 249 return is_dfp 250 251 252class ChromeEC(ChromeConsole): 253 """Manages control of a Chrome EC. 254 255 We control the Chrome EC via the UART of a Servo board. Chrome EC 256 provides many interfaces to set and get its behavior via console commands. 257 This class is to abstract these interfaces. 258 """ 259 260 # The dict to cache the battery information 261 BATTERY_INFO = {} 262 263 def __init__(self, servo, name="ec_uart"): 264 super(ChromeEC, self).__init__(servo, name) 265 266 def __repr__(self): 267 """Return a string representation of the object: <ChromeEC 'ec_uart'>""" 268 return "<%s %r>" % (self.__class__.__name__, self.name) 269 270 def key_down(self, keyname): 271 """Simulate pressing a key. 272 273 Args: 274 keyname: Key name, one of the keys of KEYMATRIX. 275 """ 276 self.send_command('kbpress %d %d 1' % 277 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0])) 278 279 280 def key_up(self, keyname): 281 """Simulate releasing a key. 282 283 Args: 284 keyname: Key name, one of the keys of KEYMATRIX. 285 """ 286 self.send_command('kbpress %d %d 0' % 287 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0])) 288 289 290 def key_press(self, keyname): 291 """Press and then release a key. 292 293 Args: 294 keyname: Key name, one of the keys of KEYMATRIX. 295 """ 296 self.send_command([ 297 'kbpress %d %d 1' % 298 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]), 299 'kbpress %d %d 0' % 300 (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0]), 301 ]) 302 # Don't spam the EC console as fast as we can; leave some recovery time 303 # in between commands. 304 time.sleep(KEYPRESS_RECOVERY_TIME) 305 306 307 def send_key_string_raw(self, string): 308 """Send key strokes consisting of only characters. 309 310 Args: 311 string: Raw string. 312 """ 313 for c in string: 314 self.key_press(c) 315 316 317 def send_key_string(self, string): 318 """Send key strokes including special keys. 319 320 Args: 321 string: Character string including special keys. An example 322 is "this is an<tab>example<enter>". 323 """ 324 for m in re.finditer("(<[^>]+>)|([^<>]+)", string): 325 sp, raw = m.groups() 326 if raw is not None: 327 self.send_key_string_raw(raw) 328 else: 329 self.key_press(sp) 330 331 332 def reboot(self, flags=''): 333 """Reboot EC with given flags. 334 335 Args: 336 flags: Optional, a space-separated string of flags passed to the 337 reboot command, including: 338 default: EC soft reboot; 339 'hard': EC hard/cold reboot; 340 'ap-off': Leave AP off after EC reboot (by default, EC turns 341 AP on after reboot if lid is open). 342 343 Raises: 344 error.TestError: If the string of flags is invalid. 345 """ 346 for flag in flags.split(): 347 if flag not in ('hard', 'ap-off'): 348 raise error.TestError( 349 'The flag %s of EC reboot command is invalid.' % flag) 350 self.send_command("reboot %s" % flags) 351 352 353 def set_flash_write_protect(self, enable): 354 """Set the software write protect of EC flash. 355 356 Args: 357 enable: True to enable write protect, False to disable. 358 """ 359 if enable: 360 self.send_command("flashwp enable") 361 else: 362 self.send_command("flashwp disable") 363 364 365 def set_hostevent(self, codes): 366 """Set the EC hostevent codes. 367 368 Args: 369 codes: Hostevent codes, HOSTEVENT_* 370 """ 371 self.send_command("hostevent set %#x" % codes) 372 # Allow enough time for EC to process input and set flag. 373 # See chromium:371631 for details. 374 # FIXME: Stop importing time module if this hack becomes obsolete. 375 time.sleep(1) 376 377 378 def enable_console_channel(self, channel): 379 """Find console channel mask and enable that channel only 380 381 @param channel: console channel name 382 """ 383 # The 'chan' command returns a list of console channels, 384 # their channel masks and channel numbers 385 regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel) 386 l = self.send_command_get_output('chan', [regexp]) 387 # Use channel mask and append the 0x for proper hex input value 388 cmd = 'chan 0x' + l[0][2] 389 # Set console to only output the desired channel 390 self.send_command(cmd) 391 392 393 def get_version(self): 394 """Get version information from the Chrome EC console. 395 Additionally, can be used to verify if EC console is available. 396 """ 397 self.send_command("chan 0") 398 # Use "[ \t]" here and not \s because sometimes the version is blank, 399 # i.e. 'RO: \r\n' which matches RO:\s+ 400 expected_output = [ 401 "Chip:[ \t]+([^\r\n]*)\r\n", "RO:[ \t]+([^\r\n]*)\r\n", 402 "RW_?[AB]?:[ \t]+([^\r\n]*)\r\n", "Build:[ \t]+([^\r\n]*)\r\n" 403 ] 404 l = self.send_command_get_output("version", expected_output) 405 self.send_command("chan 0xffffffff") 406 return l 407 408 def check_ro_rw(self, img_exp): 409 """Tell if the current EC image matches the given input, 'RW' or 'RO. 410 411 Args: 412 img_exp: Expected image type. It should be either 'RW' or 'RO'. 413 Return: 414 True if the active EC image matches to 'img_exp'. 415 False otherwise. 416 Raise: 417 TestError if img_exp is neither 'RW' nor 'RO'. 418 """ 419 if img_exp not in ['RW', 'RO']: 420 raise error.TestError('Arugment img_exp is neither RW nor RO') 421 422 result = self.send_command_get_output('sysinfo', [r'Copy:\s*(RO|RW)']) 423 return result[0][1] == img_exp 424 425 def check_feature(self, feature): 426 """Return true if EC supports the given feature 427 428 Args: 429 feature: feature name as a string as in self.EC_FEATURE. 430 431 Returns: 432 True if 'feature' is in EC's feature set. 433 False otherwise 434 """ 435 feat_id = self.EC_FEATURE[feature] 436 if feat_id < 32: 437 feat_start = 0 438 else: 439 feat_start = 32 440 441 regexp = r'%d-%d:\s*(0x[0-9a-fA-F]{8})' % (feat_start, 442 feat_start + 31) 443 444 try: 445 result = self.send_command_get_output('feat', [regexp]) 446 except servo.ResponsiveConsoleError as e: 447 logging.warning("feat command is not available: %s", str(e)) 448 return False 449 450 feat_bitmap = int(result[0][1], 16) 451 452 return ((1 << (feat_id - feat_start)) & feat_bitmap) != 0 453 454 def update_battery_info(self): 455 """Get the battery info we care for this test.""" 456 # The battery parameters we care for this test. The order must match 457 # the output of EC battery command. 458 battery_params = [ 459 'V', 'V-desired', 'I', 'I-desired', 'Charging', 'Remaining' 460 ] 461 regex_str_list = [] 462 463 for p in battery_params: 464 if p == 'Remaining': 465 regex_str_list.append(p + ':\s+(\d+)\s+') 466 elif p == 'Charging': 467 regex_str_list.append(p + r':\s+(Not Allowed|Allowed)\s+') 468 else: 469 regex_str_list.append(p + 470 r':\s+0x[0-9a-f]*\s+=\s+([0-9-]+)\s+') 471 472 # For unknown reasons, servod doesn't always capture the ec 473 # command output. It doesn't happen often, but retry if it does. 474 retries = 3 475 while retries > 0: 476 retries -= 1 477 try: 478 battery_regex_match = self.send_command_get_output( 479 'battery', regex_str_list) 480 break 481 except (servo.UnresponsiveConsoleError, 482 servo.ResponsiveConsoleError) as e: 483 if retries <= 0: 484 raise 485 logging.warning('Failed to get battery status. %s', e) 486 else: 487 battery_regex_match = self.send_command_get_output( 488 'battery', regex_str_list) 489 490 for i in range(len(battery_params)): 491 if battery_params[i] == 'Charging': 492 self.BATTERY_INFO[ 493 battery_params[i]] = battery_regex_match[i][1] 494 else: 495 self.BATTERY_INFO[battery_params[i]] = int( 496 battery_regex_match[i][1]) 497 logging.debug('Battery info: %s', self.BATTERY_INFO) 498 499 def get_battery_desired_voltage(self, print_result=True): 500 """Get battery desired voltage value.""" 501 if not self.BATTERY_INFO: 502 self.update_battery_info() 503 if print_result: 504 logging.info('Battery desired voltage = %d mV', 505 self.BATTERY_INFO['V-desired']) 506 return self.BATTERY_INFO['V-desired'] 507 508 def get_battery_desired_current(self, print_result=True): 509 """Get battery desired current value.""" 510 if not self.BATTERY_INFO: 511 self.update_battery_info() 512 if print_result: 513 logging.info('Battery desired current = %d mA', 514 self.BATTERY_INFO['I-desired']) 515 return self.BATTERY_INFO['I-desired'] 516 517 def get_battery_actual_voltage(self, print_result=True): 518 """Get the actual voltage from charger to battery.""" 519 if not self.BATTERY_INFO: 520 self.update_battery_info() 521 if print_result: 522 logging.info('Battery actual voltage = %d mV', 523 self.BATTERY_INFO['V']) 524 return self.BATTERY_INFO['V'] 525 526 def get_battery_actual_current(self, print_result=True): 527 """Get the actual current from charger to battery.""" 528 if not self.BATTERY_INFO: 529 self.update_battery_info() 530 if print_result: 531 logging.info('Battery actual current = %d mA', 532 self.BATTERY_INFO['I']) 533 return self.BATTERY_INFO['I'] 534 535 def get_battery_remaining(self, print_result=True): 536 """Get battery charge remaining in mAh.""" 537 if not self.BATTERY_INFO: 538 self.update_battery_info() 539 if print_result: 540 logging.info("Battery charge remaining = %d mAh", 541 self.BATTERY_INFO['Remaining']) 542 return self.BATTERY_INFO['Remaining'] 543 544 def get_battery_charging_allowed(self, print_result=True): 545 """Get the battery charging state. 546 547 Returns True if charging is allowed. 548 """ 549 if not self.BATTERY_INFO: 550 self.update_battery_info() 551 if print_result: 552 logging.info("Battery charging = %s", 553 self.BATTERY_INFO['Charging']) 554 if self.BATTERY_INFO['Charging'] == 'Allowed': 555 return True 556 return False 557 558 559class ChromeUSBPD(ChromeEC): 560 """Manages control of a Chrome USBPD. 561 562 We control the Chrome EC via the UART of a Servo board. Chrome USBPD 563 provides many interfaces to set and get its behavior via console commands. 564 This class is to abstract these interfaces. 565 """ 566 567 def __init__(self, servo): 568 super(ChromeUSBPD, self).__init__(servo, "usbpd_uart") 569