xref: /aosp_15_r20/external/autotest/server/cros/servo/chrome_cr50.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2017 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Lifrom __future__ import absolute_import
7*9c5db199SXin Lifrom __future__ import division
8*9c5db199SXin Lifrom __future__ import print_function
9*9c5db199SXin Li
10*9c5db199SXin Liimport functools
11*9c5db199SXin Liimport logging
12*9c5db199SXin Liimport pprint
13*9c5db199SXin Liimport re
14*9c5db199SXin Liimport six
15*9c5db199SXin Lifrom six.moves import range
16*9c5db199SXin Liimport time
17*9c5db199SXin Li
18*9c5db199SXin Lifrom autotest_lib.client.bin import utils
19*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
20*9c5db199SXin Lifrom autotest_lib.client.common_lib.cros import cr50_utils
21*9c5db199SXin Lifrom autotest_lib.server.cros.servo import chrome_ec
22*9c5db199SXin Lifrom autotest_lib.server.cros.servo import servo
23*9c5db199SXin Li
24*9c5db199SXin Li
25*9c5db199SXin Lidef dts_control_command(func):
26*9c5db199SXin Li    """For methods that should only run when dts mode control is supported."""
27*9c5db199SXin Li    @functools.wraps(func)
28*9c5db199SXin Li    def wrapper(instance, *args, **kwargs):
29*9c5db199SXin Li        """Ignore those functions if dts mode control is not supported."""
30*9c5db199SXin Li        if instance._servo.dts_mode_is_valid():
31*9c5db199SXin Li            return func(instance, *args, **kwargs)
32*9c5db199SXin Li        logging.info('Servo setup does not support DTS mode. ignoring %s',
33*9c5db199SXin Li                     func.__name__)
34*9c5db199SXin Li    return wrapper
35*9c5db199SXin Li
36*9c5db199SXin Li
37*9c5db199SXin Liclass ChromeCr50(chrome_ec.ChromeConsole):
38*9c5db199SXin Li    """Manages control of a Chrome Cr50.
39*9c5db199SXin Li
40*9c5db199SXin Li    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
41*9c5db199SXin Li    provides many interfaces to set and get its behavior via console commands.
42*9c5db199SXin Li    This class is to abstract these interfaces.
43*9c5db199SXin Li    """
44*9c5db199SXin Li    PROD_RW_KEYIDS = ['0x87b73b67', '0xde88588d']
45*9c5db199SXin Li    PROD_RO_KEYIDS = ['0xaa66150f']
46*9c5db199SXin Li    OPEN = 'open'
47*9c5db199SXin Li    UNLOCK = 'unlock'
48*9c5db199SXin Li    LOCK = 'lock'
49*9c5db199SXin Li    # The amount of time you need to show physical presence.
50*9c5db199SXin Li    PP_SHORT = 15
51*9c5db199SXin Li    PP_LONG = 300
52*9c5db199SXin Li    CCD_PASSWORD_RATE_LIMIT = 3
53*9c5db199SXin Li    IDLE_COUNT = 'count: (\d+)\s'
54*9c5db199SXin Li    SHORT_WAIT = 3
55*9c5db199SXin Li    # The version has four groups: the partition, the header version, debug
56*9c5db199SXin Li    # descriptor and then version string.
57*9c5db199SXin Li    # There are two partitions A and B. The active partition is marked with a
58*9c5db199SXin Li    # '*'. If it is a debug image '/DBG' is added to the version string. If the
59*9c5db199SXin Li    # image has been corrupted, the version information will be replaced with
60*9c5db199SXin Li    # 'Error'.
61*9c5db199SXin Li    # So the output may look something like this.
62*9c5db199SXin Li    #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
63*9c5db199SXin Li    #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
64*9c5db199SXin Li    # Or like this if the region was corrupted.
65*9c5db199SXin Li    #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
66*9c5db199SXin Li    #   RW_B:    Error
67*9c5db199SXin Li    VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
68*9c5db199SXin Li    INACTIVE_VERSION = VERSION_FORMAT % ''
69*9c5db199SXin Li    ACTIVE_VERSION = VERSION_FORMAT % '\*'
70*9c5db199SXin Li    # Following lines of the version output may print the image board id
71*9c5db199SXin Li    # information. eg.
72*9c5db199SXin Li    # BID A:   5a5a4146:ffffffff:00007f00 Yes
73*9c5db199SXin Li    # BID B:   00000000:00000000:00000000 Yes
74*9c5db199SXin Li    # Use the first group from ACTIVE_VERSION to match the active board id
75*9c5db199SXin Li    # partition.
76*9c5db199SXin Li    BID_ERROR = 'read_board_id: failed'
77*9c5db199SXin Li    BID_FORMAT = ':\s+[a-f0-9:]{26} '
78*9c5db199SXin Li    ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
79*9c5db199SXin Li            BID_ERROR)
80*9c5db199SXin Li    WAKE_CHAR = '\n\n\n\n'
81*9c5db199SXin Li    WAKE_RESPONSE = ['(>|Console is enabled)']
82*9c5db199SXin Li    START_UNLOCK_TIMEOUT = 20
83*9c5db199SXin Li    GETTIME = ['= (\S+)']
84*9c5db199SXin Li    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
85*9c5db199SXin Li    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
86*9c5db199SXin Li    MAX_RETRY_COUNT = 10
87*9c5db199SXin Li    CCDSTATE_MAX_RETRY_COUNT = 20
88*9c5db199SXin Li    START_STR = ['((Havn|UART).*Console is enabled;)']
89*9c5db199SXin Li    REBOOT_DELAY_WITH_CCD = 60
90*9c5db199SXin Li    REBOOT_DELAY_WITH_FLEX = 3
91*9c5db199SXin Li    ON_STRINGS = ['enable', 'enabled', 'on']
92*9c5db199SXin Li    CONSERVATIVE_CCD_WAIT = 10
93*9c5db199SXin Li    CCD_SHORT_PRESSES = 5
94*9c5db199SXin Li    CAP_IS_ACCESSIBLE = 0
95*9c5db199SXin Li    CAP_SETTING = 1
96*9c5db199SXin Li    CAP_REQ = 2
97*9c5db199SXin Li    GET_CAP_TRIES = 20
98*9c5db199SXin Li    CAP_ALWAYS = 'Always'
99*9c5db199SXin Li    # Regex to match the valid capability settings.
100*9c5db199SXin Li    CAP_STATES = '(%s|Default|IfOpened|UnlessLocked)' % CAP_ALWAYS
101*9c5db199SXin Li    # List of all cr50 ccd capabilities. Same order of 'ccd' output
102*9c5db199SXin Li    CAP_NAMES = [
103*9c5db199SXin Li            'UartGscRxAPTx', 'UartGscTxAPRx', 'UartGscRxECTx', 'UartGscTxECRx',
104*9c5db199SXin Li            'FlashAP', 'FlashEC', 'OverrideWP', 'RebootECAP', 'GscFullConsole',
105*9c5db199SXin Li            'UnlockNoReboot', 'UnlockNoShortPP', 'OpenNoTPMWipe',
106*9c5db199SXin Li            'OpenNoLongPP', 'BatteryBypassPP', '(UpdateNoTPMWipe|Unused)',
107*9c5db199SXin Li            'I2C', 'FlashRead', 'OpenNoDevMode', 'OpenFromUSB', 'OverrideBatt'
108*9c5db199SXin Li    ]
109*9c5db199SXin Li    # There are two capability formats. Match both.
110*9c5db199SXin Li    #  UartGscRxECTx   Y 3=IfOpened
111*9c5db199SXin Li    #  or
112*9c5db199SXin Li    #  UartGscRxECTx   Y 0=Default (Always)
113*9c5db199SXin Li    # Make sure the last word is at the end of the line. The next line will
114*9c5db199SXin Li    # start with some whitespace, so account for that too.
115*9c5db199SXin Li    CAP_FORMAT = '\s+(Y|-) \d\=%s( \(%s\))?[\r\n]+\s*' % (CAP_STATES,
116*9c5db199SXin Li                                                          CAP_STATES)
117*9c5db199SXin Li    # Be as specific as possible with the 'ccd' output, so the test will notice
118*9c5db199SXin Li    # missing characters and retry getting the output. Name each group, so the
119*9c5db199SXin Li    # test can extract the field information into a dictionary.
120*9c5db199SXin Li    # CCD_FIELDS is used to order the regex when searching for multiple fields
121*9c5db199SXin Li    CCD_FIELDS = ['State', 'Password', 'Flags', 'Capabilities', 'TPM']
122*9c5db199SXin Li    # CCD_FORMAT has the field names as keys and the expected output as the
123*9c5db199SXin Li    # value.
124*9c5db199SXin Li    CCD_FORMAT = {
125*9c5db199SXin Li        'State' : '(State: (?P<State>Opened|Locked|Unlocked))',
126*9c5db199SXin Li        'Password' : '(Password: (?P<Password>set|none))',
127*9c5db199SXin Li        'Flags' : '(Flags: (?P<Flags>\S*))',
128*9c5db199SXin Li        'Capabilities' : '(Capabilities:.*(?P<Capabilities>%s))' %
129*9c5db199SXin Li                         (CAP_FORMAT.join(CAP_NAMES) + CAP_FORMAT),
130*9c5db199SXin Li        'TPM' : '(TPM:(?P<TPM>[ \S]*)\r)',
131*9c5db199SXin Li    }
132*9c5db199SXin Li
133*9c5db199SXin Li    # CR50 Board Properties as defined in platform/ec/board/cr50/scratch-reg1.h
134*9c5db199SXin Li    BOARD_PROP = {
135*9c5db199SXin Li            'BOARD_PERIPH_CONFIG_SPI': (1 << 0, None),
136*9c5db199SXin Li            'BOARD_PERIPH_CONFIG_SPI': (1 << 0, None),
137*9c5db199SXin Li            'BOARD_PERIPH_CONFIG_I2C': (1 << 1, None),
138*9c5db199SXin Li            'BOARD_PERIPH_CONFIG_I2C': (1 << 1, None),
139*9c5db199SXin Li            'BOARD_NEEDS_SYS_RST_PULL_UP': (1 << 5, None),
140*9c5db199SXin Li            'BOARD_USE_PLT_RESET': (1 << 6, None),
141*9c5db199SXin Li            'BOARD_WP_ASSERTED': (1 << 8, None),
142*9c5db199SXin Li            'BOARD_FORCING_WP': (1 << 9, None),
143*9c5db199SXin Li            'BOARD_NO_RO_UART': (1 << 10, None),
144*9c5db199SXin Li            'BOARD_CCD_UNLOCKED': (1 << 11, 3 << 11),
145*9c5db199SXin Li            'BOARD_CCD_OPENED': (2 << 11, 3 << 11),
146*9c5db199SXin Li            'BOARD_DEEP_SLEEP_DISABLED': (1 << 13, None),
147*9c5db199SXin Li            'BOARD_DETECT_AP_WITH_UART': (1 << 14, None),
148*9c5db199SXin Li            'BOARD_ITE_EC_SYNC_NEEDED': (1 << 15, None),
149*9c5db199SXin Li            'BOARD_WP_DISABLE_DELAY': (1 << 16, None),
150*9c5db199SXin Li            'BOARD_CLOSED_SOURCE_SET1': (1 << 17, None),
151*9c5db199SXin Li            'BOARD_CLOSED_LOOP_RESET': (1 << 18, None),
152*9c5db199SXin Li            'BOARD_NO_INA_SUPPORT': (1 << 19, None),
153*9c5db199SXin Li            'BOARD_ALLOW_CHANGE_TPM_MODE': (1 << 20, None),
154*9c5db199SXin Li            'BOARD_EC_CR50_COMM_SUPPORT': (1 << 21, None),
155*9c5db199SXin Li            'BOARD_CCD_REC_LID_PIN_DIOA1': (1 << 22, 3 << 22),
156*9c5db199SXin Li            'BOARD_CCD_REC_LID_PIN_DIOA9': (2 << 22, 3 << 22),
157*9c5db199SXin Li            'BOARD_CCD_REC_LID_PIN_DIOA12': (3 << 22, 3 << 22)
158*9c5db199SXin Li    }
159*9c5db199SXin Li
160*9c5db199SXin Li    # CR50 reset flags as defined in platform ec_commands.h. These are only the
161*9c5db199SXin Li    # flags used by cr50.
162*9c5db199SXin Li    RESET_FLAGS = {
163*9c5db199SXin Li           'RESET_FLAG_OTHER'            : 1 << 0,
164*9c5db199SXin Li           'RESET_FLAG_BROWNOUT'         : 1 << 2,
165*9c5db199SXin Li           'RESET_FLAG_POWER_ON'         : 1 << 3,
166*9c5db199SXin Li           'RESET_FLAG_SOFT'             : 1 << 5,
167*9c5db199SXin Li           'RESET_FLAG_HIBERNATE'        : 1 << 6,
168*9c5db199SXin Li           'RESET_FLAG_RTC_ALARM'        : 3 << 7,
169*9c5db199SXin Li           'RESET_FLAG_WAKE_PIN'         : 1 << 8,
170*9c5db199SXin Li           'RESET_FLAG_HARD'             : 1 << 11,
171*9c5db199SXin Li           'RESET_FLAG_USB_RESUME'       : 1 << 14,
172*9c5db199SXin Li           'RESET_FLAG_RDD'              : 1 << 15,
173*9c5db199SXin Li           'RESET_FLAG_RBOX'             : 1 << 16,
174*9c5db199SXin Li           'RESET_FLAG_SECURITY'         : 1 << 17,
175*9c5db199SXin Li    }
176*9c5db199SXin Li    FIPS_RE = r' ([^ ]*)approved.*allowed: (1|0)'
177*9c5db199SXin Li    # CCD Capabilities used for c2d2 control drivers.
178*9c5db199SXin Li    SERVO_DRV_CAPS = ['OverrideWP', 'GscFullConsole', 'RebootECAP']
179*9c5db199SXin Li    # Cr50 may have flash operation errors during the test. Here's an example
180*9c5db199SXin Li    # of one error message.
181*9c5db199SXin Li    # do_flash_op:245 errors 20 fsh_pe_control 40720004
182*9c5db199SXin Li    # The stuff after the ':' may change, but all flash operation errors
183*9c5db199SXin Li    # contain do_flash_op. do_flash_op is only ever printed if there is an
184*9c5db199SXin Li    # error during the flash operation. Just search for do_flash_op to simplify
185*9c5db199SXin Li    # the search string and make it applicable to all flash op errors.
186*9c5db199SXin Li    FLASH_OP_ERROR_MSG = 'do_flash_op'
187*9c5db199SXin Li    # USB issues may show up with the timer sof calibration overflow interrupt.
188*9c5db199SXin Li    # Count these during cleanup.
189*9c5db199SXin Li    USB_ERROR = 'timer_sof_calibration_overflow_int'
190*9c5db199SXin Li    # Message printed during watchdog reset.
191*9c5db199SXin Li    WATCHDOG_RST = 'WATCHDOG PC'
192*9c5db199SXin Li    # ===============================================================
193*9c5db199SXin Li    # AP_RO strings
194*9c5db199SXin Li    # Cr50 only supports v2
195*9c5db199SXin Li    AP_RO_VERSIONS = [1]
196*9c5db199SXin Li    AP_RO_HASH_RE = r'sha256 (hash) ([0-9a-f]{64})'
197*9c5db199SXin Li    AP_RO_UNSUPPORTED_UNPROGRAMMED = 'RO verification not programmed'
198*9c5db199SXin Li    AP_RO_UNSUPPORTED_BID_BLOCKED = 'BID blocked'
199*9c5db199SXin Li    AP_RO_REASON_RE = r'(ap_ro_check_unsupported): (.*)\]'
200*9c5db199SXin Li    AP_RO_RESULT_RE = r'(result)\s*: (\d)'
201*9c5db199SXin Li    AP_RO_SUPPORTED_RE = r'(supported)\s*: (yes|no)'
202*9c5db199SXin Li    AP_RO_UNSUPPORTED_OUTPUT = [
203*9c5db199SXin Li            AP_RO_REASON_RE, AP_RO_RESULT_RE, AP_RO_SUPPORTED_RE
204*9c5db199SXin Li    ]
205*9c5db199SXin Li    AP_RO_SAVED_OUTPUT = [AP_RO_RESULT_RE, AP_RO_SUPPORTED_RE, AP_RO_HASH_RE]
206*9c5db199SXin Li
207*9c5db199SXin Li    # ===============================================================
208*9c5db199SXin Li
209*9c5db199SXin Li    def __init__(self, servo, faft_config):
210*9c5db199SXin Li        """Initializes a ChromeCr50 object.
211*9c5db199SXin Li
212*9c5db199SXin Li        @param servo: A servo object.
213*9c5db199SXin Li        @param faft_config: A faft config object.
214*9c5db199SXin Li        """
215*9c5db199SXin Li        super(ChromeCr50, self).__init__(servo, 'cr50_uart')
216*9c5db199SXin Li        self.faft_config = faft_config
217*9c5db199SXin Li
218*9c5db199SXin Li    def wake_cr50(self):
219*9c5db199SXin Li        """Wake up cr50 by sending some linebreaks and wait for the response"""
220*9c5db199SXin Li        for i in range(self.MAX_RETRY_COUNT):
221*9c5db199SXin Li            try:
222*9c5db199SXin Li                rv = super(ChromeCr50, self).send_command_get_output(
223*9c5db199SXin Li                        self.WAKE_CHAR, self.WAKE_RESPONSE)
224*9c5db199SXin Li                logging.debug('wake result %r', rv)
225*9c5db199SXin Li                return
226*9c5db199SXin Li            except servo.ResponsiveConsoleError as e:
227*9c5db199SXin Li                logging.info("Console responsive, but couldn't match wake "
228*9c5db199SXin Li                             "response %s", e)
229*9c5db199SXin Li        raise servo.ResponsiveConsoleError('Unable to wake cr50')
230*9c5db199SXin Li
231*9c5db199SXin Li
232*9c5db199SXin Li    def send_command(self, commands):
233*9c5db199SXin Li        """Send command through UART.
234*9c5db199SXin Li
235*9c5db199SXin Li        Cr50 will drop characters input to the UART when it resumes from sleep.
236*9c5db199SXin Li        If servo is not using ccd, send some characters before sending the
237*9c5db199SXin Li        real command to make sure cr50 is awake.
238*9c5db199SXin Li
239*9c5db199SXin Li        @param commands: the command string to send to cr50
240*9c5db199SXin Li        """
241*9c5db199SXin Li        if self._servo.main_device_is_flex():
242*9c5db199SXin Li            self.wake_cr50()
243*9c5db199SXin Li        super(ChromeCr50, self).send_command(commands)
244*9c5db199SXin Li
245*9c5db199SXin Li
246*9c5db199SXin Li    def set_cap(self, cap, setting):
247*9c5db199SXin Li        """Set the capability to setting
248*9c5db199SXin Li
249*9c5db199SXin Li        @param cap: The capability string
250*9c5db199SXin Li        @param setting: The setting to set the capability to.
251*9c5db199SXin Li        """
252*9c5db199SXin Li        self.set_caps({ cap : setting })
253*9c5db199SXin Li
254*9c5db199SXin Li
255*9c5db199SXin Li    def set_caps(self, cap_dict):
256*9c5db199SXin Li        """Use cap_dict to set all the cap values
257*9c5db199SXin Li
258*9c5db199SXin Li        Set all of the capabilities in cap_dict to the correct config.
259*9c5db199SXin Li
260*9c5db199SXin Li        @param cap_dict: A dictionary with the capability as key and the desired
261*9c5db199SXin Li                         setting as values
262*9c5db199SXin Li        """
263*9c5db199SXin Li        for cap, config in six.iteritems(cap_dict):
264*9c5db199SXin Li            self.send_command('ccd set %s %s' % (cap, config))
265*9c5db199SXin Li        current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING)
266*9c5db199SXin Li        for cap, config in six.iteritems(cap_dict):
267*9c5db199SXin Li            if (current_cap_settings[cap].lower() !=
268*9c5db199SXin Li                config.lower()):
269*9c5db199SXin Li                raise error.TestFail('Failed to set %s to %s' % (cap, config))
270*9c5db199SXin Li
271*9c5db199SXin Li
272*9c5db199SXin Li    def get_cap_overview(self, cap_dict):
273*9c5db199SXin Li        """Get a basic overview of the capability dictionary
274*9c5db199SXin Li
275*9c5db199SXin Li        If all capabilities are set to Default, ccd has been reset to default.
276*9c5db199SXin Li        If all capabilities are set to Always, ccd is in factory mode.
277*9c5db199SXin Li
278*9c5db199SXin Li        @param cap_dict: A dictionary of the capability settings
279*9c5db199SXin Li        @return: A tuple of the capability overview (in factory mode, is reset)
280*9c5db199SXin Li        """
281*9c5db199SXin Li        in_factory_mode = True
282*9c5db199SXin Li        is_reset = True
283*9c5db199SXin Li        for cap, cap_info in six.iteritems(cap_dict):
284*9c5db199SXin Li            cap_setting = cap_info[self.CAP_SETTING]
285*9c5db199SXin Li            if cap_setting != 'Always':
286*9c5db199SXin Li                in_factory_mode = False
287*9c5db199SXin Li            if cap_setting != 'Default':
288*9c5db199SXin Li                is_reset = False
289*9c5db199SXin Li        return in_factory_mode, is_reset
290*9c5db199SXin Li
291*9c5db199SXin Li
292*9c5db199SXin Li    def password_is_reset(self):
293*9c5db199SXin Li        """Returns True if the password is cleared"""
294*9c5db199SXin Li        return self.get_ccd_info('Password') == 'none'
295*9c5db199SXin Li
296*9c5db199SXin Li
297*9c5db199SXin Li    def ccd_is_reset(self):
298*9c5db199SXin Li        """Returns True if the ccd is reset
299*9c5db199SXin Li
300*9c5db199SXin Li        The password must be cleared, write protect and battery presence must
301*9c5db199SXin Li        follow battery presence, and all capabilities must be Always
302*9c5db199SXin Li        """
303*9c5db199SXin Li        return (self.password_is_reset() and self.wp_is_reset() and
304*9c5db199SXin Li                self.batt_pres_is_reset() and
305*9c5db199SXin Li                self.get_cap_overview(self.get_cap_dict())[1])
306*9c5db199SXin Li
307*9c5db199SXin Li
308*9c5db199SXin Li    def wp_is_reset(self):
309*9c5db199SXin Li        """Returns True if wp is reset to follow batt pres at all times"""
310*9c5db199SXin Li        follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state()
311*9c5db199SXin Li        return follow_batt_pres and follow_batt_pres_atboot
312*9c5db199SXin Li
313*9c5db199SXin Li
314*9c5db199SXin Li    def get_wp_state(self):
315*9c5db199SXin Li        """Get the current write protect and atboot state
316*9c5db199SXin Li
317*9c5db199SXin Li        The atboot setting cannot really be determined now if it is set to
318*9c5db199SXin Li        follow battery presence. It is likely to remain the same after reboot,
319*9c5db199SXin Li        but who knows. If the third element of the tuple is True, the last
320*9c5db199SXin Li        element will not be that useful
321*9c5db199SXin Li
322*9c5db199SXin Li        @return: a tuple with the current write protect state
323*9c5db199SXin Li                (True if current state is to follow batt presence,
324*9c5db199SXin Li                 True if write protect is enabled,
325*9c5db199SXin Li                 True if current state is to follow batt presence atboot,
326*9c5db199SXin Li                 True if write protect is enabled atboot)
327*9c5db199SXin Li        """
328*9c5db199SXin Li        rv = self.send_command_retry_get_output('wp',
329*9c5db199SXin Li                ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?'
330*9c5db199SXin Li                 '(follow|enabled|disabled)'], safe=True)[0]
331*9c5db199SXin Li        _, forced, enabled, _, atboot = rv
332*9c5db199SXin Li        logging.debug(rv)
333*9c5db199SXin Li        return (not forced, enabled =='enabled',
334*9c5db199SXin Li                atboot == 'follow', atboot == 'enabled')
335*9c5db199SXin Li
336*9c5db199SXin Li
337*9c5db199SXin Li    def in_dev_mode(self):
338*9c5db199SXin Li        """Return True if cr50 thinks the device is in dev mode"""
339*9c5db199SXin Li        return 'dev_mode' in self.get_ccd_info('TPM')
340*9c5db199SXin Li
341*9c5db199SXin Li
342*9c5db199SXin Li    def get_ccd_info(self, field=None):
343*9c5db199SXin Li        """Get the current ccd state.
344*9c5db199SXin Li
345*9c5db199SXin Li        Take the output of 'ccd' and convert it to a dictionary.
346*9c5db199SXin Li
347*9c5db199SXin Li        @param: the ccd info param to get or None to get the full ccd output
348*9c5db199SXin Li                dictionary.
349*9c5db199SXin Li        @return: the field value or a dictionary with the ccd field name as the
350*9c5db199SXin Li                 key and the setting as the value.
351*9c5db199SXin Li        """
352*9c5db199SXin Li
353*9c5db199SXin Li        if field:
354*9c5db199SXin Li            match_value = self.CCD_FORMAT[field]
355*9c5db199SXin Li        else:
356*9c5db199SXin Li            values = [ self.CCD_FORMAT[field] for field in self.CCD_FIELDS ]
357*9c5db199SXin Li            match_value = '.*'.join(values)
358*9c5db199SXin Li        matched_output = None
359*9c5db199SXin Li        original_timeout = float(self._servo.get('cr50_uart_timeout'))
360*9c5db199SXin Li        # Change the console timeout to 10s, it may take longer than 3s to read
361*9c5db199SXin Li        # ccd info
362*9c5db199SXin Li        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
363*9c5db199SXin Li        for i in range(self.GET_CAP_TRIES):
364*9c5db199SXin Li            try:
365*9c5db199SXin Li                # If some ccd output is dropped and the output doesn't match the
366*9c5db199SXin Li                # expected ccd output format, send_command_get_output will wait the
367*9c5db199SXin Li                # full CONSERVATIVE_CCD_WAIT even though ccd is done printing. Use
368*9c5db199SXin Li                # re to search the command output instead of
369*9c5db199SXin Li                # send_safe_command_get_output, so we don't have to wait the full
370*9c5db199SXin Li                # timeout if output is dropped.
371*9c5db199SXin Li                rv = self.send_command_retry_get_output('ccd', ['ccd.*>'],
372*9c5db199SXin Li                                                        safe=True)[0]
373*9c5db199SXin Li                matched_output = re.search(match_value, rv, re.DOTALL)
374*9c5db199SXin Li                if matched_output:
375*9c5db199SXin Li                    break
376*9c5db199SXin Li                logging.info('try %d: could not match ccd output %s', i, rv)
377*9c5db199SXin Li            except Exception as e:
378*9c5db199SXin Li                logging.info('try %d got error %s', i, str(e))
379*9c5db199SXin Li
380*9c5db199SXin Li        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
381*9c5db199SXin Li        if not matched_output:
382*9c5db199SXin Li            raise error.TestFail('Could not get ccd output')
383*9c5db199SXin Li        matched_dict = matched_output.groupdict()
384*9c5db199SXin Li        logging.info('Current CCD settings:\n%s', pprint.pformat(matched_dict))
385*9c5db199SXin Li        if field:
386*9c5db199SXin Li            return matched_dict.get(field)
387*9c5db199SXin Li        return matched_dict
388*9c5db199SXin Li
389*9c5db199SXin Li
390*9c5db199SXin Li    def get_cap(self, cap):
391*9c5db199SXin Li        """Returns the capabilitiy from the capability dictionary"""
392*9c5db199SXin Li        return self.get_cap_dict()[cap]
393*9c5db199SXin Li
394*9c5db199SXin Li
395*9c5db199SXin Li    def get_cap_dict(self, info=None):
396*9c5db199SXin Li        """Get the current ccd capability settings.
397*9c5db199SXin Li
398*9c5db199SXin Li        The capability may be using the 'Default' setting. That doesn't say much
399*9c5db199SXin Li        about the ccd state required to use the capability. Return all ccd
400*9c5db199SXin Li        information in the cap_dict
401*9c5db199SXin Li        [is accessible, setting, requirement]
402*9c5db199SXin Li
403*9c5db199SXin Li        @param info: Only fill the cap_dict with the requested information:
404*9c5db199SXin Li                     CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ
405*9c5db199SXin Li        @return: A dictionary with the capability as the key a list of the
406*9c5db199SXin Li                 current settings as the value [is_accessible, setting,
407*9c5db199SXin Li                 requirement]
408*9c5db199SXin Li        """
409*9c5db199SXin Li        # Add whitespace at the end, so we can still match the last line.
410*9c5db199SXin Li        cap_info_str = self.get_ccd_info('Capabilities') + '\r\n'
411*9c5db199SXin Li        cap_settings = re.findall('(\S+) ' + self.CAP_FORMAT,
412*9c5db199SXin Li                                  cap_info_str)
413*9c5db199SXin Li        caps = {}
414*9c5db199SXin Li        for cap, accessible, setting, _, required in cap_settings:
415*9c5db199SXin Li            # If there's only 1 value after =, then the setting is the
416*9c5db199SXin Li            # requirement.
417*9c5db199SXin Li            if not required:
418*9c5db199SXin Li                required = setting
419*9c5db199SXin Li            cap_info = [accessible == 'Y', setting, required]
420*9c5db199SXin Li            if info is not None:
421*9c5db199SXin Li                caps[cap] = cap_info[info]
422*9c5db199SXin Li            else:
423*9c5db199SXin Li                caps[cap] = cap_info
424*9c5db199SXin Li        logging.debug(pprint.pformat(caps))
425*9c5db199SXin Li        return caps
426*9c5db199SXin Li
427*9c5db199SXin Li
428*9c5db199SXin Li    def send_command_get_output(self, command, regexp_list):
429*9c5db199SXin Li        """Send command through UART and wait for response.
430*9c5db199SXin Li
431*9c5db199SXin Li        Cr50 will drop characters input to the UART when it resumes from sleep.
432*9c5db199SXin Li        If servo is not using ccd, send some characters before sending the
433*9c5db199SXin Li        real command to make sure cr50 is awake.
434*9c5db199SXin Li
435*9c5db199SXin Li        @param command: the command to send
436*9c5db199SXin Li        @param regexp_list: The list of regular expressions to match in the
437*9c5db199SXin Li                            command output
438*9c5db199SXin Li        @return: A list of matched output
439*9c5db199SXin Li        """
440*9c5db199SXin Li        if self._servo.main_device_is_flex():
441*9c5db199SXin Li            self.wake_cr50()
442*9c5db199SXin Li
443*9c5db199SXin Li        # We have started prepending '\n' to separate cr50 console junk from
444*9c5db199SXin Li        # the real command. If someone is just searching for .*>, then they will
445*9c5db199SXin Li        # only get the output from the first '\n' we added. Raise an error to
446*9c5db199SXin Li        # change the test to look for something more specific ex command.*>.
447*9c5db199SXin Li        # cr50 will print the command in the output, so that is an easy way to
448*9c5db199SXin Li        # modify '.*>' to match the real command output.
449*9c5db199SXin Li        if '.*>' in regexp_list:
450*9c5db199SXin Li            raise error.TestError('Send more specific regexp %r %r' % (command,
451*9c5db199SXin Li                    regexp_list))
452*9c5db199SXin Li
453*9c5db199SXin Li        # prepend \n to separate the command from any junk that may have been
454*9c5db199SXin Li        # sent to the cr50 uart.
455*9c5db199SXin Li        command = '\n' + command
456*9c5db199SXin Li        return super(ChromeCr50, self).send_command_get_output(command,
457*9c5db199SXin Li                                                               regexp_list)
458*9c5db199SXin Li
459*9c5db199SXin Li
460*9c5db199SXin Li    def send_safe_command_get_output(self, command, regexp_list,
461*9c5db199SXin Li            channel_mask=0x1):
462*9c5db199SXin Li        """Restrict the console channels while sending console commands.
463*9c5db199SXin Li
464*9c5db199SXin Li        @param command: the command to send
465*9c5db199SXin Li        @param regexp_list: The list of regular expressions to match in the
466*9c5db199SXin Li                            command output
467*9c5db199SXin Li        @param channel_mask: The mask to pass to 'chan' prior to running the
468*9c5db199SXin Li                             command, indicating which channels should remain
469*9c5db199SXin Li                             enabled (0x1 is command output)
470*9c5db199SXin Li        @return: A list of matched output
471*9c5db199SXin Li        """
472*9c5db199SXin Li        self.send_command('chan save')
473*9c5db199SXin Li        self.send_command('chan 0x%x' % channel_mask)
474*9c5db199SXin Li        try:
475*9c5db199SXin Li            rv = self.send_command_get_output(command, regexp_list)
476*9c5db199SXin Li        finally:
477*9c5db199SXin Li            self.send_command('chan restore')
478*9c5db199SXin Li        return rv
479*9c5db199SXin Li
480*9c5db199SXin Li
481*9c5db199SXin Li    def send_command_retry_get_output(self, command, regexp_list, safe=False,
482*9c5db199SXin Li                                      compare_output=False, retries=MAX_RETRY_COUNT):
483*9c5db199SXin Li        """Retry the command 5 times if you get a timeout or drop some output
484*9c5db199SXin Li
485*9c5db199SXin Li
486*9c5db199SXin Li        @param command: the command string
487*9c5db199SXin Li        @param regexp_list: the regex to search for
488*9c5db199SXin Li        @param safe: use send_safe_command_get_output if True otherwise use
489*9c5db199SXin Li                     send_command_get_output
490*9c5db199SXin Li        @param compare_output: look for reproducible output
491*9c5db199SXin Li        """
492*9c5db199SXin Li        send_command = (self.send_safe_command_get_output if safe else
493*9c5db199SXin Li                        self.send_command_get_output)
494*9c5db199SXin Li        err = 'no consistent output' if compare_output else 'unknown'
495*9c5db199SXin Li        past_rv = []
496*9c5db199SXin Li        for i in range(retries):
497*9c5db199SXin Li            try:
498*9c5db199SXin Li                rv = send_command(command, regexp_list)
499*9c5db199SXin Li                if not compare_output or rv in past_rv:
500*9c5db199SXin Li                    return rv
501*9c5db199SXin Li                if past_rv:
502*9c5db199SXin Li                    logging.debug('%d %s not in %s', i, rv, past_rv)
503*9c5db199SXin Li                past_rv.append(rv)
504*9c5db199SXin Li            except Exception as e:
505*9c5db199SXin Li                err = e
506*9c5db199SXin Li                logging.info('attempt %d %r: %s %s', i, command, type(e),
507*9c5db199SXin Li                             str(e))
508*9c5db199SXin Li        if compare_output:
509*9c5db199SXin Li            logging.info('No consistent output for %r %s', command,
510*9c5db199SXin Li                         pprint.pformat(past_rv))
511*9c5db199SXin Li        raise error.TestError('Issue sending %r command: %r' % (command, err))
512*9c5db199SXin Li
513*9c5db199SXin Li
514*9c5db199SXin Li    def get_deep_sleep_count(self):
515*9c5db199SXin Li        """Get the deep sleep count from the idle task"""
516*9c5db199SXin Li        result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT],
517*9c5db199SXin Li                                                    safe=True)
518*9c5db199SXin Li        return int(result[0][1])
519*9c5db199SXin Li
520*9c5db199SXin Li
521*9c5db199SXin Li    def clear_deep_sleep_count(self):
522*9c5db199SXin Li        """Clear the deep sleep count"""
523*9c5db199SXin Li        self.send_command('idle c')
524*9c5db199SXin Li        if self.get_deep_sleep_count():
525*9c5db199SXin Li            raise error.TestFail("Could not clear deep sleep count")
526*9c5db199SXin Li
527*9c5db199SXin Li
528*9c5db199SXin Li    def get_board_properties(self):
529*9c5db199SXin Li        """Get information from the version command"""
530*9c5db199SXin Li        rv = self.send_command_retry_get_output('brdprop',
531*9c5db199SXin Li                ['properties = (\S+)\s'], safe=True)
532*9c5db199SXin Li        return int(rv[0][1], 16)
533*9c5db199SXin Li
534*9c5db199SXin Li
535*9c5db199SXin Li    def uses_board_property(self, prop_name):
536*9c5db199SXin Li        """Returns 1 if the given property is set, or 0 otherwise
537*9c5db199SXin Li
538*9c5db199SXin Li        @param prop_name: a property name in string type.
539*9c5db199SXin Li        """
540*9c5db199SXin Li        brdprop = self.get_board_properties()
541*9c5db199SXin Li        (prop, mask) = self.BOARD_PROP[prop_name]
542*9c5db199SXin Li        # Use the board property value for the mask if no mask is given.
543*9c5db199SXin Li        mask = mask or prop
544*9c5db199SXin Li        return (brdprop & mask) == prop
545*9c5db199SXin Li
546*9c5db199SXin Li
547*9c5db199SXin Li    def has_command(self, cmd):
548*9c5db199SXin Li        """Returns 1 if cr50 has the command 0 if it doesn't"""
549*9c5db199SXin Li        try:
550*9c5db199SXin Li            self.send_command_retry_get_output('help', [cmd],
551*9c5db199SXin Li                                               safe=True,
552*9c5db199SXin Li                                               retries=3)
553*9c5db199SXin Li        except:
554*9c5db199SXin Li            logging.info("Image does not include '%s' command", cmd)
555*9c5db199SXin Li            return 0
556*9c5db199SXin Li        return 1
557*9c5db199SXin Li
558*9c5db199SXin Li
559*9c5db199SXin Li    def reboot(self):
560*9c5db199SXin Li        """Reboot Cr50 and wait for cr50 to reset"""
561*9c5db199SXin Li        self.wait_for_reboot(cmd='reboot', timeout=10)
562*9c5db199SXin Li
563*9c5db199SXin Li
564*9c5db199SXin Li    def _uart_wait_for_reboot(self, cmd='\n', timeout=60):
565*9c5db199SXin Li        """Use uart to wait for cr50 to reboot.
566*9c5db199SXin Li
567*9c5db199SXin Li        If a command is given run it and wait for cr50 to reboot. Monitor
568*9c5db199SXin Li        the cr50 uart to detect the reset. Wait up to timeout seconds
569*9c5db199SXin Li        for the reset.
570*9c5db199SXin Li
571*9c5db199SXin Li        @param cmd: the command to run to reset cr50.
572*9c5db199SXin Li        @param timeout: seconds to wait to detect the reboot.
573*9c5db199SXin Li        """
574*9c5db199SXin Li        original_timeout = float(self._servo.get('cr50_uart_timeout'))
575*9c5db199SXin Li        # Change the console timeout to timeout, so we wait at least that long
576*9c5db199SXin Li        # for cr50 to print the start string.
577*9c5db199SXin Li        self._servo.set_nocheck('cr50_uart_timeout', timeout)
578*9c5db199SXin Li        try:
579*9c5db199SXin Li            self.send_command_get_output(cmd, self.START_STR)
580*9c5db199SXin Li            logging.debug('Detected cr50 reboot')
581*9c5db199SXin Li        except error.TestFail as e:
582*9c5db199SXin Li            logging.debug('Failed to detect cr50 reboot')
583*9c5db199SXin Li        # Reset the timeout.
584*9c5db199SXin Li        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
585*9c5db199SXin Li
586*9c5db199SXin Li
587*9c5db199SXin Li    def wait_for_reboot(self, cmd='\n', timeout=60):
588*9c5db199SXin Li        """Wait for cr50 to reboot
589*9c5db199SXin Li
590*9c5db199SXin Li        Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if
591*9c5db199SXin Li        necessary.
592*9c5db199SXin Li
593*9c5db199SXin Li        @param cmd: the command to run to reset cr50.
594*9c5db199SXin Li        @param timeout: seconds to wait to detect the reboot.
595*9c5db199SXin Li        """
596*9c5db199SXin Li        logging.info('Wait up to %s seconds for reboot (%s)', timeout,
597*9c5db199SXin Li                     cmd.strip())
598*9c5db199SXin Li        if self._servo.main_device_is_ccd():
599*9c5db199SXin Li            self.send_command(cmd)
600*9c5db199SXin Li            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
601*9c5db199SXin Li            # go down to detect the reboot.
602*9c5db199SXin Li            self.wait_for_ccd_disable(timeout, raise_error=False)
603*9c5db199SXin Li            self.ccd_enable()
604*9c5db199SXin Li        else:
605*9c5db199SXin Li            self._uart_wait_for_reboot(cmd, timeout)
606*9c5db199SXin Li
607*9c5db199SXin Li        # On most devices, a Cr50 reset will cause an AP reset. Force this to
608*9c5db199SXin Li        # happen on devices where the AP is left down.
609*9c5db199SXin Li        if not self.faft_config.ap_up_after_cr50_reboot:
610*9c5db199SXin Li            # Reset the DUT a few seconds after cr50 reboot.
611*9c5db199SXin Li            time.sleep(self.SHORT_WAIT)
612*9c5db199SXin Li            logging.info('Resetting DUT after Cr50 reset')
613*9c5db199SXin Li            self._servo.get_power_state_controller().reset()
614*9c5db199SXin Li
615*9c5db199SXin Li
616*9c5db199SXin Li    def set_board_id(self, chip_bid, chip_flags):
617*9c5db199SXin Li        """Set the chip board id type and flags."""
618*9c5db199SXin Li        self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
619*9c5db199SXin Li
620*9c5db199SXin Li
621*9c5db199SXin Li    def get_board_id(self):
622*9c5db199SXin Li        """Get the chip board id type and flags.
623*9c5db199SXin Li
624*9c5db199SXin Li        bid_type_inv will be '' if the bid output doesn't show it. If no board
625*9c5db199SXin Li        id type inv is shown, then board id is erased will just check the type
626*9c5db199SXin Li        and flags.
627*9c5db199SXin Li
628*9c5db199SXin Li        @returns a tuple (A string of bid_type:bid_type_inv:bid_flags,
629*9c5db199SXin Li                          True if board id is erased)
630*9c5db199SXin Li        """
631*9c5db199SXin Li        bid = self.send_command_retry_get_output('bid',
632*9c5db199SXin Li                    ['Board ID: (\S{8}):?(|\S{8}), flags (\S{8})\s'],
633*9c5db199SXin Li                    safe=True)[0][1:]
634*9c5db199SXin Li        bid_str = ':'.join(bid)
635*9c5db199SXin Li        bid_is_erased =  set(bid).issubset({'', 'ffffffff'})
636*9c5db199SXin Li        logging.info('chip board id: %s', bid_str)
637*9c5db199SXin Li        logging.info('chip board id is erased: %s',
638*9c5db199SXin Li                     'yes' if bid_is_erased else 'no')
639*9c5db199SXin Li        return bid_str, bid_is_erased
640*9c5db199SXin Li
641*9c5db199SXin Li
642*9c5db199SXin Li    def eraseflashinfo(self, retries=10):
643*9c5db199SXin Li        """Run eraseflashinfo.
644*9c5db199SXin Li
645*9c5db199SXin Li        @returns True if the board id is erased
646*9c5db199SXin Li        """
647*9c5db199SXin Li        for i in range(retries):
648*9c5db199SXin Li            # The console could drop characters while matching 'eraseflashinfo'.
649*9c5db199SXin Li            # Retry if the command times out. It's ok to run eraseflashinfo
650*9c5db199SXin Li            # multiple times.
651*9c5db199SXin Li            rv = self.send_command_retry_get_output(
652*9c5db199SXin Li                    'eraseflashinfo', ['eraseflashinfo(.*)>'])[0][1].strip()
653*9c5db199SXin Li            logging.info('eraseflashinfo output: %r', rv)
654*9c5db199SXin Li            bid_erased = self.get_board_id()[1]
655*9c5db199SXin Li            eraseflashinfo_issue = 'Busy' in rv or 'do_flash_op' in rv
656*9c5db199SXin Li            if not eraseflashinfo_issue and bid_erased:
657*9c5db199SXin Li                break
658*9c5db199SXin Li            logging.info('Retrying eraseflashinfo')
659*9c5db199SXin Li        return bid_erased
660*9c5db199SXin Li
661*9c5db199SXin Li
662*9c5db199SXin Li    def rollback(self):
663*9c5db199SXin Li        """Set the reset counter high enough to force a rollback and reboot."""
664*9c5db199SXin Li        if not self.has_command('rollback'):
665*9c5db199SXin Li            raise error.TestError("need image with 'rollback'")
666*9c5db199SXin Li
667*9c5db199SXin Li        inactive_partition = self.get_inactive_version_info()[0]
668*9c5db199SXin Li
669*9c5db199SXin Li        self.wait_for_reboot(cmd='rollback', timeout=10)
670*9c5db199SXin Li
671*9c5db199SXin Li        running_partition = self.get_active_version_info()[0]
672*9c5db199SXin Li        if inactive_partition != running_partition:
673*9c5db199SXin Li            raise error.TestError("Failed to rollback to inactive image")
674*9c5db199SXin Li
675*9c5db199SXin Li
676*9c5db199SXin Li    def rolledback(self):
677*9c5db199SXin Li        """Returns true if cr50 just rolled back"""
678*9c5db199SXin Li        return 'Rollback detected' in self.send_command_retry_get_output(
679*9c5db199SXin Li                'sysinfo', ['sysinfo.*>'], safe=True)[0]
680*9c5db199SXin Li
681*9c5db199SXin Li
682*9c5db199SXin Li    def get_version_info(self, regexp):
683*9c5db199SXin Li        """Get information from the version command"""
684*9c5db199SXin Li        return self.send_command_retry_get_output('version', [regexp],
685*9c5db199SXin Li                                                  safe=True,
686*9c5db199SXin Li                                                  compare_output=True)[0][1::]
687*9c5db199SXin Li
688*9c5db199SXin Li
689*9c5db199SXin Li    def get_inactive_version_info(self):
690*9c5db199SXin Li        """Get the active partition, version, and hash"""
691*9c5db199SXin Li        return self.get_version_info(self.INACTIVE_VERSION)
692*9c5db199SXin Li
693*9c5db199SXin Li
694*9c5db199SXin Li    def get_active_version_info(self):
695*9c5db199SXin Li        """Get the active partition, version, and hash"""
696*9c5db199SXin Li        return self.get_version_info(self.ACTIVE_VERSION)
697*9c5db199SXin Li
698*9c5db199SXin Li
699*9c5db199SXin Li    def using_prod_rw_keys(self):
700*9c5db199SXin Li        """Returns True if the RW keyid is prod"""
701*9c5db199SXin Li        rv = self.send_command_retry_get_output('sysinfo',
702*9c5db199SXin Li                ['RW keyid:\s+(0x[0-9a-f]{8})'], safe=True)[0][1]
703*9c5db199SXin Li        logging.info('RW Keyid: 0x%s', rv)
704*9c5db199SXin Li        return rv in self.PROD_RW_KEYIDS
705*9c5db199SXin Li
706*9c5db199SXin Li
707*9c5db199SXin Li    def get_active_board_id_str(self):
708*9c5db199SXin Li        """Get the running image board id.
709*9c5db199SXin Li
710*9c5db199SXin Li        @return: The board id string or None if the image does not support board
711*9c5db199SXin Li                 id or the image is not board id locked.
712*9c5db199SXin Li        """
713*9c5db199SXin Li        # Getting the board id from the version console command is only
714*9c5db199SXin Li        # supported in board id locked images .22 and above. Any image that is
715*9c5db199SXin Li        # board id locked will have support for getting the image board id.
716*9c5db199SXin Li        #
717*9c5db199SXin Li        # If board id is not supported on the device, return None. This is
718*9c5db199SXin Li        # still expected on all current non board id locked release images.
719*9c5db199SXin Li        try:
720*9c5db199SXin Li            version_info = self.get_version_info(self.ACTIVE_BID)
721*9c5db199SXin Li        except error.TestFail as e:
722*9c5db199SXin Li            logging.info(str(e))
723*9c5db199SXin Li            logging.info('Cannot use the version to get the board id')
724*9c5db199SXin Li            return None
725*9c5db199SXin Li
726*9c5db199SXin Li        if self.BID_ERROR in version_info[4]:
727*9c5db199SXin Li            raise error.TestError(version_info)
728*9c5db199SXin Li        bid = version_info[4].split()[1]
729*9c5db199SXin Li        return cr50_utils.GetBoardIdInfoString(bid)
730*9c5db199SXin Li
731*9c5db199SXin Li
732*9c5db199SXin Li    def get_version(self):
733*9c5db199SXin Li        """Get the RW version"""
734*9c5db199SXin Li        return self.get_active_version_info()[1].strip()
735*9c5db199SXin Li
736*9c5db199SXin Li
737*9c5db199SXin Li    def get_full_version(self):
738*9c5db199SXin Li        """Get the complete RW version string."""
739*9c5db199SXin Li        _, rw_ver, dbg, ver_str = self.get_active_version_info()
740*9c5db199SXin Li        return  rw_ver + (dbg if dbg else '') + ver_str
741*9c5db199SXin Li
742*9c5db199SXin Li
743*9c5db199SXin Li    def ccd_is_enabled(self):
744*9c5db199SXin Li        """Return True if ccd is enabled.
745*9c5db199SXin Li
746*9c5db199SXin Li        If the test is running through ccd, return the ccd_state value. If
747*9c5db199SXin Li        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
748*9c5db199SXin Li        if Cr50 has ccd enabled.
749*9c5db199SXin Li
750*9c5db199SXin Li        @return: 'off' or 'on' based on whether the cr50 console is working.
751*9c5db199SXin Li        """
752*9c5db199SXin Li        if self._servo.main_device_is_ccd():
753*9c5db199SXin Li            return self._servo.get('ccd_state') == 'on'
754*9c5db199SXin Li        else:
755*9c5db199SXin Li            return not bool(self.gpioget('CCD_MODE_L'))
756*9c5db199SXin Li
757*9c5db199SXin Li
758*9c5db199SXin Li    @dts_control_command
759*9c5db199SXin Li    def wait_for_stable_ccd_state(self, state, timeout, raise_error):
760*9c5db199SXin Li        """Wait up to timeout seconds for CCD to be 'on' or 'off'
761*9c5db199SXin Li
762*9c5db199SXin Li        Verify ccd is off or on and remains in that state for 3 seconds.
763*9c5db199SXin Li
764*9c5db199SXin Li        @param state: a string either 'on' or 'off'.
765*9c5db199SXin Li        @param timeout: time in seconds to wait
766*9c5db199SXin Li        @param raise_error: Raise TestFail if the value is state is not reached.
767*9c5db199SXin Li        @raise TestFail: if ccd never reaches the specified state
768*9c5db199SXin Li        """
769*9c5db199SXin Li        wait_for_enable = state == 'on'
770*9c5db199SXin Li        logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off')
771*9c5db199SXin Li        enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
772*9c5db199SXin Li                                       timeout_sec=timeout)
773*9c5db199SXin Li        if enabled != wait_for_enable:
774*9c5db199SXin Li            error_msg = ("timed out before detecting ccd '%s'" %
775*9c5db199SXin Li                         ('on' if wait_for_enable else 'off'))
776*9c5db199SXin Li            if raise_error:
777*9c5db199SXin Li                raise error.TestFail(error_msg)
778*9c5db199SXin Li            logging.warning(error_msg)
779*9c5db199SXin Li        else:
780*9c5db199SXin Li            # Make sure the state doesn't change.
781*9c5db199SXin Li            enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled,
782*9c5db199SXin Li                                           timeout_sec=self.SHORT_WAIT)
783*9c5db199SXin Li            if enabled != wait_for_enable:
784*9c5db199SXin Li                error_msg = ("CCD switched %r after briefly being %r" %
785*9c5db199SXin Li                             ('on' if enabled else 'off', state))
786*9c5db199SXin Li                if raise_error:
787*9c5db199SXin Li                    raise error.TestFail(error_msg)
788*9c5db199SXin Li                logging.info(error_msg)
789*9c5db199SXin Li        logging.info("ccd is %r", 'on' if enabled else 'off')
790*9c5db199SXin Li
791*9c5db199SXin Li
792*9c5db199SXin Li    @dts_control_command
793*9c5db199SXin Li    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
794*9c5db199SXin Li        """Wait for the cr50 console to stop working"""
795*9c5db199SXin Li        self.wait_for_stable_ccd_state('off', timeout, raise_error)
796*9c5db199SXin Li
797*9c5db199SXin Li
798*9c5db199SXin Li    @dts_control_command
799*9c5db199SXin Li    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
800*9c5db199SXin Li        """Wait for the cr50 console to start working"""
801*9c5db199SXin Li        self.wait_for_stable_ccd_state('on', timeout, raise_error)
802*9c5db199SXin Li
803*9c5db199SXin Li
804*9c5db199SXin Li    @dts_control_command
805*9c5db199SXin Li    def ccd_disable(self, raise_error=True):
806*9c5db199SXin Li        """Change the values of the CC lines to disable CCD"""
807*9c5db199SXin Li        logging.info("disable ccd")
808*9c5db199SXin Li        self._servo.set_dts_mode('off')
809*9c5db199SXin Li        self.wait_for_ccd_disable(raise_error=raise_error)
810*9c5db199SXin Li
811*9c5db199SXin Li
812*9c5db199SXin Li    @dts_control_command
813*9c5db199SXin Li    def ccd_enable(self, raise_error=False):
814*9c5db199SXin Li        """Reenable CCD and reset servo interfaces"""
815*9c5db199SXin Li        logging.info("reenable ccd")
816*9c5db199SXin Li        self._servo.set_dts_mode('on')
817*9c5db199SXin Li        # If the test is actually running with ccd, wait for USB communication
818*9c5db199SXin Li        # to come up after reset.
819*9c5db199SXin Li        if self._servo.main_device_is_ccd():
820*9c5db199SXin Li            time.sleep(self._servo.USB_DETECTION_DELAY)
821*9c5db199SXin Li        self.wait_for_ccd_enable(raise_error=raise_error)
822*9c5db199SXin Li
823*9c5db199SXin Li
824*9c5db199SXin Li    def _level_change_req_pp(self, level):
825*9c5db199SXin Li        """Returns True if setting the level will require physical presence"""
826*9c5db199SXin Li        testlab_pp = level != 'testlab open' and 'testlab' in level
827*9c5db199SXin Li        # If the level is open and the ccd capabilities say physical presence
828*9c5db199SXin Li        # is required, then physical presence will be required.
829*9c5db199SXin Li        open_pp = (level == 'open' and
830*9c5db199SXin Li                   not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE])
831*9c5db199SXin Li        return testlab_pp or open_pp
832*9c5db199SXin Li
833*9c5db199SXin Li
834*9c5db199SXin Li    def _state_to_bool(self, state):
835*9c5db199SXin Li        """Converts the state string to True or False"""
836*9c5db199SXin Li        # TODO(mruthven): compare to 'on' once servo is up to date in the lab
837*9c5db199SXin Li        return state.lower() in self.ON_STRINGS
838*9c5db199SXin Li
839*9c5db199SXin Li
840*9c5db199SXin Li    def testlab_is_on(self):
841*9c5db199SXin Li        """Returns True of testlab mode is on"""
842*9c5db199SXin Li        return self._state_to_bool(self._servo.get('cr50_testlab'))
843*9c5db199SXin Li
844*9c5db199SXin Li
845*9c5db199SXin Li    def set_ccd_testlab(self, state):
846*9c5db199SXin Li        """Set the testlab mode
847*9c5db199SXin Li
848*9c5db199SXin Li        @param state: the desired testlab mode string: 'on' or 'off'
849*9c5db199SXin Li        @raise TestFail: if testlab mode was not changed
850*9c5db199SXin Li        """
851*9c5db199SXin Li        if self._servo.main_device_is_ccd():
852*9c5db199SXin Li            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
853*9c5db199SXin Li                    'cable instead.')
854*9c5db199SXin Li        if not self.faft_config.has_powerbutton:
855*9c5db199SXin Li            raise error.TestError('No power button on device')
856*9c5db199SXin Li
857*9c5db199SXin Li        request_on = self._state_to_bool(state)
858*9c5db199SXin Li        testlab_on = self.testlab_is_on()
859*9c5db199SXin Li        request_str = 'on' if request_on else 'off'
860*9c5db199SXin Li
861*9c5db199SXin Li        if testlab_on == request_on:
862*9c5db199SXin Li            logging.info('ccd testlab already set to %s', request_str)
863*9c5db199SXin Li            return
864*9c5db199SXin Li
865*9c5db199SXin Li        original_level = self.get_ccd_level()
866*9c5db199SXin Li
867*9c5db199SXin Li        # We can only change the testlab mode when the device is open. If
868*9c5db199SXin Li        # testlab mode is already enabled, we can go directly to open using 'ccd
869*9c5db199SXin Li        # testlab open'. This will save 5 minutes, because we can skip the
870*9c5db199SXin Li        # physical presence check.
871*9c5db199SXin Li        if testlab_on:
872*9c5db199SXin Li            self.send_command('ccd testlab open')
873*9c5db199SXin Li        else:
874*9c5db199SXin Li            self.set_ccd_level('open')
875*9c5db199SXin Li
876*9c5db199SXin Li        ap_is_on = self.ap_is_on()
877*9c5db199SXin Li        # Set testlab mode
878*9c5db199SXin Li        rv = self.send_command_get_output('ccd testlab %s' % request_str,
879*9c5db199SXin Li                ['ccd.*>'])[0]
880*9c5db199SXin Li        if 'Access Denied' in rv:
881*9c5db199SXin Li            raise error.TestFail("'ccd %s' %s" % (request_str, rv))
882*9c5db199SXin Li
883*9c5db199SXin Li        # Press the power button once a second for 15 seconds. If the AP is
884*9c5db199SXin Li        # currently on, make sure it's on at the end of the open process.
885*9c5db199SXin Li        self.run_pp(self.PP_SHORT, ensure_ap_on=ap_is_on)
886*9c5db199SXin Li
887*9c5db199SXin Li        self.set_ccd_level(original_level)
888*9c5db199SXin Li        if request_on != self.testlab_is_on():
889*9c5db199SXin Li            raise error.TestFail('Failed to set ccd testlab to %s' % state)
890*9c5db199SXin Li
891*9c5db199SXin Li
892*9c5db199SXin Li    def get_ccd_level(self):
893*9c5db199SXin Li        """Returns the current ccd privilege level"""
894*9c5db199SXin Li        return self.get_ccd_info('State').lower().rstrip('ed')
895*9c5db199SXin Li
896*9c5db199SXin Li
897*9c5db199SXin Li    def set_ccd_level(self, level, password=''):
898*9c5db199SXin Li        """Set the Cr50 CCD privilege level.
899*9c5db199SXin Li
900*9c5db199SXin Li        @param level: a string of the ccd privilege level: 'open', 'lock', or
901*9c5db199SXin Li                      'unlock'.
902*9c5db199SXin Li        @param password: send the ccd command with password. This will still
903*9c5db199SXin Li                         require the same physical presence.
904*9c5db199SXin Li        @raise TestFail: if the level couldn't be set
905*9c5db199SXin Li        """
906*9c5db199SXin Li        # TODO(mruthven): add support for CCD password
907*9c5db199SXin Li        level = level.lower()
908*9c5db199SXin Li
909*9c5db199SXin Li        if level == self.get_ccd_level():
910*9c5db199SXin Li            logging.info('CCD privilege level is already %s', level)
911*9c5db199SXin Li            return
912*9c5db199SXin Li
913*9c5db199SXin Li        if 'testlab' in level:
914*9c5db199SXin Li            raise error.TestError("Can't change testlab mode using "
915*9c5db199SXin Li                "ccd_set_level")
916*9c5db199SXin Li
917*9c5db199SXin Li        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
918*9c5db199SXin Li        batt_is_disconnected = self.get_batt_pres_state()[1]
919*9c5db199SXin Li        req_pp = self._level_change_req_pp(level)
920*9c5db199SXin Li        has_pp = not self._servo.main_device_is_ccd()
921*9c5db199SXin Li        dbg_en = self.get_active_version_info()[2]
922*9c5db199SXin Li
923*9c5db199SXin Li        if req_pp and not has_pp:
924*9c5db199SXin Li            raise error.TestError("Can't change privilege level to '%s' "
925*9c5db199SXin Li                "without physical presence." % level)
926*9c5db199SXin Li
927*9c5db199SXin Li        if not testlab_on and not has_pp:
928*9c5db199SXin Li            raise error.TestError("Wont change privilege level without "
929*9c5db199SXin Li                "physical presence or testlab mode enabled")
930*9c5db199SXin Li
931*9c5db199SXin Li        original_timeout = float(self._servo.get('cr50_uart_timeout'))
932*9c5db199SXin Li        # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may
933*9c5db199SXin Li        # take more than 3 seconds.
934*9c5db199SXin Li        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
935*9c5db199SXin Li        # Start the unlock process.
936*9c5db199SXin Li
937*9c5db199SXin Li        if level == 'open' or level == 'unlock':
938*9c5db199SXin Li            logging.info('waiting %d seconds, the minimum time between'
939*9c5db199SXin Li                         ' ccd password attempts',
940*9c5db199SXin Li                         self.CCD_PASSWORD_RATE_LIMIT)
941*9c5db199SXin Li            time.sleep(self.CCD_PASSWORD_RATE_LIMIT)
942*9c5db199SXin Li
943*9c5db199SXin Li        ap_is_on = self.ap_is_on()
944*9c5db199SXin Li        try:
945*9c5db199SXin Li            cmd = 'ccd %s%s' % (level, (' ' + password) if password else '')
946*9c5db199SXin Li            # ccd command outputs on the rbox, ccd, and console channels,
947*9c5db199SXin Li            # respectively. Cr50 uses these channels to print relevant ccd
948*9c5db199SXin Li            # information.
949*9c5db199SXin Li            # Restrict all other channels.
950*9c5db199SXin Li            ccd_output_channels = 0x20000 | 0x8 | 0x1
951*9c5db199SXin Li            rv = self.send_safe_command_get_output(
952*9c5db199SXin Li                    cmd, [cmd + '(.*)>'],
953*9c5db199SXin Li                    channel_mask=ccd_output_channels)[0][1]
954*9c5db199SXin Li        finally:
955*9c5db199SXin Li            self._servo.set('cr50_uart_timeout', original_timeout)
956*9c5db199SXin Li        logging.info(rv)
957*9c5db199SXin Li        if 'ccd_open denied: fwmp' in rv:
958*9c5db199SXin Li            raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv))
959*9c5db199SXin Li        if 'Access Denied' in rv:
960*9c5db199SXin Li            raise error.TestFail("%r %s" % (cmd, rv))
961*9c5db199SXin Li        if 'Busy' in rv:
962*9c5db199SXin Li            raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv))
963*9c5db199SXin Li
964*9c5db199SXin Li        # Press the power button once a second, if we need physical presence.
965*9c5db199SXin Li        if req_pp and batt_is_disconnected:
966*9c5db199SXin Li            # DBG images have shorter unlock processes. If the AP is currently
967*9c5db199SXin Li            # on, make sure it's on at the end of the open process.
968*9c5db199SXin Li            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG,
969*9c5db199SXin Li                        ensure_ap_on=ap_is_on)
970*9c5db199SXin Li
971*9c5db199SXin Li        if level != self.get_ccd_level():
972*9c5db199SXin Li            self.check_for_console_errors('Running console ccd %s' % level)
973*9c5db199SXin Li            raise error.TestFail('Could not set privilege level to %s' % level)
974*9c5db199SXin Li
975*9c5db199SXin Li        logging.info('Successfully set CCD privelege level to %s', level)
976*9c5db199SXin Li
977*9c5db199SXin Li
978*9c5db199SXin Li    def run_pp(self, unlock_timeout, ensure_ap_on=False):
979*9c5db199SXin Li        """Press the power button a for unlock_timeout seconds.
980*9c5db199SXin Li
981*9c5db199SXin Li        This will press the power button many more times than it needs to be
982*9c5db199SXin Li        pressed. Cr50 doesn't care if you press it too often. It just cares that
983*9c5db199SXin Li        you press the power button at least once within the detect interval.
984*9c5db199SXin Li
985*9c5db199SXin Li        For privilege level changes you need to press the power button 5 times
986*9c5db199SXin Li        in the short interval and then 4 times within the long interval.
987*9c5db199SXin Li        Short Interval
988*9c5db199SXin Li        100msec < power button press < 5 seconds
989*9c5db199SXin Li        Long Interval
990*9c5db199SXin Li        60s < power button press < 300s
991*9c5db199SXin Li
992*9c5db199SXin Li        For testlab enable/disable you must press the power button 5 times
993*9c5db199SXin Li        spaced between 100msec and 5 seconds apart.
994*9c5db199SXin Li
995*9c5db199SXin Li        @param unlock_timeout: time to press the power button in seconds.
996*9c5db199SXin Li        @param ensure_ap_on: If true, press the power to turn on the AP.
997*9c5db199SXin Li        """
998*9c5db199SXin Li        end_time = time.time() + unlock_timeout
999*9c5db199SXin Li
1000*9c5db199SXin Li        logging.info('Pressing power button for %ds to unlock the console.',
1001*9c5db199SXin Li                     unlock_timeout)
1002*9c5db199SXin Li        logging.info('The process should end at %s', time.ctime(end_time))
1003*9c5db199SXin Li
1004*9c5db199SXin Li        # Press the power button once a second to unlock the console.
1005*9c5db199SXin Li        while time.time() < end_time:
1006*9c5db199SXin Li            self._servo.power_short_press()
1007*9c5db199SXin Li            time.sleep(1)
1008*9c5db199SXin Li
1009*9c5db199SXin Li        # If the last power button press left the AP powered off, and it was on
1010*9c5db199SXin Li        # before, turn it back on.
1011*9c5db199SXin Li        time.sleep(self.faft_config.shutdown)
1012*9c5db199SXin Li        if ensure_ap_on and not self.ap_is_on():
1013*9c5db199SXin Li            logging.info('AP is off. Pressing the power button to turn it on')
1014*9c5db199SXin Li            self._servo.power_short_press()
1015*9c5db199SXin Li            logging.debug('Pressing PP to turn back on')
1016*9c5db199SXin Li
1017*9c5db199SXin Li
1018*9c5db199SXin Li    def gettime(self):
1019*9c5db199SXin Li        """Get the current cr50 system time"""
1020*9c5db199SXin Li        result = self.send_safe_command_get_output('gettime', [' = (.*) s'])
1021*9c5db199SXin Li        return float(result[0][1])
1022*9c5db199SXin Li
1023*9c5db199SXin Li
1024*9c5db199SXin Li    def servo_dts_mode_is_valid(self):
1025*9c5db199SXin Li        """Returns True if cr50 registers change in servo dts mode."""
1026*9c5db199SXin Li        # This is to test that Cr50 actually recognizes the change in ccd state
1027*9c5db199SXin Li        # We cant do that with tests using ccd, because the cr50 communication
1028*9c5db199SXin Li        # goes down once ccd is enabled.
1029*9c5db199SXin Li        if not self._servo.dts_mode_is_safe():
1030*9c5db199SXin Li            return False
1031*9c5db199SXin Li
1032*9c5db199SXin Li        ccd_start = 'on' if self.ccd_is_enabled() else 'off'
1033*9c5db199SXin Li        dts_start = self._servo.get_dts_mode()
1034*9c5db199SXin Li        try:
1035*9c5db199SXin Li            # Verify both ccd enable and disable
1036*9c5db199SXin Li            self.ccd_disable(raise_error=True)
1037*9c5db199SXin Li            self.ccd_enable(raise_error=True)
1038*9c5db199SXin Li            rv = True
1039*9c5db199SXin Li        except Exception as e:
1040*9c5db199SXin Li            logging.info(e)
1041*9c5db199SXin Li            rv = False
1042*9c5db199SXin Li        self._servo.set_dts_mode(dts_start)
1043*9c5db199SXin Li        self.wait_for_stable_ccd_state(ccd_start, 60, True)
1044*9c5db199SXin Li        logging.info('Test setup does%s support servo DTS mode',
1045*9c5db199SXin Li                '' if rv else 'n\'t')
1046*9c5db199SXin Li        return rv
1047*9c5db199SXin Li
1048*9c5db199SXin Li
1049*9c5db199SXin Li    def wait_until_update_is_allowed(self):
1050*9c5db199SXin Li        """Wait until cr50 will be able to accept an update.
1051*9c5db199SXin Li
1052*9c5db199SXin Li        Cr50 rejects any attempt to update if it has been less than 60 seconds
1053*9c5db199SXin Li        since it last recovered from deep sleep or came up from reboot. This
1054*9c5db199SXin Li        will wait until cr50 gettime shows a time greater than 60.
1055*9c5db199SXin Li        """
1056*9c5db199SXin Li        if self.get_active_version_info()[2]:
1057*9c5db199SXin Li            logging.info("Running DBG image. Don't need to wait for update.")
1058*9c5db199SXin Li            return
1059*9c5db199SXin Li        cr50_time = self.gettime()
1060*9c5db199SXin Li        if cr50_time < 60:
1061*9c5db199SXin Li            sleep_time = 61 - cr50_time
1062*9c5db199SXin Li            logging.info('Cr50 has been up for %ds waiting %ds before update',
1063*9c5db199SXin Li                         cr50_time, sleep_time)
1064*9c5db199SXin Li            time.sleep(sleep_time)
1065*9c5db199SXin Li
1066*9c5db199SXin Li
1067*9c5db199SXin Li    def tpm_is_enabled(self):
1068*9c5db199SXin Li        """Query the current TPM mode.
1069*9c5db199SXin Li
1070*9c5db199SXin Li        @return:  True if TPM is enabled, False otherwise.
1071*9c5db199SXin Li        """
1072*9c5db199SXin Li        result = self.send_command_retry_get_output('sysinfo',
1073*9c5db199SXin Li                ['(?i)TPM\s+MODE:\s+(enabled|disabled)'], safe=True)[0][1]
1074*9c5db199SXin Li        logging.debug(result)
1075*9c5db199SXin Li
1076*9c5db199SXin Li        return result.lower() == 'enabled'
1077*9c5db199SXin Li
1078*9c5db199SXin Li
1079*9c5db199SXin Li    def get_keyladder_state(self):
1080*9c5db199SXin Li        """Get the status of H1 Key Ladder.
1081*9c5db199SXin Li
1082*9c5db199SXin Li        @return: The keyladder state string. prod or dev both mean enabled.
1083*9c5db199SXin Li        """
1084*9c5db199SXin Li        result = self.send_command_retry_get_output('sysinfo',
1085*9c5db199SXin Li                ['(?i)Key\s+Ladder:\s+(enabled|prod|dev|disabled)'],
1086*9c5db199SXin Li                safe=True)[0][1]
1087*9c5db199SXin Li        logging.debug(result)
1088*9c5db199SXin Li        return result
1089*9c5db199SXin Li
1090*9c5db199SXin Li
1091*9c5db199SXin Li    def keyladder_is_disabled(self):
1092*9c5db199SXin Li        """Get the status of H1 Key Ladder.
1093*9c5db199SXin Li
1094*9c5db199SXin Li        @return: True if H1 Key Ladder is disabled. False otherwise.
1095*9c5db199SXin Li        """
1096*9c5db199SXin Li        return self.get_keyladder_state() == 'disabled'
1097*9c5db199SXin Li
1098*9c5db199SXin Li
1099*9c5db199SXin Li    def get_sleepmask(self):
1100*9c5db199SXin Li        """Returns the sleepmask as an int"""
1101*9c5db199SXin Li        rv = self.send_command_retry_get_output('sleepmask',
1102*9c5db199SXin Li                ['sleep mask: (\S{8})\s+'], safe=True)[0][1]
1103*9c5db199SXin Li        logging.info('sleepmask %s', rv)
1104*9c5db199SXin Li        return int(rv, 16)
1105*9c5db199SXin Li
1106*9c5db199SXin Li
1107*9c5db199SXin Li    def get_ccdstate(self):
1108*9c5db199SXin Li        """Return a dictionary of the ccdstate once it's done debouncing"""
1109*9c5db199SXin Li        for i in range(self.CCDSTATE_MAX_RETRY_COUNT):
1110*9c5db199SXin Li            rv = self.send_command_retry_get_output('ccdstate',
1111*9c5db199SXin Li                    ['ccdstate(.*)>'], safe=True, compare_output=True)[0][0]
1112*9c5db199SXin Li
1113*9c5db199SXin Li            # Look for a line like 'AP: on' or 'AP: off'. 'debouncing' or
1114*9c5db199SXin Li            # 'unknown' may appear transiently. 'debouncing' should transition
1115*9c5db199SXin Li            # to 'on' or 'off' within 1 second, and 'unknown' should do so
1116*9c5db199SXin Li            # within 20 seconds.
1117*9c5db199SXin Li            if 'debouncing' not in rv and 'unknown' not in rv:
1118*9c5db199SXin Li                break
1119*9c5db199SXin Li            time.sleep(self.SHORT_WAIT)
1120*9c5db199SXin Li        ccdstate = {}
1121*9c5db199SXin Li        for line in rv.splitlines():
1122*9c5db199SXin Li            line = line.strip()
1123*9c5db199SXin Li            if ':' in line:
1124*9c5db199SXin Li                k, v = line.split(':', 1)
1125*9c5db199SXin Li                k = k.strip()
1126*9c5db199SXin Li                v = v.strip()
1127*9c5db199SXin Li                if '(' in v:
1128*9c5db199SXin Li                    ccdstate[k + ' full'] = v
1129*9c5db199SXin Li                    v = v.split('(')[0].strip()
1130*9c5db199SXin Li                ccdstate[k] = v
1131*9c5db199SXin Li        logging.info('Current CCD state:\n%s', pprint.pformat(ccdstate))
1132*9c5db199SXin Li        return ccdstate
1133*9c5db199SXin Li
1134*9c5db199SXin Li
1135*9c5db199SXin Li    def ap_is_on(self):
1136*9c5db199SXin Li        """Get the power state of the AP.
1137*9c5db199SXin Li
1138*9c5db199SXin Li        @return: True if the AP is on; False otherwise.
1139*9c5db199SXin Li        """
1140*9c5db199SXin Li        ap_state = self.get_ccdstate()['AP']
1141*9c5db199SXin Li        if ap_state.lower() == 'on':
1142*9c5db199SXin Li            return True
1143*9c5db199SXin Li        elif ap_state.lower() == 'off':
1144*9c5db199SXin Li            return False
1145*9c5db199SXin Li        else:
1146*9c5db199SXin Li            raise error.TestFail('Read unusable AP state from ccdstate: %r' %
1147*9c5db199SXin Li                                 ap_state)
1148*9c5db199SXin Li
1149*9c5db199SXin Li
1150*9c5db199SXin Li    def gpioget(self, signal_name):
1151*9c5db199SXin Li        """Get the current state of the signal
1152*9c5db199SXin Li
1153*9c5db199SXin Li        @return an integer 1 or 0 based on the gpioget value
1154*9c5db199SXin Li        """
1155*9c5db199SXin Li        result = self.send_command_retry_get_output('gpioget',
1156*9c5db199SXin Li                    ['(0|1)[ \S]*%s' % signal_name], safe=True)
1157*9c5db199SXin Li        return int(result[0][1])
1158*9c5db199SXin Li
1159*9c5db199SXin Li
1160*9c5db199SXin Li    def batt_pres_is_reset(self):
1161*9c5db199SXin Li        """Returns True if batt pres is reset to always follow batt pres"""
1162*9c5db199SXin Li        follow_bp, _, follow_bp_atboot, _ = self.get_batt_pres_state()
1163*9c5db199SXin Li        return follow_bp and follow_bp_atboot
1164*9c5db199SXin Li
1165*9c5db199SXin Li
1166*9c5db199SXin Li    def get_batt_pres_state(self):
1167*9c5db199SXin Li        """Get the current and atboot battery presence state
1168*9c5db199SXin Li
1169*9c5db199SXin Li        The atboot setting cannot really be determined now if it is set to
1170*9c5db199SXin Li        follow battery presence. It is likely to remain the same after reboot,
1171*9c5db199SXin Li        but who knows. If the third element of the tuple is True, the last
1172*9c5db199SXin Li        element will not be that useful
1173*9c5db199SXin Li
1174*9c5db199SXin Li        @return: a tuple of the current battery presence state
1175*9c5db199SXin Li                 (True if current state is to follow batt presence,
1176*9c5db199SXin Li                  True if battery is connected,
1177*9c5db199SXin Li                  True if current state is to follow batt presence atboot,
1178*9c5db199SXin Li                  True if battery is connected atboot)
1179*9c5db199SXin Li        """
1180*9c5db199SXin Li        # bpforce is added in 4.16. If the image doesn't have the command, cr50
1181*9c5db199SXin Li        # always follows battery presence. In these images 'gpioget BATT_PRES_L'
1182*9c5db199SXin Li        # accurately represents the battery presence state, because it can't be
1183*9c5db199SXin Li        # overidden.
1184*9c5db199SXin Li        if not self.has_command('bpforce'):
1185*9c5db199SXin Li            batt_pres = not bool(self.gpioget('BATT_PRES_L'))
1186*9c5db199SXin Li            return (True, batt_pres, True, batt_pres)
1187*9c5db199SXin Li
1188*9c5db199SXin Li        # The bpforce command is very similar to the wp command. It just
1189*9c5db199SXin Li        # substitutes 'connected' for 'enabled' and 'disconnected' for
1190*9c5db199SXin Li        # 'disabled'.
1191*9c5db199SXin Li        rv = self.send_command_retry_get_output('bpforce',
1192*9c5db199SXin Li                ['batt pres: (forced )?(con|dis).*at boot: (forced )?'
1193*9c5db199SXin Li                 '(follow|discon|con)'], safe=True)[0]
1194*9c5db199SXin Li        _, forced, connected, _, atboot = rv
1195*9c5db199SXin Li        logging.info(rv)
1196*9c5db199SXin Li        return (not forced, connected == 'con', atboot == 'follow',
1197*9c5db199SXin Li                atboot == 'con')
1198*9c5db199SXin Li
1199*9c5db199SXin Li
1200*9c5db199SXin Li    def set_batt_pres_state(self, state, atboot):
1201*9c5db199SXin Li        """Override the battery presence state.
1202*9c5db199SXin Li
1203*9c5db199SXin Li        @param state: a string of the battery presence setting: 'connected',
1204*9c5db199SXin Li                  'disconnected', or 'follow_batt_pres'
1205*9c5db199SXin Li        @param atboot: True if we're overriding battery presence atboot
1206*9c5db199SXin Li        """
1207*9c5db199SXin Li        cmd = 'bpforce %s%s' % (state, ' atboot' if atboot else '')
1208*9c5db199SXin Li        logging.info('running %r', cmd)
1209*9c5db199SXin Li        self.send_command(cmd)
1210*9c5db199SXin Li
1211*9c5db199SXin Li
1212*9c5db199SXin Li    def dump_nvmem(self):
1213*9c5db199SXin Li        """Print nvmem objects."""
1214*9c5db199SXin Li        rv = self.send_safe_command_get_output('dump_nvmem',
1215*9c5db199SXin Li                                               ['dump_nvmem(.*)>'])[0][1]
1216*9c5db199SXin Li        logging.info('NVMEM OUTPUT:\n%s', rv)
1217*9c5db199SXin Li
1218*9c5db199SXin Li
1219*9c5db199SXin Li    def get_reset_cause(self):
1220*9c5db199SXin Li        """Returns the reset flags for the last reset."""
1221*9c5db199SXin Li        rv = self.send_command_retry_get_output('sysinfo',
1222*9c5db199SXin Li                ['Reset flags:\s+0x([0-9a-f]{8})\s'], compare_output=True)[0][1]
1223*9c5db199SXin Li        logging.info('reset cause: %s', rv)
1224*9c5db199SXin Li        return int(rv, 16)
1225*9c5db199SXin Li
1226*9c5db199SXin Li
1227*9c5db199SXin Li    def was_reset(self, reset_type):
1228*9c5db199SXin Li        """Returns 1 if the reset type is found in the reset_cause.
1229*9c5db199SXin Li
1230*9c5db199SXin Li        @param reset_type: reset name in string type.
1231*9c5db199SXin Li        """
1232*9c5db199SXin Li        reset_cause = self.get_reset_cause()
1233*9c5db199SXin Li        reset_flag = self.RESET_FLAGS[reset_type]
1234*9c5db199SXin Li        return bool(reset_cause & reset_flag)
1235*9c5db199SXin Li
1236*9c5db199SXin Li
1237*9c5db199SXin Li    def get_devid(self):
1238*9c5db199SXin Li        """Returns the cr50 serial number."""
1239*9c5db199SXin Li        return self.send_command_retry_get_output('sysinfo',
1240*9c5db199SXin Li                ['DEV_ID:\s+(0x[0-9a-f]{8} 0x[0-9a-f]{8})'])[0][1]
1241*9c5db199SXin Li
1242*9c5db199SXin Li
1243*9c5db199SXin Li    def get_serial(self):
1244*9c5db199SXin Li        """Returns the cr50 serial number."""
1245*9c5db199SXin Li        serial = self.get_devid().replace('0x', '').replace(' ', '-').upper()
1246*9c5db199SXin Li        logging.info('CCD serial: %s', serial)
1247*9c5db199SXin Li        return serial
1248*9c5db199SXin Li
1249*9c5db199SXin Li    def check_boot_mode(self, mode_exp='NORMAL'):
1250*9c5db199SXin Li        """Query the boot mode to Cr50, and compare it against mode_exp.
1251*9c5db199SXin Li
1252*9c5db199SXin Li        Args:
1253*9c5db199SXin Li            mode_exp: expecting boot mode. It should be either 'NORMAL'
1254*9c5db199SXin Li                      or 'NO_BOOT'.
1255*9c5db199SXin Li        Returns:
1256*9c5db199SXin Li            True if the boot mode matches mode_exp.
1257*9c5db199SXin Li            False, otherwise.
1258*9c5db199SXin Li        Raises:
1259*9c5db199SXin Li            TestError: Input parameter is not valid.
1260*9c5db199SXin Li        """
1261*9c5db199SXin Li
1262*9c5db199SXin Li        if mode_exp not in ['NORMAL', 'NO_BOOT']:
1263*9c5db199SXin Li            raise error.TestError('parameter, mode_exp is not valid: %s' %
1264*9c5db199SXin Li                                  mode_exp)
1265*9c5db199SXin Li        rv = self.send_command_retry_get_output('ec_comm',
1266*9c5db199SXin Li                ['boot_mode\s*:\s*(NORMAL|NO_BOOT)'], safe=True)
1267*9c5db199SXin Li        return mode_exp == rv[0][1]
1268*9c5db199SXin Li
1269*9c5db199SXin Li    def get_reset_count(self):
1270*9c5db199SXin Li        """Returns the cr50 reset count"""
1271*9c5db199SXin Li        return self.send_command_retry_get_output('sysinfo',
1272*9c5db199SXin Li                                                  ['Reset count: (\d+)'],
1273*9c5db199SXin Li                                                  safe=True)[0][1]
1274*9c5db199SXin Li
1275*9c5db199SXin Li    def check_servo_monitor(self):
1276*9c5db199SXin Li        """Returns True if cr50 can detect servo connect/disconnect"""
1277*9c5db199SXin Li        orig_dts = self._servo.get('servo_dts_mode')
1278*9c5db199SXin Li        # Detach ccd so EC uart won't interfere with servo detection
1279*9c5db199SXin Li        self._servo.set_dts_mode('off')
1280*9c5db199SXin Li        self._servo.set('ec_uart_en', 'off')
1281*9c5db199SXin Li        time.sleep(self.SHORT_WAIT)
1282*9c5db199SXin Li        if self.get_ccdstate()['Servo'] != 'disconnected':
1283*9c5db199SXin Li            self._servo.set_dts_mode(orig_dts)
1284*9c5db199SXin Li            return False
1285*9c5db199SXin Li
1286*9c5db199SXin Li        self._servo.set('ec_uart_en', 'on')
1287*9c5db199SXin Li        time.sleep(self.SHORT_WAIT)
1288*9c5db199SXin Li        if self.get_ccdstate()['Servo'] != 'connected':
1289*9c5db199SXin Li            self._servo.set_dts_mode(orig_dts)
1290*9c5db199SXin Li            return False
1291*9c5db199SXin Li        self._servo.set_dts_mode(orig_dts)
1292*9c5db199SXin Li        return True
1293*9c5db199SXin Li
1294*9c5db199SXin Li    def fips_crypto_allowed(self):
1295*9c5db199SXin Li        """Return 1 if fips crypto is enabled."""
1296*9c5db199SXin Li        if not self.has_command('fips'):
1297*9c5db199SXin Li            return 0
1298*9c5db199SXin Li
1299*9c5db199SXin Li        rv = self.send_command_retry_get_output('fips', [self.FIPS_RE])
1300*9c5db199SXin Li        logging.info('FIPS: %r', rv)
1301*9c5db199SXin Li        _, approved, allowed = rv[0]
1302*9c5db199SXin Li        if int(approved == '') != int(allowed):
1303*9c5db199SXin Li            raise error.TestFail('Approved does not match allowed %r' % rv)
1304*9c5db199SXin Li        return int(allowed)
1305*9c5db199SXin Li
1306*9c5db199SXin Li    def unlock_is_supported(self):
1307*9c5db199SXin Li        """Returns True if GSC supports the ccd unlock state."""
1308*9c5db199SXin Li        return True
1309*9c5db199SXin Li
1310*9c5db199SXin Li    def cap_is_always_on(self, cap):
1311*9c5db199SXin Li        """Returns True if the capability is set to Always"""
1312*9c5db199SXin Li        rv = self.send_command_retry_get_output('ccd',
1313*9c5db199SXin Li                                                [cap + self.CAP_FORMAT])[0]
1314*9c5db199SXin Li        # The third field could be Default or "Always". If it's Default,
1315*9c5db199SXin Li        # "Always" must show up in the third field.
1316*9c5db199SXin Li        return self.CAP_ALWAYS in rv[2] or self.CAP_ALWAYS in rv[3]
1317*9c5db199SXin Li
1318*9c5db199SXin Li    def servo_drv_enabled(self):
1319*9c5db199SXin Li        """Check if the caps  are accessible on boards wigh gsc controls."""
1320*9c5db199SXin Li        if not self._servo.main_device_uses_gsc_drv():
1321*9c5db199SXin Li            return True
1322*9c5db199SXin Li        for cap in self.SERVO_DRV_CAPS:
1323*9c5db199SXin Li            # If any capability isn't accessible, return False.
1324*9c5db199SXin Li            if not self.cap_is_always_on(cap):
1325*9c5db199SXin Li                return False
1326*9c5db199SXin Li        return True
1327*9c5db199SXin Li
1328*9c5db199SXin Li    def enable_servo_control_caps(self):
1329*9c5db199SXin Li        """Set all servo control capabilities to Always."""
1330*9c5db199SXin Li        # Nothing do do if servo doesn't use gsc for any controls.
1331*9c5db199SXin Li        if not self._servo.main_device_uses_gsc_drv():
1332*9c5db199SXin Li            return
1333*9c5db199SXin Li        logging.info('Setting servo caps to Always')
1334*9c5db199SXin Li        self.send_command('ccd testlab open')
1335*9c5db199SXin Li        for cap in self.SERVO_DRV_CAPS:
1336*9c5db199SXin Li            self.send_command('ccd set %s Always' % cap)
1337*9c5db199SXin Li        return self.servo_drv_enabled()
1338*9c5db199SXin Li
1339*9c5db199SXin Li    def ccd_reset_factory(self):
1340*9c5db199SXin Li        """Enable factory mode."""
1341*9c5db199SXin Li        self.send_command('ccd reset factory')
1342*9c5db199SXin Li
1343*9c5db199SXin Li    def ccd_reset(self, servo_en=True):
1344*9c5db199SXin Li        """Reset ccd capabilities."""
1345*9c5db199SXin Li        servo_uses_gsc = self._servo.main_device_uses_gsc_drv()
1346*9c5db199SXin Li        # If testlab mode is enabled, capabilities can be restored. It's
1347*9c5db199SXin Li        # ok to reset ccd.
1348*9c5db199SXin Li        if not servo_en and servo_uses_gsc and not self.testlab_is_on():
1349*9c5db199SXin Li            raise error.TestError(
1350*9c5db199SXin Li                    'Board uses ccd drivers. Enable testlab mode '
1351*9c5db199SXin Li                    'before ccd reset')
1352*9c5db199SXin Li        self.send_command('ccd reset')
1353*9c5db199SXin Li        if servo_en:
1354*9c5db199SXin Li            self.enable_servo_control_caps()
1355*9c5db199SXin Li
1356*9c5db199SXin Li    def check_for_console_errors(self, desc):
1357*9c5db199SXin Li        """Check cr50 uart output for errors.
1358*9c5db199SXin Li
1359*9c5db199SXin Li        Use the logs captured during firmware_test cleanup to check for cr50
1360*9c5db199SXin Li        errors. Flash operation issues aren't obvious unless you check the logs.
1361*9c5db199SXin Li        All flash op errors print do_flash_op and it isn't printed during normal
1362*9c5db199SXin Li        operation. Open the cr50 uart file and count the number of times this is
1363*9c5db199SXin Li        printed. Log the number of errors.
1364*9c5db199SXin Li        """
1365*9c5db199SXin Li        self._servo.record_uart_capture()
1366*9c5db199SXin Li        cr50_uart_file = self._servo.get_uart_logfile('cr50')
1367*9c5db199SXin Li        if not cr50_uart_file:
1368*9c5db199SXin Li            logging.info('There is not a cr50 uart file')
1369*9c5db199SXin Li            return
1370*9c5db199SXin Li
1371*9c5db199SXin Li        flash_error_count = 0
1372*9c5db199SXin Li        usb_error_count = 0
1373*9c5db199SXin Li        watchdog_count = 0
1374*9c5db199SXin Li        with open(cr50_uart_file, 'r') as f:
1375*9c5db199SXin Li            for line in f:
1376*9c5db199SXin Li                if self.FLASH_OP_ERROR_MSG in line:
1377*9c5db199SXin Li                    flash_error_count += 1
1378*9c5db199SXin Li                if self.USB_ERROR in line:
1379*9c5db199SXin Li                    usb_error_count += 1
1380*9c5db199SXin Li                if self.WATCHDOG_RST in line:
1381*9c5db199SXin Li                    watchdog_count += 1
1382*9c5db199SXin Li
1383*9c5db199SXin Li        # Log any flash operation errors.
1384*9c5db199SXin Li        logging.info('do_flash_op count: %d', flash_error_count)
1385*9c5db199SXin Li        logging.info('usb error count: %d', usb_error_count)
1386*9c5db199SXin Li        logging.info('watchdog count: %d', watchdog_count)
1387*9c5db199SXin Li        if watchdog_count:
1388*9c5db199SXin Li            raise error.TestFail('Found %r %d times in logs after %s' %
1389*9c5db199SXin Li                                 (self.WATCHDOG_RST, watchdog_count, desc))
1390*9c5db199SXin Li
1391*9c5db199SXin Li    def ap_ro_version_is_supported(self, version):
1392*9c5db199SXin Li        """Returns True if GSC supports the given version."""
1393*9c5db199SXin Li        return version in self.AP_RO_VERSIONS
1394*9c5db199SXin Li
1395*9c5db199SXin Li    def ap_ro_supported(self):
1396*9c5db199SXin Li        """Returns True if the hash is saved and AP RO is supported."""
1397*9c5db199SXin Li        return self.send_command_retry_get_output(
1398*9c5db199SXin Li                'ap_ro_info', [self.AP_RO_SUPPORTED_RE])[0][2] == 'yes'
1399*9c5db199SXin Li
1400*9c5db199SXin Li    def get_ap_ro_info(self):
1401*9c5db199SXin Li        """Returns a dictionary of the AP RO info.
1402*9c5db199SXin Li
1403*9c5db199SXin Li        Get the ap_ro_info output. Convert it to a usable dictionary.
1404*9c5db199SXin Li
1405*9c5db199SXin Li        Returns:
1406*9c5db199SXin Li            A dictionary with the following key value pairs.
1407*9c5db199SXin Li                'reason': String of unsupported reason or None if ap ro is
1408*9c5db199SXin Li                          supported.
1409*9c5db199SXin Li                'hash': 64 char hash or None if it isn't supported.
1410*9c5db199SXin Li                'supported': bool whether AP RO verification is supported.
1411*9c5db199SXin Li                'result': int of the AP RO verification result.
1412*9c5db199SXin Li        """
1413*9c5db199SXin Li        # Cr50 prints different output based on whether ap ro verification is
1414*9c5db199SXin Li        # supported.
1415*9c5db199SXin Li        if self.ap_ro_supported():
1416*9c5db199SXin Li            output = self.AP_RO_SAVED_OUTPUT
1417*9c5db199SXin Li        else:
1418*9c5db199SXin Li            output = self.AP_RO_UNSUPPORTED_OUTPUT
1419*9c5db199SXin Li        # The reason and hash output is optional. Make sure it's in the
1420*9c5db199SXin Li        # dictionary even if it isn't in the output.
1421*9c5db199SXin Li        info = {'hash': None, 'reason': None}
1422*9c5db199SXin Li        rv = self.send_command_retry_get_output('ap_ro_info',
1423*9c5db199SXin Li                                                output,
1424*9c5db199SXin Li                                                compare_output=True)
1425*9c5db199SXin Li        for _, k, v in rv:
1426*9c5db199SXin Li            # Make key more usable.
1427*9c5db199SXin Li            if k == 'ap_ro_check_unsupported':
1428*9c5db199SXin Li                k = 'reason'
1429*9c5db199SXin Li            # Convert digit strings to ints
1430*9c5db199SXin Li            if v.isdigit():
1431*9c5db199SXin Li                v = int(v)
1432*9c5db199SXin Li            # Convert yes or no to bool
1433*9c5db199SXin Li            if k == 'supported':
1434*9c5db199SXin Li                v = v == 'yes'
1435*9c5db199SXin Li            info[k] = v
1436*9c5db199SXin Li        return info
1437