1# Copyright 2016 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import re 7import time 8 9from autotest_lib.client.common_lib import error 10from autotest_lib.server.cros.faft.firmware_test import FirmwareTest 11from autotest_lib.server.cros.servo import pd_device 12 13 14class firmware_PDDataSwap(FirmwareTest): 15 """ 16 Servo based USB PD data role swap test 17 18 Pass critera is all data role swaps complete, or 19 a reject control message is received from the DUT in the 20 cases where the swap does not complete. 21 22 """ 23 24 version = 1 25 PD_ROLE_DELAY = 0.5 26 PD_CONNECT_DELAY = 4 27 DATA_SWAP_ITERATIONS = 10 28 # Upward facing port data role 29 UFP = 'UFP' 30 # Downward facing port data role 31 DFP = 'DFP' 32 # Swap Result Tables 33 swap_attempt = { 34 ('rx', DFP): 0, 35 ('rx', UFP): 0, 36 ('tx', DFP): 0, 37 ('tx', UFP): 0 38 } 39 swap_failure = { 40 ('rx', DFP): 0, 41 ('rx', UFP): 0, 42 ('tx', DFP): 0, 43 ('tx', UFP): 0 44 } 45 46 def _get_data_role(self, port): 47 """Get data role of PD connection 48 49 @param console: pd console object for uart access 50 @param port: 0/1 pd port of current connection 51 52 @returns: 'DFP' or 'UFP' 53 """ 54 role = port.get_pd_role() 55 m = re.search('[\w]+-([\w]+)', role) 56 return m.group(1) 57 58 def _get_remote_role(self, local_role): 59 """Invert data role 60 61 @param local_role: data role to be flipped 62 63 @returns: flipped data role value 64 """ 65 if local_role == self.DFP: 66 return self.UFP 67 else: 68 return self.DFP 69 70 def _change_dut_power_role(self): 71 """Force power role change via PDTester 72 73 @returns True is power role change is successful 74 """ 75 PDTESTER_SRC_VOLTAGE = 5 76 PDTESTER_SNK_VOLTAGE = 0 77 pd_state = self.dut_port.get_pd_state() 78 if self.dut_port.is_src(): 79 # DUT is currently a SRC, so change to SNK 80 # Use PDTester method to ensure power role change 81 self.pdtester.charge(PDTESTER_SRC_VOLTAGE) 82 else: 83 # DUT is currently a SNK, so change it to a SRC. 84 self.pdtester.charge(PDTESTER_SNK_VOLTAGE) 85 # Wait for change to take place 86 time.sleep(self.PD_CONNECT_DELAY) 87 pdtester_state = self.pdtester_port.get_pd_state() 88 # Current PDTester state should equal DUT state when called 89 return bool(pd_state == pdtester_state) 90 91 def _send_data_swap_get_reply(self, port): 92 """Send data swap request, get PD control msg reply 93 94 The PD console debug mode is enabled prior to sending 95 a pd data role swap request message. This allows the 96 control message reply to be extracted. The debug mode 97 is disabled prior to exiting. 98 99 @param port: pd device object 100 101 @returns: PD control header message 102 """ 103 # Enable PD console debug mode to show control messages 104 port.utils.enable_pd_console_debug() 105 cmd = 'pd %d swap data' % port.port 106 m = port.utils.send_pd_command_get_output(cmd, ['RECV\s([\w]+)']) 107 ctrl_msg = int(m[0][1], 16) & port.utils.PD_CONTROL_MSG_MASK 108 port.utils.disable_pd_console_debug() 109 return ctrl_msg 110 111 def _attempt_data_swap(self, direction): 112 """Perform a data role swap request 113 114 Data swap requests can be either initiated by the DUT or received 115 by the DUT. This direction determines which PD console is used 116 to initiate the swap command. The data role before and after 117 the swap command are compared to determine if it took place. 118 119 Even if data swap capability is advertised, a PD device is allowed 120 to reject the request. Therefore, not swapping isn't itself a 121 failure. When PDTester is used to initate the request, the debug 122 mode is enabled which allows the control message from the DUT to 123 be analyzed. If the swap does not occur, but the request is rejected 124 by the DUT then that is not counted as a failure. 125 126 @param direction: rx or tx from the DUT perspective 127 128 @returns PD control reply message for tx swaps, 0 otherwise 129 """ 130 # Get starting DUT data role 131 dut_dr = self._get_data_role(self.dut_port) 132 self.swap_attempt[(direction, dut_dr)] += 1 133 if direction == 'tx': 134 # Initiate swap request from the DUT 135 cmd = 'pd %d swap data' % self.dut_port.port 136 # Send the 'swap data' command 137 self.dut_port.utils.send_pd_command(cmd) 138 # Not using debug mode, so there is no reply message 139 ctrl = 0 140 else: 141 # Initiate swap request from PDTester 142 ctrl = self._send_data_swap_get_reply(self.pdtester_port) 143 144 time.sleep(self.PD_ROLE_DELAY) 145 # Get DUT current data role 146 swap_dr = self._get_data_role(self.dut_port) 147 logging.info('%s swap attempt: prev = %s, new = %s, msg = %s', 148 direction, dut_dr, swap_dr, ctrl) 149 if (dut_dr == swap_dr and 150 ctrl != self.dut_port.utils.PD_CONTROL_MSG_DICT['Reject']): 151 self.swap_failure[(direction, dut_dr)] += 1 152 return ctrl 153 154 def _execute_data_role_swap_test(self): 155 """Execute a series of data role swaps 156 157 Attempt both rx and tx data swaps, from perspective of DUT. 158 Even if the DUT advertises support, it can 159 reject swap requests when already in the desired data role. For 160 example many devices will not swap if already in DFP mode. 161 However, PDTester should always accept a request. Therefore, 162 when a swap failed on a rx swap, then that is followed by 163 a tx swap attempt. 164 165 @param pd_port: port number of DUT PD connection 166 """ 167 for attempt in range(self.DATA_SWAP_ITERATIONS): 168 # Use the same direction for every 2 loop iterations 169 if attempt & 2: 170 direction = 'tx' 171 else: 172 direction = 'rx' 173 ctrl_msg = self._attempt_data_swap(direction) 174 if (direction == 'rx' and 175 ctrl_msg == 176 self.dut_port.utils.PD_CONTROL_MSG_DICT['Reject']): 177 # Use pdtester initiated swap to change roles 178 self._attempt_data_swap('tx') 179 180 def _test_data_swap_reject(self): 181 """Verify that data swap request is rejected 182 183 This tests the case where the DUT doesn't advertise support 184 for data swaps. A data request is sent by PDTester, and then 185 the control message checked to ensure the request was rejected. 186 In addition, the data role and connection state are verified 187 to remain unchanged. 188 """ 189 # Get current DUT data role 190 dut_data_role = self._get_data_role(self.dut_port) 191 dut_connect_state = self.dut_port.get_pd_state() 192 # Send swap command from PDTester and get reply 193 ctrl_msg = self._send_data_swap_get_reply(self.pdtester_port) 194 if ctrl_msg != self.dut_port.utils.PD_CONTROL_MSG_DICT['Reject']: 195 raise error.TestFail('Data Swap Req not rejected, returned %r' % 196 ctrl_msg) 197 # Get DUT current state 198 pd_state = self.dut_port.get_pd_state() 199 if pd_state != dut_connect_state: 200 raise error.TestFail('PD not connected! pd_state = %r' % 201 pd_state) 202 # Since reject message was received, verify data role didn't change 203 curr_dr = self._get_data_role(self.dut_port) 204 if curr_dr != dut_data_role: 205 raise error.TestFail('Unexpected PD data role change') 206 207 def initialize(self, host, cmdline_args, flip_cc=False, dts_mode=False, 208 init_power_mode=None): 209 super(firmware_PDDataSwap, self).initialize(host, cmdline_args) 210 self.setup_pdtester(flip_cc, dts_mode, min_batt_level=10) 211 # Only run in normal mode 212 self.switcher.setup_mode('normal') 213 if init_power_mode: 214 # Set the DUT to suspend or shutdown mode 215 self.set_ap_off_power_mode(init_power_mode) 216 self.usbpd.send_command('chan 0') 217 218 def cleanup(self): 219 self.usbpd.send_command('chan 0xffffffff') 220 self.restore_ap_on_power_mode() 221 super(firmware_PDDataSwap, self).cleanup() 222 223 def run_once(self): 224 """Exectue Data Role swap test. 225 226 1. Verify that pd console is accessible 227 2. Verify that DUT has a valid PD contract 228 3. Determine if DUT advertises support for data swaps 229 4. Test DUT initiated and received data swaps 230 5. Swap power roles if supported 231 6. Repeat DUT received data swap requests 232 233 """ 234 235 # Create list of available UART consoles 236 consoles = [self.usbpd, self.pdtester] 237 port_partner = pd_device.PDPortPartner(consoles) 238 239 # Identify a valid test port pair 240 port_pair = port_partner.identify_pd_devices() 241 if not port_pair: 242 raise error.TestFail('No PD connection found!') 243 244 for port in port_pair: 245 if port.is_pdtester: 246 self.pdtester_port = port 247 else: 248 self.dut_port = port 249 250 dut_connect_state = self.dut_port.get_pd_state() 251 logging.info('Initial DUT connect state = %s', dut_connect_state) 252 253 # Determine if DUT supports data role swaps 254 dr_swap_allowed = self.pdtester_port.is_pd_flag_set('data_swap') 255 256 # Get current DUT data role 257 dut_data_role = self._get_data_role(self.dut_port) 258 logging.info('Starting DUT Data Role = %r', dut_data_role) 259 260 # If data swaps are not allowed on the DUT, then still 261 # attempt a data swap and verify that the request is 262 # rejected by the DUT and that it remains connected and 263 # in the same role. 264 if dr_swap_allowed == False: 265 logging.info('Data Swap support not advertised by DUT') 266 self._test_data_swap_reject() 267 logging.info('Data Swap request rejected by DUT as expected') 268 else: 269 # Data role swap support advertised, test this feature. 270 self._execute_data_role_swap_test() 271 272 # If DUT supports Power Role swap then attempt to change roles. 273 # This way, data role swaps will be tested in both configurations. 274 if self.pdtester_port.is_pd_flag_set('power_swap'): 275 logging.info('\nDUT advertises Power Swap Support') 276 # Attempt to swap power roles 277 power_swap = self._change_dut_power_role() 278 if power_swap: 279 try: 280 self._execute_data_role_swap_test() 281 finally: 282 # Swap power role, back to the original 283 self._change_dut_power_role() 284 else: 285 logging.warning('Power swap not successful!') 286 logging.warning('Only tested with DUT in %s state', 287 dut_connect_state) 288 else: 289 logging.info('DUT does not advertise power swap support') 290 291 logging.info('***************** Swap Results ********************') 292 total_attempts = 0 293 total_failures = 0 294 for direction, role in self.swap_attempt.keys(): 295 logging.info('%s %s swap attempts = %d, failures = %d', 296 direction, role, 297 self.swap_attempt[(direction, role)], 298 self.swap_failure[(direction, role)]) 299 total_attempts += self.swap_attempt[(direction, role)] 300 total_failures += self.swap_failure[(direction, role)] 301 302 # If any swap attempts were not successful, flag test as failure 303 if total_failures: 304 raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' % 305 (total_attempts, total_failures)) 306