xref: /aosp_15_r20/external/autotest/server/site_tests/firmware_PDDataSwap/firmware_PDDataSwap.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
11from autotest_lib.server.cros.servo import pd_device
12
13
class firmware_PDDataSwap(FirmwareTest):
    """
    Servo based USB PD data role swap test

    Pass criteria is all data role swaps complete, or
    a reject control message is received from the DUT in the
    cases where the swap does not complete.

    """

    version = 1
    PD_ROLE_DELAY = 0.5
    PD_CONNECT_DELAY = 4
    DATA_SWAP_ITERATIONS = 10
    # Upward facing port data role
    UFP = 'UFP'
    # Downward facing port data role
    DFP = 'DFP'
    # Swap Result Tables, keyed by (direction, starting DUT data role).
    # These class level tables only serve as defaults: run_once() installs
    # fresh per-instance tables so that counts never leak from one test
    # instance (or one run) into the next.
    swap_attempt = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }
    swap_failure = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }

    @classmethod
    def _new_swap_table(cls):
        """Build a zeroed swap result table

        @returns: dict mapping every (direction, data role) pair to 0
        """
        return {(direction, role): 0
                for direction in ('rx', 'tx')
                for role in (cls.DFP, cls.UFP)}

    def _get_data_role(self, port):
        """Get data role of PD connection

        The role string reported by the port is expected to have the form
        '<word>-<word>'; the part after the dash is returned.

        @param port: pd device object of the current connection

        @returns: data role portion of the role string, 'DFP' or 'UFP'

        @raises error.TestFail: if the role string can't be parsed
        """
        role = port.get_pd_role()
        m = re.search(r'[\w]+-([\w]+)', role)
        if m is None:
            # Fail with the offending string rather than an AttributeError
            raise error.TestFail('Unable to parse PD data role from %r' %
                                 role)
        return m.group(1)

    def _get_remote_role(self, local_role):
        """Invert data role

        @param local_role: data role to be flipped

        @returns: flipped data role value
        """
        if local_role == self.DFP:
            return self.UFP
        return self.DFP

    def _change_dut_power_role(self):
        """Force power role change via PDTester

        The DUT connect state is sampled before the change. After the
        change, the PDTester connect state is compared against that
        sample to decide whether the roles were exchanged.

        @returns True if power role change is successful
        """
        PDTESTER_SRC_VOLTAGE = 5
        PDTESTER_SNK_VOLTAGE = 0
        pd_state = self.dut_port.get_pd_state()
        if self.dut_port.is_src():
            # DUT is currently a SRC, so change to SNK
            # Use PDTester method to ensure power role change
            self.pdtester.charge(PDTESTER_SRC_VOLTAGE)
        else:
            # DUT is currently a SNK, so change it to a SRC.
            self.pdtester.charge(PDTESTER_SNK_VOLTAGE)
        # Wait for change to take place
        time.sleep(self.PD_CONNECT_DELAY)
        pdtester_state = self.pdtester_port.get_pd_state()
        # Current PDTester state should equal DUT state when called
        return pd_state == pdtester_state

    def _send_data_swap_get_reply(self, port):
        """Send data swap request, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd data role swap request message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting, even when sending the command
        or parsing the reply raises.

        @param port: pd device object

        @returns: PD control header message
        """
        # Enable PD console debug mode to show control messages
        port.utils.enable_pd_console_debug()
        try:
            cmd = 'pd %d swap data' % port.port
            m = port.utils.send_pd_command_get_output(cmd,
                                                      [r'RECV\s([\w]+)'])
            # Header is printed in hex, keep only the control message bits
            return int(m[0][1], 16) & port.utils.PD_CONTROL_MSG_MASK
        finally:
            # Never leave the console in debug mode behind us
            port.utils.disable_pd_console_debug()

    def _attempt_data_swap(self, direction):
        """Perform a data role swap request

        Data swap requests can be either initiated by the DUT or received
        by the DUT. This direction determines which PD console is used
        to initiate the swap command. The data role before and after
        the swap command are compared to determine if it took place.

        Even if data swap capability is advertised, a PD device is allowed
        to reject the request. Therefore, not swapping isn't itself a
        failure. When PDTester is used to initiate the request, the debug
        mode is enabled which allows the control message from the DUT to
        be analyzed. If the swap does not occur, but the request is rejected
        by the DUT then that is not counted as a failure.

        @param direction: rx or tx from the DUT perspective

        @returns PD control reply message for rx (PDTester initiated)
                 swaps, 0 for tx (DUT initiated) swaps
        """
        reject = self.dut_port.utils.PD_CONTROL_MSG_DICT['Reject']
        # Get starting DUT data role
        dut_dr = self._get_data_role(self.dut_port)
        self.swap_attempt[(direction, dut_dr)] += 1
        if direction == 'tx':
            # Initiate swap request from the DUT
            cmd = 'pd %d swap data' % self.dut_port.port
            # Send the 'swap data' command
            self.dut_port.utils.send_pd_command(cmd)
            # Not using debug mode, so there is no reply message
            ctrl = 0
        else:
            # Initiate swap request from PDTester
            ctrl = self._send_data_swap_get_reply(self.pdtester_port)

        time.sleep(self.PD_ROLE_DELAY)
        # Get DUT current data role
        swap_dr = self._get_data_role(self.dut_port)
        logging.info('%s swap attempt: prev = %s, new = %s, msg = %s',
                     direction, dut_dr, swap_dr, ctrl)
        # An unchanged role only counts as a failure when it wasn't the
        # result of an explicit Reject reply.
        if dut_dr == swap_dr and ctrl != reject:
            self.swap_failure[(direction, dut_dr)] += 1
        return ctrl

    def _execute_data_role_swap_test(self):
        """Execute a series of data role swaps

        Attempt both rx and tx data swaps, from perspective of DUT.
        Even if the DUT advertises support, it can
        reject swap requests when already in the desired data role. For
        example many devices will not swap if already in DFP mode.
        However, PDTester should always accept a request. Therefore,
        when a swap failed on a rx swap, then that is followed by
        a tx swap attempt.
        """
        reject = self.dut_port.utils.PD_CONTROL_MSG_DICT['Reject']
        for attempt in range(self.DATA_SWAP_ITERATIONS):
            # Use the same direction for every 2 loop iterations:
            # bit 1 of the counter flips once every two attempts.
            if attempt & 2:
                direction = 'tx'
            else:
                direction = 'rx'
            ctrl_msg = self._attempt_data_swap(direction)
            if direction == 'rx' and ctrl_msg == reject:
                # DUT rejected the PDTester request, so use a DUT
                # initiated swap to change roles
                self._attempt_data_swap('tx')

    def _test_data_swap_reject(self):
        """Verify that data swap request is rejected

        This tests the case where the DUT doesn't advertise support
        for data swaps. A data request is sent by PDTester, and then
        the control message checked to ensure the request was rejected.
        In addition, the data role and connection state are verified
        to remain unchanged.

        @raises error.TestFail: if the request isn't rejected, or if the
                                connect state or data role changed
        """
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_port)
        dut_connect_state = self.dut_port.get_pd_state()
        # Send swap command from PDTester and get reply
        ctrl_msg = self._send_data_swap_get_reply(self.pdtester_port)
        if ctrl_msg != self.dut_port.utils.PD_CONTROL_MSG_DICT['Reject']:
            raise error.TestFail('Data Swap Req not rejected, returned %r' %
                                 ctrl_msg)
        # Get DUT current state
        pd_state = self.dut_port.get_pd_state()
        if pd_state != dut_connect_state:
            raise error.TestFail('PD not connected! pd_state = %r' %
                                 pd_state)
        # Since reject message was received, verify data role didn't change
        curr_dr = self._get_data_role(self.dut_port)
        if curr_dr != dut_data_role:
            raise error.TestFail('Unexpected PD data role change')

    def initialize(self, host, cmdline_args, flip_cc=False, dts_mode=False,
                   init_power_mode=None):
        """Set up PDTester and the DUT before the test runs

        @param host: passed through to the base class initialize()
        @param cmdline_args: passed through to the base class initialize()
        @param flip_cc: passed through to setup_pdtester()
        @param dts_mode: passed through to setup_pdtester()
        @param init_power_mode: if set, AP-off power mode to put the DUT
                                into before testing
        """
        super(firmware_PDDataSwap, self).initialize(host, cmdline_args)
        self.setup_pdtester(flip_cc, dts_mode, min_batt_level=10)
        # Only run in normal mode
        self.switcher.setup_mode('normal')
        if init_power_mode:
            # Set the DUT to suspend or shutdown mode
            self.set_ap_off_power_mode(init_power_mode)
        self.usbpd.send_command('chan 0')

    def cleanup(self):
        """Undo the changes made by initialize()

        Each step runs even if an earlier one raises, so a console error
        can't prevent the power mode restore or the base class cleanup.
        """
        try:
            self.usbpd.send_command('chan 0xffffffff')
        finally:
            try:
                self.restore_ap_on_power_mode()
            finally:
                super(firmware_PDDataSwap, self).cleanup()

    def _log_swap_results(self):
        """Log the per direction/role swap counts

        @returns: tuple of (total attempts, total failures)
        """
        logging.info('***************** Swap Results ********************')
        total_attempts = 0
        total_failures = 0
        for key in self.swap_attempt:
            direction, role = key
            attempts = self.swap_attempt[key]
            failures = self.swap_failure[key]
            logging.info('%s %s swap attempts = %d, failures = %d',
                         direction, role, attempts, failures)
            total_attempts += attempts
            total_failures += failures
        return total_attempts, total_failures

    def run_once(self):
        """Execute Data Role swap test.

        1. Verify that pd console is accessible
        2. Verify that DUT has a valid PD contract
        3. Determine if DUT advertises support for data swaps
        4. Test DUT initiated and received data swaps
        5. Swap power roles if supported
        6. Repeat DUT received data swap requests

        @raises error.TestFail: if no PD connection is found, or if any
                                data swap check fails
        """
        # Start every run from zeroed, instance private result tables
        self.swap_attempt = self._new_swap_table()
        self.swap_failure = self._new_swap_table()

        # Create list of available UART consoles
        consoles = [self.usbpd, self.pdtester]
        port_partner = pd_device.PDPortPartner(consoles)

        # Identify a valid test port pair
        port_pair = port_partner.identify_pd_devices()
        if not port_pair:
            raise error.TestFail('No PD connection found!')

        for port in port_pair:
            if port.is_pdtester:
                self.pdtester_port = port
            else:
                self.dut_port = port

        dut_connect_state = self.dut_port.get_pd_state()
        logging.info('Initial DUT connect state = %s', dut_connect_state)

        # Determine if DUT supports data role swaps
        dr_swap_allowed = self.pdtester_port.is_pd_flag_set('data_swap')

        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_port)
        logging.info('Starting DUT Data Role = %r', dut_data_role)

        # If data swaps are not allowed on the DUT, then still
        # attempt a data swap and verify that the request is
        # rejected by the DUT and that it remains connected and
        # in the same role.
        if not dr_swap_allowed:
            logging.info('Data Swap support not advertised by DUT')
            self._test_data_swap_reject()
            logging.info('Data Swap request rejected by DUT as expected')
            return

        # Data role swap support advertised, test this feature.
        self._execute_data_role_swap_test()

        # If DUT supports Power Role swap then attempt to change roles.
        # This way, data role swaps will be tested in both configurations.
        if self.pdtester_port.is_pd_flag_set('power_swap'):
            logging.info('\nDUT advertises Power Swap Support')
            # Attempt to swap power roles
            power_swap = self._change_dut_power_role()
            if power_swap:
                try:
                    self._execute_data_role_swap_test()
                finally:
                    # Swap power role, back to the original
                    self._change_dut_power_role()
            else:
                logging.warning('Power swap not successful!')
                logging.warning('Only tested with DUT in %s state',
                                dut_connect_state)
        else:
            logging.info('DUT does not advertise power swap support')

        total_attempts, total_failures = self._log_swap_results()

        # If any swap attempts were not successful, flag test as failure
        if total_failures:
            raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' %
                                 (total_attempts, total_failures))
306