xref: /aosp_15_r20/external/autotest/server/site_tests/firmware_PDProtocol/firmware_PDProtocol.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7
8from collections import defaultdict
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
12from autotest_lib.server.cros.servo import pd_console
13
14
15class firmware_PDProtocol(FirmwareTest):
16    """
17    Servo based USB PD protocol test.
18
19    A charger must be connected to the DUT for this test.
20
21    This test checks that when an appropriate zinger charger is connected that
22    the PD is properly negotiated in dev mode and when booted from a test image
23    through recovery that the PD is not negotiated.
24
25    Example:
26    PD Successfully negotiated
27    - ServoV4 in SRC_READY or SNK_READY state
28
29    PD not negotiated
30    - ServoV4 in SRC_DISCOVERY or SNK_DISCOVERY state
31
32    """
33    version = 1
34    NEEDS_SERVO_USB = True
35
36    PD_NOT_SUPPORTED_PATTERN = 'INVALID_COMMAND'
37
38    ECTOOL_CMD_DICT = defaultdict(lambda: 'ectool usbpdpower')
39
40    def initialize(self, host, cmdline_args, ec_wp=None):
41        """Initialize the test"""
42        super(firmware_PDProtocol, self).initialize(host, cmdline_args,
43                                                    ec_wp=ec_wp)
44
45        self.ECTOOL_CMD_DICT['samus'] = 'ectool --dev=1 usbpdpower'
46
47        self.current_board = self.servo.get_board();
48
49        self.check_if_pd_supported()
50        self.switcher.setup_mode('dev')
51        # The USB disk is used for recovery. But this test wants a fine-grained
52        # control, i.e. swapping the role just before booting into recovery,
53        # not swapping here. So set used_for_recovery=False.
54        self.setup_usbkey(usbkey=True, host=False, used_for_recovery=False)
55
56        self.original_dev_boot_usb = self.faft_client.system.get_dev_boot_usb()
57        logging.info('Original dev_boot_usb value: %s',
58                     str(self.original_dev_boot_usb))
59
60        self.hw_wp = self.servo.get('fw_wp_state')
61        self.sw_wp = self.faft_client.ec.get_write_protect_status()['enabled']
62        logging.info('hw_wp=%s, sw_wp=%s', self.hw_wp, self.sw_wp)
63
64    def cleanup(self):
65        """Cleanup the test"""
66        if hasattr(self, 'original_dev_boot_usb'):
67            self.ensure_dev_internal_boot(self.original_dev_boot_usb)
68        super(firmware_PDProtocol, self).cleanup()
69
70    def check_if_pd_supported(self):
71        """ Checks if the DUT responds to ectool usbpdpower and skips the test
72        if it isn't supported on the device.
73        """
74        output = self.run_command(self.ECTOOL_CMD_DICT[self.current_board])
75
76        if (not output or
77            self.check_ec_output(output, self.PD_NOT_SUPPORTED_PATTERN)):
78            raise error.TestNAError("PD not supported skipping test.")
79
80    def boot_to_recovery(self):
81        """Boot device into recovery mode."""
82        logging.info('Reboot into Recovery...')
83        self.switcher.reboot_to_mode(to_mode='rec')
84
85        self.check_state((self.checkers.crossystem_checker,
86                          {'mainfw_type': 'recovery'}))
87
88    def run_command(self, command):
89        """Runs the specified command and returns the output
90        as a list of strings.
91
92        @param command: The command to run on the DUT
93        @return A list of strings of the command output
94        """
95        logging.info('Command to run: %s', command)
96
97        output = self.faft_client.system.run_shell_command_get_output(command)
98
99        logging.info('Command output: %s', output)
100
101        return output
102
103    def check_ec_output(self, output, pattern):
104        """Checks if any line in the output matches the given pattern.
105
106        @param output: A list of strings containg the output to search
107        @param pattern: The regex to search the output for
108
109        @return True upon first match found or False
110        """
111        logging.info('Checking %s for %s.', output, pattern)
112
113        for line in output:
114            if bool(re.search(pattern, line)):
115                return True
116
117        return False
118
119    def run_once(self, host):
120        """Main test logic"""
121        # TODO(b/35573842): Refactor to use PDPortPartner to probe the port
122        self.pdtester_port = 1 if 'servo_v4' in self.pdtester.servo_type else 0
123        self.pdtester_pd_utils = pd_console.create_pd_console_utils(
124                                 self.pdtester)
125
126        self.ensure_dev_internal_boot(self.original_dev_boot_usb)
127
128        # Check servo_v4 is negotiated
129        if self.pdtester_pd_utils.is_disconnected(self.pdtester_port):
130            raise error.TestNAError('PD not connected')
131
132        # TODO(b:152148025): Directly set role as pdsnkdts might fail the
133        # PD communication. In short term, we could use PR SWAP instead, and
134        # should also fix the TCPM for handling SRCDTS -> SNKDTS case.
135        self.set_servo_v4_role_to_snk(pd_comm=True)
136        self.boot_to_recovery()
137
138        # Check PD is not negotiated
139        # We allow the chromebox/chromebase, to enable the PD in the
140        # recovery mode.
141        if (host.get_board_type() != 'CHROMEBOX'
142                    and host.get_board_type() != 'CHROMEBASE'
143                    and not self.pdtester_pd_utils.is_snk_discovery_state(
144                            self.pdtester_port)):
145            raise error.TestFail('Expect PD to be disabled, WP (HW/SW) %s/%s' %
146                                 (self.hw_wp, self.sw_wp))
147
148        # Check WP status. Only both SW/HW WP on should pass the test.
149        if (not self.sw_wp) or ('off' in self.hw_wp):
150            raise error.TestFail(
151                'Expect HW and SW WP on, got hw_wp=%s, sw_wp=%s' %
152                (self.hw_wp, self.sw_wp))
153