xref: /aosp_15_r20/external/autotest/server/cros/servo/chrome_ec.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import logging
7import re
8import time
9from xml.parsers import expat
10
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.cros import ec
13from autotest_lib.server.cros.servo import servo
14
# Host event bitmasks. Each event occupies a single bit; the values mirror
# the host event definitions in:
#     ec/include/ec_commands.h
HOSTEVENT_LID_CLOSED = 1 << 0
HOSTEVENT_LID_OPEN = 1 << 1
HOSTEVENT_POWER_BUTTON = 1 << 2
HOSTEVENT_AC_CONNECTED = 1 << 3
HOSTEVENT_AC_DISCONNECTED = 1 << 4
HOSTEVENT_BATTERY_LOW = 1 << 5
HOSTEVENT_BATTERY_CRITICAL = 1 << 6
HOSTEVENT_BATTERY = 1 << 7
HOSTEVENT_THERMAL_THRESHOLD = 1 << 8
HOSTEVENT_THERMAL_OVERLOAD = 1 << 9
HOSTEVENT_THERMAL = 1 << 10
HOSTEVENT_USB_CHARGER = 1 << 11
HOSTEVENT_KEY_PRESSED = 1 << 12
HOSTEVENT_INTERFACE_READY = 1 << 13
# Keyboard recovery combo has been pressed
HOSTEVENT_KEYBOARD_RECOVERY = 1 << 14
# Shutdown due to thermal overload
HOSTEVENT_THERMAL_SHUTDOWN = 1 << 15
# Shutdown due to battery level too low
HOSTEVENT_BATTERY_SHUTDOWN = 1 << 16
HOSTEVENT_INVALID = 1 << 31

# Seconds to wait after sending keypress commands.
KEYPRESS_RECOVERY_TIME = 0.5

# Wakemask types, numbered consecutively from 0 in the same order as in:
#     ec/include/ec_commands.h
(EC_HOST_EVENT_MAIN,
 EC_HOST_EVENT_B,
 EC_HOST_EVENT_SCI_MASK,
 EC_HOST_EVENT_SMI_MASK,
 EC_HOST_EVENT_ALWAYS_REPORT_MASK,
 EC_HOST_EVENT_ACTIVE_WAKE_MASK,
 EC_HOST_EVENT_LAZY_WAKE_MASK_S0IX,
 EC_HOST_EVENT_LAZY_WAKE_MASK_S3,
 EC_HOST_EVENT_LAZY_WAKE_MASK_S5) = range(9)
53
54
class ChromeConsole(object):
    """Manages control of a Chrome console.

    We control the Chrome console via the UART of a Servo board. Chrome console
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.

    The servo control names used by this class are built from the console
    name plus the CMD, REGEXP and MULTICMD suffixes (e.g. 'ec_uart_cmd').
    """

    # Suffixes appended to the console name to form servo control names.
    CMD = "_cmd"
    REGEXP = "_regexp"
    MULTICMD = "_multicmd"

    # EC Features
    # Quoted from 'enum ec_feature_code' in platform/ec/include/ec_commands.h.
    EC_FEATURE = {
        'EC_FEATURE_LIMITED'                            : 0,
        'EC_FEATURE_FLASH'                              : 1,
        'EC_FEATURE_PWM_FAN'                            : 2,
        'EC_FEATURE_PWM_KEYB'                           : 3,
        'EC_FEATURE_LIGHTBAR'                           : 4,
        'EC_FEATURE_LED'                                : 5,
        'EC_FEATURE_MOTION_SENSE'                       : 6,
        'EC_FEATURE_KEYB'                               : 7,
        'EC_FEATURE_PSTORE'                             : 8,
        'EC_FEATURE_PORT80'                             : 9,
        'EC_FEATURE_THERMAL'                            : 10,
        'EC_FEATURE_BKLIGHT_SWITCH'                     : 11,
        'EC_FEATURE_WIFI_SWITCH'                        : 12,
        'EC_FEATURE_HOST_EVENTS'                        : 13,
        'EC_FEATURE_GPIO'                               : 14,
        'EC_FEATURE_I2C'                                : 15,
        'EC_FEATURE_CHARGER'                            : 16,
        'EC_FEATURE_BATTERY'                            : 17,
        'EC_FEATURE_SMART_BATTERY'                      : 18,
        'EC_FEATURE_HANG_DETECT'                        : 19,
        'EC_FEATURE_PMU'                                : 20,
        'EC_FEATURE_SUB_MCU'                            : 21,
        'EC_FEATURE_USB_PD'                             : 22,
        'EC_FEATURE_USB_MUX'                            : 23,
        'EC_FEATURE_MOTION_SENSE_FIFO'                  : 24,
        'EC_FEATURE_VSTORE'                             : 25,
        'EC_FEATURE_USBC_SS_MUX_VIRTUAL'                : 26,
        'EC_FEATURE_RTC'                                : 27,
        'EC_FEATURE_FINGERPRINT'                        : 28,
        'EC_FEATURE_TOUCHPAD'                           : 29,
        'EC_FEATURE_RWSIG'                              : 30,
        'EC_FEATURE_DEVICE_EVENT'                       : 31,
        'EC_FEATURE_UNIFIED_WAKE_MASKS'                 : 32,
        'EC_FEATURE_HOST_EVENT64'                       : 33,
        'EC_FEATURE_EXEC_IN_RAM'                        : 34,
        'EC_FEATURE_CEC'                                : 35,
        'EC_FEATURE_MOTION_SENSE_TIGHT_TIMESTAMPS'      : 36,
        'EC_FEATURE_REFINED_TABLET_MODE_HYSTERESIS'     : 37,
        'EC_FEATURE_EFS2'                               : 38,
        'EC_FEATURE_SCP'                                : 39,
        'EC_FEATURE_ISH'                                : 40,
    }

    def __init__(self, servo, name):
        """Initialize and keep the servo object.

        Args:
          servo: A Servo object.
          name: The console name.
        """
        self.name = name
        self.uart_cmd = self.name + self.CMD
        self.uart_regexp = self.name + self.REGEXP
        self.uart_multicmd = self.name + self.MULTICMD

        self._servo = servo

    def __repr__(self):
        """Return a string representation: <ChromeConsole 'foo_uart'>"""
        return "<%s %r>" % (self.__class__.__name__, self.name)

    def set_uart_regexp(self, regexp):
        """Set the uart_regexp servo control of this console.

        Args:
          regexp: The value to write to the control, passed through as-is.
        """
        self._servo.set(self.uart_regexp, regexp)

    def clear_uart_regex(self):
        """Clear uart_regexp"""
        self.set_uart_regexp('None')

    def send_command(self, commands):
        """Send command through UART.

        This function opens UART pty when called, and then command is sent
        through UART.

        Args:
          commands: The commands to send, either a list or a string. A list
            is joined with ';' and sent through the multicmd control; if that
            control is unavailable, the commands are sent one at a time.
        """
        self.clear_uart_regex()
        if isinstance(commands, list):
            try:
                self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
            except servo.ControlUnavailableError:
                logging.warning('The servod is too old that uart_multicmd not '
                                'supported. Use uart_cmd instead.')
                for command in commands:
                    self._servo.set_nocheck(self.uart_cmd, command)
        else:
            self._servo.set_nocheck(self.uart_cmd, commands)
        self.clear_uart_regex()

    def has_command(self, command):
        """Check whether EC console supports |command|.

        Args:
          command: Command to look for. It is used as the regular expression
            matched against the output of the 'help' command.

        Returns:
          True: If the |command| exist on the EC image of the device.
          False: If the |command| does not exist on the EC image of the device.
        """
        try:
            # Throws error.TestFail (on timeout) if it cannot find a line with
            # 'command' in the output. Thus return False in that case.
            result = self.send_command_get_output('help', [command])
        except error.TestFail:
            return False
        return result is not None

    def send_command_get_output(self, command, regexp_list, retries=1):
        r"""Send command through UART and wait for response.

        This function waits for response message matching regular expressions.

        Args:
          command: The command sent.
          regexp_list: List of regular expressions used to match response
            message. Note, list must be ordered.
          retries: Total number of attempts made before a console error is
            propagated. Values below 1 are treated as 1 so that the command
            is always sent at least once.

        Returns:
          List of tuples, each of which contains the entire matched string and
          all the subgroups of the match. None if not matched.
          For example:
            response of the given command:
              High temp: 37.2
              Low temp: 36.4
            regexp_list:
              ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
            returns:
              [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]

        Raises:
          error.TestError: An error when the given regexp_list is not valid.
          servo.UnresponsiveConsoleError, servo.ResponsiveConsoleError,
          expat.ExpatError: Re-raised when the last attempt fails.
        """
        if not isinstance(regexp_list, list):
            raise error.TestError('Arugment regexp_list is not a list: %s' %
                                  str(regexp_list))

        # Always try at least once; previously retries <= 0 skipped the
        # command entirely and silently returned None.
        for attempts_left in range(max(retries, 1) - 1, -1, -1):
            try:
                self.set_uart_regexp(str(regexp_list))
                self._servo.set_nocheck(self.uart_cmd, command)
                # The control value is the string form of a Python literal.
                return ast.literal_eval(self._servo.get(self.uart_cmd))
            except (servo.UnresponsiveConsoleError,
                    servo.ResponsiveConsoleError, expat.ExpatError) as e:
                if attempts_left <= 0:
                    raise
                logging.warning('Failed to send EC cmd. %s', e)
            finally:
                # Never leave a stale regexp behind, whatever the outcome.
                self.clear_uart_regex()

    def is_dfp(self, port=0):
        """This function checks if EC is DFP

        Args:
          port: Port of EC to check

        Returns:
          True: if EC is DFP
          False: if EC is not DFP, if the 'pd <port> state' query failed, or
            if it produced no match.
        """
        try:
            ret = self.send_command_get_output("pd %d state" % port,
                                               ["DFP.*Flag"])
        except Exception as e:
            # Deliberately best-effort: any failure to read the PD state is
            # reported as "not DFP" rather than propagated to the caller.
            logging.debug('Could not read pd state of port %d: %s', port, e)
            return False

        # A None/empty result means nothing matched; indexing it would raise.
        if not ret:
            return False

        # For TCPMv1, after disconnecting a device the data state remains
        # the same, so even when pd state shows DPF, make sure the device is
        # not disconnected
        # NOTE(review): the containment checks are applied to the first match
        # entry exactly as returned by servo; confirm its type with servod.
        state = ret[0]
        return not ("DRP_AUTO_TOGGLE" in state or "DISCONNECTED" in state)
250
251
class ChromeEC(ChromeConsole):
    """Manages control of a Chrome EC.

    We control the Chrome EC via the UART of a Servo board. Chrome EC
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    # The dict to cache the battery information. Kept at class level only as
    # a fallback for backward compatibility; __init__ gives every instance
    # its own dict so that consoles of different devices never share values.
    BATTERY_INFO = {}

    def __init__(self, servo, name="ec_uart"):
        """Initialize the EC console.

        Args:
          servo: A Servo object.
          name: The console name, 'ec_uart' by default.
        """
        super(ChromeEC, self).__init__(servo, name)
        # Per-instance battery cache (shadows the shared class attribute).
        self.BATTERY_INFO = {}

    def __repr__(self):
        """Return a string representation of the object: <ChromeEC 'ec_uart'>"""
        return "<%s %r>" % (self.__class__.__name__, self.name)

    @staticmethod
    def _kbpress_command(keyname, pressed):
        """Build the 'kbpress' console command for one key transition.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
          pressed: 1 to press the key, 0 to release it.
        """
        position = ec.KEYMATRIX[keyname]
        # The console command takes the matrix entry in reversed order.
        return 'kbpress %d %d %d' % (position[1], position[0], pressed)

    def key_down(self, keyname):
        """Simulate pressing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, 1))

    def key_up(self, keyname):
        """Simulate releasing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, 0))

    def key_press(self, keyname):
        """Press and then release a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command([
                self._kbpress_command(keyname, 1),
                self._kbpress_command(keyname, 0),
        ])
        # Don't spam the EC console as fast as we can; leave some recovery time
        # in between commands.
        time.sleep(KEYPRESS_RECOVERY_TIME)

    def send_key_string_raw(self, string):
        """Send key strokes consisting of only characters.

        Args:
          string: Raw string.
        """
        for c in string:
            self.key_press(c)

    def send_key_string(self, string):
        """Send key strokes including special keys.

        Args:
          string: Character string including special keys. An example
            is "this is an<tab>example<enter>".
        """
        for m in re.finditer("(<[^>]+>)|([^<>]+)", string):
            sp, raw = m.groups()
            if raw is not None:
                self.send_key_string_raw(raw)
            else:
                # Special keys are passed on with their angle brackets.
                self.key_press(sp)

    def reboot(self, flags=''):
        """Reboot EC with given flags.

        Args:
          flags: Optional, a space-separated string of flags passed to the
                 reboot command, including:
                   default: EC soft reboot;
                   'hard': EC hard/cold reboot;
                   'ap-off': Leave AP off after EC reboot (by default, EC turns
                             AP on after reboot if lid is open).

        Raises:
          error.TestError: If the string of flags is invalid.
        """
        for flag in flags.split():
            if flag not in ('hard', 'ap-off'):
                raise error.TestError(
                        'The flag %s of EC reboot command is invalid.' % flag)
        self.send_command("reboot %s" % flags)

    def set_flash_write_protect(self, enable):
        """Set the software write protect of EC flash.

        Args:
          enable: True to enable write protect, False to disable.
        """
        self.send_command("flashwp %s" % ("enable" if enable else "disable"))

    def set_hostevent(self, codes):
        """Set the EC hostevent codes.

        Args:
          codes: Hostevent codes, HOSTEVENT_*
        """
        self.send_command("hostevent set %#x" % codes)
        # Allow enough time for EC to process input and set flag.
        # See chromium:371631 for details.
        # FIXME: Stop importing time module if this hack becomes obsolete.
        time.sleep(1)

    def enable_console_channel(self, channel):
        """Find console channel mask and enable that channel only

        Args:
          channel: console channel name
        """
        # The 'chan' command returns a list of console channels,
        # their channel masks and channel numbers
        regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
        matches = self.send_command_get_output('chan', [regexp])
        # Use channel mask and append the 0x for proper hex input value
        cmd = 'chan 0x' + matches[0][2]
        # Set console to only output the desired channel
        self.send_command(cmd)

    def get_version(self):
        """Get version information from the Chrome EC console.
           Additionally, can be used to verify if EC console is available.

        Returns:
          The matches for the Chip, RO, RW and Build lines of the 'version'
          command, as returned by send_command_get_output().
        """
        # Mute all console channels while the version output is captured.
        self.send_command("chan 0")
        # Use "[ \t]" here and not \s because sometimes the version is blank,
        # i.e. 'RO:   \r\n' which matches RO:\s+
        expected_output = [
                "Chip:[ \t]+([^\r\n]*)\r\n", "RO:[ \t]+([^\r\n]*)\r\n",
                "RW_?[AB]?:[ \t]+([^\r\n]*)\r\n", "Build:[ \t]+([^\r\n]*)\r\n"
        ]
        try:
            return self.send_command_get_output("version", expected_output)
        finally:
            # Re-enable the channels even when the query fails, so a failed
            # probe does not leave the console muted.
            self.send_command("chan 0xffffffff")

    def check_ro_rw(self, img_exp):
        """Tell if the current EC image matches the given input, 'RW' or 'RO.

        Args:
            img_exp: Expected image type. It should be either 'RW' or 'RO'.
        Return:
            True if the active EC image matches to 'img_exp'.
            False otherwise.
        Raise:
            TestError if img_exp is neither 'RW' nor 'RO'.
        """
        if img_exp not in ('RW', 'RO'):
            raise error.TestError('Arugment img_exp is neither RW nor RO')

        result = self.send_command_get_output('sysinfo', [r'Copy:\s*(RO|RW)'])
        return result[0][1] == img_exp

    def check_feature(self, feature):
        """Return true if EC supports the given feature

        Args:
            feature: feature name as a string as in self.EC_FEATURE.

        Returns:
            True if 'feature' is in EC's feature set.
            False otherwise
        """
        feat_id = self.EC_FEATURE[feature]
        # The bitmap line looked for is labelled '<first>-<last>:' and covers
        # 32 feature ids; derive the group from the id instead of hard-coding
        # the first two groups.
        # NOTE(review): groups beyond 32-63 are assumed to follow the same
        # labelling; confirm against the EC 'feat' command output.
        feat_start = feat_id - feat_id % 32

        regexp = r'%d-%d:\s*(0x[0-9a-fA-F]{8})' % (feat_start,
                                                   feat_start + 31)

        try:
            result = self.send_command_get_output('feat', [regexp])
        except servo.ResponsiveConsoleError as e:
            logging.warning("feat command is not available: %s", str(e))
            return False

        feat_bitmap = int(result[0][1], 16)

        return ((1 << (feat_id - feat_start)) & feat_bitmap) != 0

    def update_battery_info(self):
        """Get the battery info we care for this test.

        Runs the 'battery' console command and stores the parsed values in
        self.BATTERY_INFO: 'Charging' as the matched string, every other
        parameter converted to int.

        Raises:
          servo.UnresponsiveConsoleError, servo.ResponsiveConsoleError: If the
            command output could not be captured after three attempts.
        """
        # The battery parameters we care for this test. The order must match
        # the output of EC battery command.
        battery_params = [
                'V', 'V-desired', 'I', 'I-desired', 'Charging', 'Remaining'
        ]
        # Most parameters are printed as '<name>: 0x<hex> = <decimal> ...';
        # these two use their own layout.
        numeric_re = r':\s+0x[0-9a-f]*\s+=\s+([0-9-]+)\s+'
        special_re = {
                'Remaining': r':\s+(\d+)\s+',
                'Charging': r':\s+(Not Allowed|Allowed)\s+',
        }
        regex_str_list = [
                p + special_re.get(p, numeric_re) for p in battery_params
        ]

        # For unknown reasons, servod doesn't always capture the ec
        # command output. It doesn't happen often, but retry if it does.
        attempts = 3
        for attempt in range(1, attempts + 1):
            try:
                battery_regex_match = self.send_command_get_output(
                        'battery', regex_str_list)
                break
            except (servo.UnresponsiveConsoleError,
                    servo.ResponsiveConsoleError) as e:
                if attempt == attempts:
                    raise
                logging.warning('Failed to get battery status. %s', e)

        for i, param in enumerate(battery_params):
            value = battery_regex_match[i][1]
            if param != 'Charging':
                value = int(value)
            self.BATTERY_INFO[param] = value
        logging.debug('Battery info: %s', self.BATTERY_INFO)

    def _get_battery_info(self, key, log_format, print_result):
        """Return one cached battery value, querying the EC if cache is empty.

        Args:
          key: Key of the value in self.BATTERY_INFO.
          log_format: logging format string taking the value as its argument.
          print_result: Whether to log the value at info level.
        """
        if not self.BATTERY_INFO:
            self.update_battery_info()
        value = self.BATTERY_INFO[key]
        if print_result:
            logging.info(log_format, value)
        return value

    def get_battery_desired_voltage(self, print_result=True):
        """Get battery desired voltage value."""
        return self._get_battery_info(
                'V-desired', 'Battery desired voltage = %d mV', print_result)

    def get_battery_desired_current(self, print_result=True):
        """Get battery desired current value."""
        return self._get_battery_info(
                'I-desired', 'Battery desired current = %d mA', print_result)

    def get_battery_actual_voltage(self, print_result=True):
        """Get the actual voltage from charger to battery."""
        return self._get_battery_info(
                'V', 'Battery actual voltage = %d mV', print_result)

    def get_battery_actual_current(self, print_result=True):
        """Get the actual current from charger to battery."""
        return self._get_battery_info(
                'I', 'Battery actual current = %d mA', print_result)

    def get_battery_remaining(self, print_result=True):
        """Get battery charge remaining in mAh."""
        return self._get_battery_info(
                'Remaining', "Battery charge remaining = %d mAh",
                print_result)

    def get_battery_charging_allowed(self, print_result=True):
        """Get the battery charging state.

        Returns True if charging is allowed.
        """
        state = self._get_battery_info(
                'Charging', "Battery charging = %s", print_result)
        return state == 'Allowed'
557
558
class ChromeUSBPD(ChromeEC):
    """Manages control of a Chrome USBPD.

    A ChromeEC console bound to the "usbpd_uart" console name instead of the
    default EC one, so every ChromeEC command helper is sent to the USBPD
    console through the UART of a Servo board.
    """

    def __init__(self, servo):
        """Initialize the console with the fixed "usbpd_uart" name.

        Args:
          servo: A Servo object.
        """
        super(ChromeUSBPD, self).__init__(servo, name="usbpd_uart")
569