1*9c5db199SXin Li# Lint as: python2, python3 2*9c5db199SXin Li# Copyright 2016 The Chromium OS Authors. All rights reserved. 3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 4*9c5db199SXin Li# found in the LICENSE file. 5*9c5db199SXin Li 6*9c5db199SXin Lifrom __future__ import absolute_import 7*9c5db199SXin Lifrom __future__ import division 8*9c5db199SXin Lifrom __future__ import print_function 9*9c5db199SXin Li 10*9c5db199SXin Liimport re 11*9c5db199SXin Liimport logging 12*9c5db199SXin Lifrom six.moves import range 13*9c5db199SXin Liimport time 14*9c5db199SXin Li 15*9c5db199SXin Lifrom autotest_lib.client.common_lib import error 16*9c5db199SXin Lifrom autotest_lib.server.cros.servo import pd_console 17*9c5db199SXin Li 18*9c5db199SXin Li 19*9c5db199SXin Liclass PDDevice(object): 20*9c5db199SXin Li """Base clase for all PD devices 21*9c5db199SXin Li 22*9c5db199SXin Li This class provides a set of APIs for expected Type C PD required actions 23*9c5db199SXin Li in TypeC FAFT tests. The base class is specific for Type C devices that 24*9c5db199SXin Li do not have any console access. 25*9c5db199SXin Li 26*9c5db199SXin Li """ 27*9c5db199SXin Li 28*9c5db199SXin Li def is_src(self, state=None): 29*9c5db199SXin Li """Checks if the port is connected as a source 30*9c5db199SXin Li 31*9c5db199SXin Li """ 32*9c5db199SXin Li raise NotImplementedError( 33*9c5db199SXin Li 'is_src should be implemented in derived class') 34*9c5db199SXin Li 35*9c5db199SXin Li def is_snk(self, state=None): 36*9c5db199SXin Li """Checks if the port is connected as a sink 37*9c5db199SXin Li 38*9c5db199SXin Li @returns None 39*9c5db199SXin Li """ 40*9c5db199SXin Li raise NotImplementedError( 41*9c5db199SXin Li 'is_snk should be implemented in derived class') 42*9c5db199SXin Li 43*9c5db199SXin Li def is_connected(self, state=None): 44*9c5db199SXin Li """Checks if the port is connected 45*9c5db199SXin Li 46*9c5db199SXin Li @returns True if in a connected state, False otherwise 47*9c5db199SXin Li """ 48*9c5db199SXin Li return self.is_src(state) or self.is_snk(state) 49*9c5db199SXin Li 50*9c5db199SXin Li def is_disconnected(self, state=None): 51*9c5db199SXin Li """Checks if the port is disconnected 52*9c5db199SXin Li 53*9c5db199SXin Li """ 54*9c5db199SXin Li raise NotImplementedError( 55*9c5db199SXin Li 'is_disconnected should be implemented in derived class') 56*9c5db199SXin Li 57*9c5db199SXin Li def is_ufp(self): 58*9c5db199SXin Li """Checks if data role is UFP 59*9c5db199SXin Li 60*9c5db199SXin Li """ 61*9c5db199SXin Li raise NotImplementedError( 62*9c5db199SXin Li 'is_ufp should be implemented in derived class') 63*9c5db199SXin Li 64*9c5db199SXin Li def is_dfp(self): 65*9c5db199SXin Li """Checks if data role is DFP 66*9c5db199SXin Li 67*9c5db199SXin Li """ 68*9c5db199SXin Li raise NotImplementedError( 69*9c5db199SXin Li 'is_dfp should be implemented in derived class') 70*9c5db199SXin Li 71*9c5db199SXin Li def is_drp(self): 72*9c5db199SXin Li """Checks if dual role mode is supported 73*9c5db199SXin Li 74*9c5db199SXin Li """ 75*9c5db199SXin Li raise NotImplementedError( 76*9c5db199SXin Li 'is_drp should be implemented in derived class') 77*9c5db199SXin Li 78*9c5db199SXin Li def dr_swap(self): 79*9c5db199SXin Li """Attempts a data role swap 80*9c5db199SXin Li 81*9c5db199SXin Li """ 82*9c5db199SXin Li raise NotImplementedError( 83*9c5db199SXin Li 'dr_swap should be implemented in derived class') 84*9c5db199SXin Li 85*9c5db199SXin Li def pr_swap(self): 86*9c5db199SXin Li """Attempts a power role swap 87*9c5db199SXin Li 88*9c5db199SXin Li """ 89*9c5db199SXin Li raise NotImplementedError( 90*9c5db199SXin Li 'pr_swap should be implemented in derived class') 91*9c5db199SXin Li 92*9c5db199SXin Li def vbus_request(self, voltage): 93*9c5db199SXin Li """Requests a specific VBUS voltage from SRC 94*9c5db199SXin Li 95*9c5db199SXin Li @param voltage: requested voltage level (5, 12, 20) in volts 96*9c5db199SXin Li """ 97*9c5db199SXin Li raise NotImplementedError( 98*9c5db199SXin Li 'vbus_request should be implemented in derived class') 99*9c5db199SXin Li 100*9c5db199SXin Li def soft_reset(self): 101*9c5db199SXin Li """Initates a PD soft reset sequence 102*9c5db199SXin Li 103*9c5db199SXin Li """ 104*9c5db199SXin Li raise NotImplementedError( 105*9c5db199SXin Li 'soft_reset should be implemented in derived class') 106*9c5db199SXin Li 107*9c5db199SXin Li def hard_reset(self): 108*9c5db199SXin Li """Initates a PD hard reset sequence 109*9c5db199SXin Li 110*9c5db199SXin Li """ 111*9c5db199SXin Li raise NotImplementedError( 112*9c5db199SXin Li 'hard_reset should be implemented in derived class') 113*9c5db199SXin Li 114*9c5db199SXin Li def drp_set(self, mode): 115*9c5db199SXin Li """Sets dualrole mode 116*9c5db199SXin Li 117*9c5db199SXin Li @param mode: desired dual role setting (on, off, snk, src) 118*9c5db199SXin Li """ 119*9c5db199SXin Li raise NotImplementedError( 120*9c5db199SXin Li 'drp_set should be implemented in derived class') 121*9c5db199SXin Li 122*9c5db199SXin Li def drp_get(self): 123*9c5db199SXin Li """Gets dualrole mode 124*9c5db199SXin Li 125*9c5db199SXin Li @returns one of the modes (on, off, snk, src) 126*9c5db199SXin Li """ 127*9c5db199SXin Li raise NotImplementedError( 128*9c5db199SXin Li 'drp_set should be implemented in derived class') 129*9c5db199SXin Li 130*9c5db199SXin Li def drp_disconnect_connect(self, disc_time_sec): 131*9c5db199SXin Li """Force PD disconnect/connect via drp settings 132*9c5db199SXin Li 133*9c5db199SXin Li @param disc_time_sec: Time in seconds between disconnect and reconnect 134*9c5db199SXin Li """ 135*9c5db199SXin Li raise NotImplementedError( 136*9c5db199SXin Li 'drp_disconnect_connect should be implemented in derived class' 137*9c5db199SXin Li ) 138*9c5db199SXin Li 139*9c5db199SXin Li def cc_disconnect_connect(self, disc_time_sec): 140*9c5db199SXin Li """Force PD disconnect/connect 141*9c5db199SXin Li 142*9c5db199SXin Li @param disc_time_sec: Time in seconds between disconnect and reconnect 143*9c5db199SXin Li """ 144*9c5db199SXin Li raise NotImplementedError( 145*9c5db199SXin Li 'cc_disconnect_connect should be implemented in derived class') 146*9c5db199SXin Li 147*9c5db199SXin Li def get_connected_state_after_cc_reconnect(self, disc_time_sec): 148*9c5db199SXin Li """Get the connected state after disconnect/reconnect 149*9c5db199SXin Li 150*9c5db199SXin Li @param disc_time_sec: Time in seconds for disconnect period. 151*9c5db199SXin Li @returns: The connected PD state. 152*9c5db199SXin Li """ 153*9c5db199SXin Li raise NotImplementedError( 154*9c5db199SXin Li 'get_connected_state_after_cc_reconnect should be implemented' 155*9c5db199SXin Li 'in derived class') 156*9c5db199SXin Li 157*9c5db199SXin Li 158*9c5db199SXin Liclass PDConsoleDevice(PDDevice): 159*9c5db199SXin Li """Class for PD devices that have console access 160*9c5db199SXin Li 161*9c5db199SXin Li This class contains methods for common PD actions for any PD device which 162*9c5db199SXin Li has UART console access. It inherits the PD device base class. In addition, 163*9c5db199SXin Li it stores both the UART console and port for the PD device. 164*9c5db199SXin Li """ 165*9c5db199SXin Li 166*9c5db199SXin Li def __init__(self, console, port, utils): 167*9c5db199SXin Li """Initialization method 168*9c5db199SXin Li 169*9c5db199SXin Li @param console: UART console object 170*9c5db199SXin Li @param port: USB PD port number 171*9c5db199SXin Li """ 172*9c5db199SXin Li # Save UART console 173*9c5db199SXin Li self.console = console 174*9c5db199SXin Li # Instantiate PD utilities used by methods in this class 175*9c5db199SXin Li self.utils = utils 176*9c5db199SXin Li # Save the PD port number for this device 177*9c5db199SXin Li self.port = port 178*9c5db199SXin Li # Not a PDTester device 179*9c5db199SXin Li self.is_pdtester = False 180*9c5db199SXin Li 181*9c5db199SXin Li def get_pd_state(self): 182*9c5db199SXin Li """Get the state of the PD port""" 183*9c5db199SXin Li return self.utils.get_pd_state(self.port) 184*9c5db199SXin Li 185*9c5db199SXin Li def get_pd_role(self): 186*9c5db199SXin Li """Get the current PD power role (source or sink) 187*9c5db199SXin Li 188*9c5db199SXin Li @returns: current pd state 189*9c5db199SXin Li """ 190*9c5db199SXin Li return self.utils.get_pd_role(self.port) 191*9c5db199SXin Li 192*9c5db199SXin Li def is_pd_flag_set(self, key): 193*9c5db199SXin Li """Test a bit in PD protocol state flags 194*9c5db199SXin Li 195*9c5db199SXin Li The flag word contains various PD protocol state information. 196*9c5db199SXin Li This method allows for a specific flag to be tested. 197*9c5db199SXin Li 198*9c5db199SXin Li @param key: dict key to retrieve the flag bit mapping 199*9c5db199SXin Li 200*9c5db199SXin Li @returns True if the bit to be tested is set 201*9c5db199SXin Li """ 202*9c5db199SXin Li return self.utils.is_pd_flag_set(self.port, key) 203*9c5db199SXin Li 204*9c5db199SXin Li def is_src(self, state=None): 205*9c5db199SXin Li """Checks if the port is connected as a source. 206*9c5db199SXin Li 207*9c5db199SXin Li The "state" argument allows the caller to get_pd_state() once, and then 208*9c5db199SXin Li evaluate multiple conditions without re-getting the state. 209*9c5db199SXin Li 210*9c5db199SXin Li @param state: the state to check (None to get current state) 211*9c5db199SXin Li @returns True if connected as SRC, False otherwise 212*9c5db199SXin Li """ 213*9c5db199SXin Li return self.utils.is_src_connected(self.port, state) 214*9c5db199SXin Li 215*9c5db199SXin Li def is_snk(self, state=None): 216*9c5db199SXin Li """Checks if the port is connected as a sink 217*9c5db199SXin Li 218*9c5db199SXin Li The "state" argument allows the caller to get_pd_state() once, and then 219*9c5db199SXin Li evaluate multiple conditions without re-getting the state. 220*9c5db199SXin Li 221*9c5db199SXin Li @param state: the state to check (None to get current state) 222*9c5db199SXin Li @returns True if connected as SNK, False otherwise 223*9c5db199SXin Li """ 224*9c5db199SXin Li return self.utils.is_snk_connected(self.port, state) 225*9c5db199SXin Li 226*9c5db199SXin Li def is_connected(self, state=None): 227*9c5db199SXin Li """Checks if the port is connected 228*9c5db199SXin Li 229*9c5db199SXin Li The "state" argument allows the caller to get_pd_state() once, and then 230*9c5db199SXin Li evaluate multiple conditions without re-getting the state. 231*9c5db199SXin Li 232*9c5db199SXin Li @param state: the state to check (None to get current state) 233*9c5db199SXin Li @returns True if in a connected state, False otherwise 234*9c5db199SXin Li """ 235*9c5db199SXin Li return self.is_snk(state) or self.is_src(state) 236*9c5db199SXin Li 237*9c5db199SXin Li def is_disconnected(self, state=None): 238*9c5db199SXin Li """Checks if the port is disconnected 239*9c5db199SXin Li 240*9c5db199SXin Li @returns True if in a disconnected state, False otherwise 241*9c5db199SXin Li """ 242*9c5db199SXin Li return self.utils.is_disconnected(self.port, state) 243*9c5db199SXin Li 244*9c5db199SXin Li def __repr__(self): 245*9c5db199SXin Li """String representation of the object""" 246*9c5db199SXin Li return "<%s %r port %s>" % ( 247*9c5db199SXin Li self.__class__.__name__, self.console.name, self.port) 248*9c5db199SXin Li 249*9c5db199SXin Li def is_drp(self): 250*9c5db199SXin Li """Checks if dual role mode is supported 251*9c5db199SXin Li 252*9c5db199SXin Li @returns True if dual role mode is 'on', False otherwise 253*9c5db199SXin Li """ 254*9c5db199SXin Li return self.utils.is_pd_dual_role_enabled(self.port) 255*9c5db199SXin Li 256*9c5db199SXin Li def drp_disconnect_connect(self, disc_time_sec): 257*9c5db199SXin Li """Disconnect/reconnect using drp mode settings 258*9c5db199SXin Li 259*9c5db199SXin Li A PD console device doesn't have an explicit connect/disconnect 260*9c5db199SXin Li command. Instead, the dualrole mode setting is used to force 261*9c5db199SXin Li disconnects in devices which support this feature. To disconnect, 262*9c5db199SXin Li force the dualrole mode to be the opposite role of the current 263*9c5db199SXin Li connected state. 264*9c5db199SXin Li 265*9c5db199SXin Li @param disc_time_sec: time in seconds to wait to reconnect 266*9c5db199SXin Li 267*9c5db199SXin Li @returns True if device disconnects, then returns to a connected 268*9c5db199SXin Li state. False if either step fails. 269*9c5db199SXin Li """ 270*9c5db199SXin Li # Dualrole mode must be supported 271*9c5db199SXin Li if self.is_drp() is False: 272*9c5db199SXin Li logging.warning('Device not DRP capable, unabled to force disconnect') 273*9c5db199SXin Li return False 274*9c5db199SXin Li # Force state will be the opposite of current connect state 275*9c5db199SXin Li if self.is_src(): 276*9c5db199SXin Li drp_mode = 'snk' 277*9c5db199SXin Li swap_state = self.utils.get_snk_connect_states() 278*9c5db199SXin Li else: 279*9c5db199SXin Li drp_mode = 'src' 280*9c5db199SXin Li swap_state = self.utils.get_src_connect_states() 281*9c5db199SXin Li # Force disconnect 282*9c5db199SXin Li self.drp_set(drp_mode) 283*9c5db199SXin Li # Wait for disconnect time 284*9c5db199SXin Li time.sleep(disc_time_sec) 285*9c5db199SXin Li # Verify that the device is disconnected 286*9c5db199SXin Li disconnect = self.is_disconnected() 287*9c5db199SXin Li 288*9c5db199SXin Li # If the other device is dualrole, then forcing dualrole mode will 289*9c5db199SXin Li # only cause the disconnect to appear momentarily and reconnect 290*9c5db199SXin Li # in the power role forced by the drp_set() call. For this case, 291*9c5db199SXin Li # the role swap verifies that a disconnect/connect sequence occurred. 292*9c5db199SXin Li if disconnect == False: 293*9c5db199SXin Li time.sleep(self.utils.CONNECT_TIME) 294*9c5db199SXin Li # Connected, verify if power role swap has occurred 295*9c5db199SXin Li if self.utils.get_pd_state(self.port) in swap_state: 296*9c5db199SXin Li # Restore default dualrole mode 297*9c5db199SXin Li self.drp_set('on') 298*9c5db199SXin Li # Restore orignal power role 299*9c5db199SXin Li connect = self.pr_swap() 300*9c5db199SXin Li if connect == False: 301*9c5db199SXin Li logging.warning('DRP on both devices, 2nd power swap failed') 302*9c5db199SXin Li return connect 303*9c5db199SXin Li 304*9c5db199SXin Li # Restore default dualrole mode 305*9c5db199SXin Li self.drp_set('on') 306*9c5db199SXin Li # Allow enough time for protocol state machine 307*9c5db199SXin Li time.sleep(self.utils.CONNECT_TIME) 308*9c5db199SXin Li # Check if connected 309*9c5db199SXin Li connect = self.is_connected() 310*9c5db199SXin Li logging.info('Disconnect = %r, Connect = %r', disconnect, connect) 311*9c5db199SXin Li return bool(disconnect and connect) 312*9c5db199SXin Li 313*9c5db199SXin Li def drp_set(self, mode): 314*9c5db199SXin Li """Sets dualrole mode 315*9c5db199SXin Li 316*9c5db199SXin Li @param mode: desired dual role setting (on, off, snk, src) 317*9c5db199SXin Li 318*9c5db199SXin Li @returns True is set was successful, False otherwise 319*9c5db199SXin Li """ 320*9c5db199SXin Li # Set desired dualrole mode 321*9c5db199SXin Li self.utils.set_pd_dualrole(self.port, mode) 322*9c5db199SXin Li # Get current setting 323*9c5db199SXin Li current = self.utils.get_pd_dualrole(self.port) 324*9c5db199SXin Li # Verify that setting is correct 325*9c5db199SXin Li return bool(mode == current) 326*9c5db199SXin Li 327*9c5db199SXin Li def drp_get(self): 328*9c5db199SXin Li """Gets dualrole mode 329*9c5db199SXin Li 330*9c5db199SXin Li @returns one of the modes (on, off, snk, src) 331*9c5db199SXin Li """ 332*9c5db199SXin Li return self.utils.get_pd_dualrole(self.port) 333*9c5db199SXin Li 334*9c5db199SXin Li def try_src(self, enable): 335*9c5db199SXin Li """Enables/Disables Try.SRC PD protocol setting 336*9c5db199SXin Li 337*9c5db199SXin Li @param enable: True to enable, False to disable 338*9c5db199SXin Li 339*9c5db199SXin Li @returns True is setting was successful, False if feature not 340*9c5db199SXin Li supported by the device, or not set as desired. 341*9c5db199SXin Li """ 342*9c5db199SXin Li # Create Try.SRC pd command 343*9c5db199SXin Li cmd = 'pd trysrc %d' % int(enable) 344*9c5db199SXin Li # TCPMv1 indicates Try.SRC is on by returning 'on' 345*9c5db199SXin Li # TCPMv2 indicates Try.SRC is on by returning 'Forced ON' 346*9c5db199SXin Li on_vals = ('on', 'Forced ON') 347*9c5db199SXin Li # TCPMv1 indicates Try.SRC is off by returning 'off' 348*9c5db199SXin Li # TCPMv2 indicates Try.SRC is off by returning 'Forced OFF' 349*9c5db199SXin Li off_vals = ('off', 'Forced OFF') 350*9c5db199SXin Li 351*9c5db199SXin Li # Try.SRC on/off is output, if supported feature 352*9c5db199SXin Li regex = ['Try\.SRC\s(%s)|(Parameter)' % ('|'.join(on_vals + off_vals))] 353*9c5db199SXin Li m = self.utils.send_pd_command_get_output(cmd, regex) 354*9c5db199SXin Li 355*9c5db199SXin Li # Determine if Try.SRC feature is supported 356*9c5db199SXin Li if 'Try.SRC' not in m[0][0]: 357*9c5db199SXin Li logging.warning('Try.SRC not supported on this PD device') 358*9c5db199SXin Li return False 359*9c5db199SXin Li 360*9c5db199SXin Li # TrySRC is supported on this PD device, verify setting. 361*9c5db199SXin Li trysrc_val = m[0][1] 362*9c5db199SXin Li logging.info('Try.SRC mode = %s', trysrc_val) 363*9c5db199SXin Li if enable: 364*9c5db199SXin Li vals = on_vals 365*9c5db199SXin Li else: 366*9c5db199SXin Li vals = off_vals 367*9c5db199SXin Li 368*9c5db199SXin Li return trysrc_val in vals 369*9c5db199SXin Li 370*9c5db199SXin Li def soft_reset(self): 371*9c5db199SXin Li """Initates a PD soft reset sequence 372*9c5db199SXin Li 373*9c5db199SXin Li To verify that a soft reset sequence was initiated, the 374*9c5db199SXin Li reply message is checked to verify that the reset command 375*9c5db199SXin Li was acknowledged by its port pair. The connect state should 376*9c5db199SXin Li be same as it was prior to issuing the reset command. 377*9c5db199SXin Li 378*9c5db199SXin Li @returns True if the port pair acknowledges the the reset message 379*9c5db199SXin Li and if following the command, the device returns to the same 380*9c5db199SXin Li connected state. False otherwise. 381*9c5db199SXin Li """ 382*9c5db199SXin Li RESET_DELAY = 0.5 383*9c5db199SXin Li cmd = 'pd %d soft' % self.port 384*9c5db199SXin Li state_before = self.utils.get_pd_state(self.port) 385*9c5db199SXin Li reply = self.utils.send_pd_command_get_reply_msg(cmd) 386*9c5db199SXin Li if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']: 387*9c5db199SXin Li return False 388*9c5db199SXin Li time.sleep(RESET_DELAY) 389*9c5db199SXin Li state_after = self.utils.get_pd_state(self.port) 390*9c5db199SXin Li return state_before == state_after 391*9c5db199SXin Li 392*9c5db199SXin Li def hard_reset(self): 393*9c5db199SXin Li """Initates a PD hard reset sequence 394*9c5db199SXin Li 395*9c5db199SXin Li To verify that a hard reset sequence was initiated, the 396*9c5db199SXin Li console ouput is scanned for HARD RST TX. In addition, the connect 397*9c5db199SXin Li state should be same as it was prior to issuing the reset command. 398*9c5db199SXin Li 399*9c5db199SXin Li @returns True if the port pair acknowledges that hard reset was 400*9c5db199SXin Li initiated and if following the command, the device returns to the same 401*9c5db199SXin Li connected state. False otherwise. 402*9c5db199SXin Li """ 403*9c5db199SXin Li RESET_DELAY = 1.0 404*9c5db199SXin Li cmd = 'pd %d hard' % self.port 405*9c5db199SXin Li state_before = self.utils.get_pd_state(self.port) 406*9c5db199SXin Li self.utils.enable_pd_console_debug() 407*9c5db199SXin Li try: 408*9c5db199SXin Li tcpmv1_pattern = '.*(HARD\sRST\sTX)' 409*9c5db199SXin Li tcpmv2_pattern = '.*(PE_SNK_Hard_Reset)|.*(PE_SRC_Hard_Reset)' 410*9c5db199SXin Li pattern = '|'.join((tcpmv1_pattern, tcpmv2_pattern)) 411*9c5db199SXin Li self.utils.send_pd_command_get_output(cmd, [pattern]) 412*9c5db199SXin Li except error.TestFail: 413*9c5db199SXin Li logging.warning('HARD RST TX not found') 414*9c5db199SXin Li return False 415*9c5db199SXin Li finally: 416*9c5db199SXin Li self.utils.disable_pd_console_debug() 417*9c5db199SXin Li 418*9c5db199SXin Li time.sleep(RESET_DELAY) 419*9c5db199SXin Li state_after = self.utils.get_pd_state(self.port) 420*9c5db199SXin Li return state_before == state_after 421*9c5db199SXin Li 422*9c5db199SXin Li def pr_swap(self): 423*9c5db199SXin Li """Attempts a power role swap 424*9c5db199SXin Li 425*9c5db199SXin Li In order to attempt a power role swap the device must be 426*9c5db199SXin Li connected and support dualrole mode. Once these two criteria 427*9c5db199SXin Li are checked a power role command is issued. Following a delay 428*9c5db199SXin Li to allow for a reconnection the new power role is checked 429*9c5db199SXin Li against the power role prior to issuing the command. 430*9c5db199SXin Li 431*9c5db199SXin Li @returns True if the device has swapped power roles, False otherwise. 432*9c5db199SXin Li """ 433*9c5db199SXin Li # Get starting state 434*9c5db199SXin Li if not self.is_drp(): 435*9c5db199SXin Li logging.warning('Dualrole Mode not enabled!') 436*9c5db199SXin Li return False 437*9c5db199SXin Li if self.is_connected() == False: 438*9c5db199SXin Li logging.warning('PD contract not established!') 439*9c5db199SXin Li return False 440*9c5db199SXin Li current_pr = self.utils.get_pd_state(self.port) 441*9c5db199SXin Li swap_cmd = 'pd %d swap power' % self.port 442*9c5db199SXin Li self.utils.send_pd_command(swap_cmd) 443*9c5db199SXin Li time.sleep(self.utils.CONNECT_TIME) 444*9c5db199SXin Li new_pr = self.utils.get_pd_state(self.port) 445*9c5db199SXin Li logging.info('Power swap: %s -> %s', current_pr, new_pr) 446*9c5db199SXin Li if self.is_connected() == False: 447*9c5db199SXin Li logging.warning('Device not connected following PR swap attempt.') 448*9c5db199SXin Li return False 449*9c5db199SXin Li return current_pr != new_pr 450*9c5db199SXin Li 451*9c5db199SXin Li 452*9c5db199SXin Liclass PDTesterDevice(PDConsoleDevice): 453*9c5db199SXin Li """Class for PDTester devices 454*9c5db199SXin Li 455*9c5db199SXin Li This class contains methods for PD funtions which are unique to the 456*9c5db199SXin Li PDTester board, e.g. Plankton or Servo v4. It inherits all the methods 457*9c5db199SXin Li for PD console devices. 458*9c5db199SXin Li """ 459*9c5db199SXin Li 460*9c5db199SXin Li def __init__(self, console, port, utils): 461*9c5db199SXin Li """Initialization method 462*9c5db199SXin Li 463*9c5db199SXin Li @param console: UART console for this device 464*9c5db199SXin Li @param port: USB PD port number 465*9c5db199SXin Li """ 466*9c5db199SXin Li # Instantiate the PD console object 467*9c5db199SXin Li super(PDTesterDevice, self).__init__(console, port, utils) 468*9c5db199SXin Li # Indicate this is PDTester device 469*9c5db199SXin Li self.is_pdtester = True 470*9c5db199SXin Li 471*9c5db199SXin Li def _toggle_pdtester_drp(self): 472*9c5db199SXin Li """Issue 'usbc_action drp' PDTester command 473*9c5db199SXin Li 474*9c5db199SXin Li @returns value of drp_enable in PDTester FW 475*9c5db199SXin Li """ 476*9c5db199SXin Li drp_cmd = 'usbc_action drp' 477*9c5db199SXin Li drp_re = ['DRP\s=\s(\d)'] 478*9c5db199SXin Li # Send DRP toggle command to PDTester and get value of 'drp_enable' 479*9c5db199SXin Li m = self.utils.send_pd_command_get_output(drp_cmd, drp_re) 480*9c5db199SXin Li return int(m[0][1]) 481*9c5db199SXin Li 482*9c5db199SXin Li def _enable_pdtester_drp(self): 483*9c5db199SXin Li """Enable DRP mode on PDTester 484*9c5db199SXin Li 485*9c5db199SXin Li DRP mode can only be toggled and is not able to be explicitly 486*9c5db199SXin Li enabled/disabled via the console. Therefore, this method will 487*9c5db199SXin Li toggle DRP mode until the console reply indicates that this 488*9c5db199SXin Li mode is enabled. The toggle happens a maximum of two times 489*9c5db199SXin Li in case this is called when it's already enabled. 490*9c5db199SXin Li 491*9c5db199SXin Li @returns True when DRP mode is enabled, False if not successful 492*9c5db199SXin Li """ 493*9c5db199SXin Li for attempt in range(2): 494*9c5db199SXin Li if self._toggle_pdtester_drp() == True: 495*9c5db199SXin Li logging.info('PDTester DRP mode enabled') 496*9c5db199SXin Li return True 497*9c5db199SXin Li logging.error('PDTester DRP mode set failure') 498*9c5db199SXin Li return False 499*9c5db199SXin Li 500*9c5db199SXin Li def _verify_state_sequence(self, states_list, console_log): 501*9c5db199SXin Li """Compare PD state transitions to expected values 502*9c5db199SXin Li 503*9c5db199SXin Li @param states_list: list of expected PD state transitions 504*9c5db199SXin Li @param console_log: console output which contains state names 505*9c5db199SXin Li 506*9c5db199SXin Li @returns True if the sequence matches, False otherwise 507*9c5db199SXin Li """ 508*9c5db199SXin Li # For each state in the expected state transiton table, build 509*9c5db199SXin Li # the regexp and search for it in the state transition log. 510*9c5db199SXin Li for state in states_list: 511*9c5db199SXin Li state_regx = r'C{0}\s+[\w]+:?\s({1})'.format(self.port, 512*9c5db199SXin Li state) 513*9c5db199SXin Li if re.search(state_regx, console_log) is None: 514*9c5db199SXin Li return False 515*9c5db199SXin Li return True 516*9c5db199SXin Li 517*9c5db199SXin Li def cc_disconnect_connect(self, disc_time_sec): 518*9c5db199SXin Li """Disconnect/reconnect using PDTester 519*9c5db199SXin Li 520*9c5db199SXin Li PDTester supports a feature which simulates a USB Type C disconnect 521*9c5db199SXin Li and reconnect. 522*9c5db199SXin Li 523*9c5db199SXin Li @param disc_time_sec: Time in seconds for disconnect period. 524*9c5db199SXin Li """ 525*9c5db199SXin Li DISC_DELAY = 100 526*9c5db199SXin Li disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, 527*9c5db199SXin Li disc_time_sec * 1000) 528*9c5db199SXin Li self.utils.send_pd_command(disc_cmd) 529*9c5db199SXin Li 530*9c5db199SXin Li def get_connected_state_after_cc_reconnect(self, disc_time_sec): 531*9c5db199SXin Li """Get the connected state after disconnect/reconnect using PDTester 532*9c5db199SXin Li 533*9c5db199SXin Li PDTester supports a feature which simulates a USB Type C disconnect 534*9c5db199SXin Li and reconnect. It returns the first connected state (either source or 535*9c5db199SXin Li sink) after reconnect. 536*9c5db199SXin Li 537*9c5db199SXin Li @param disc_time_sec: Time in seconds for disconnect period. 538*9c5db199SXin Li @returns: The connected PD state. 539*9c5db199SXin Li """ 540*9c5db199SXin Li DISC_DELAY = 100 541*9c5db199SXin Li disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, disc_time_sec * 1000) 542*9c5db199SXin Li state_exp = '(C%d)\s+[\w]+:?\s(%s)' 543*9c5db199SXin Li 544*9c5db199SXin Li disconnected_tuple = self.utils.get_disconnected_states() 545*9c5db199SXin Li disconnected_states = '|'.join(disconnected_tuple) 546*9c5db199SXin Li disconnected_exp = state_exp % (self.port, disconnected_states) 547*9c5db199SXin Li 548*9c5db199SXin Li src_connected_tuple = self.utils.get_src_connect_states() 549*9c5db199SXin Li snk_connected_tuple = self.utils.get_snk_connect_states() 550*9c5db199SXin Li connected_states = '|'.join(src_connected_tuple + snk_connected_tuple) 551*9c5db199SXin Li connected_exp = state_exp % (self.port, connected_states) 552*9c5db199SXin Li 553*9c5db199SXin Li m = self.utils.send_pd_command_get_output(disc_cmd, [disconnected_exp, 554*9c5db199SXin Li connected_exp]) 555*9c5db199SXin Li return m[1][2] 556*9c5db199SXin Li 557*9c5db199SXin Li def drp_disconnect_connect(self, disc_time_sec): 558*9c5db199SXin Li """Disconnect/reconnect using PDTester 559*9c5db199SXin Li 560*9c5db199SXin Li Utilize PDTester disconnect/connect utility and verify 561*9c5db199SXin Li that both disconnect and reconnect actions were successful. 562*9c5db199SXin Li 563*9c5db199SXin Li @param disc_time_sec: Time in seconds for disconnect period. 564*9c5db199SXin Li 565*9c5db199SXin Li @returns True if device disconnects, then returns to a connected 566*9c5db199SXin Li state. False if either step fails. 567*9c5db199SXin Li """ 568*9c5db199SXin Li self.cc_disconnect_connect(disc_time_sec) 569*9c5db199SXin Li time.sleep(disc_time_sec / 2) 570*9c5db199SXin Li disconnect = self.is_disconnected() 571*9c5db199SXin Li time.sleep(disc_time_sec / 2 + self.utils.CONNECT_TIME) 572*9c5db199SXin Li connect = self.is_connected() 573*9c5db199SXin Li return disconnect and connect 574*9c5db199SXin Li 575*9c5db199SXin Li def drp_set(self, mode): 576*9c5db199SXin Li """Sets dualrole mode 577*9c5db199SXin Li 578*9c5db199SXin Li @param mode: desired dual role setting (on, off, snk, src) 579*9c5db199SXin Li 580*9c5db199SXin Li @returns True if dualrole mode matches the requested value or 581*9c5db199SXin Li is successfully set to that value. False, otherwise. 582*9c5db199SXin Li """ 583*9c5db199SXin Li # Get current value of dualrole 584*9c5db199SXin Li drp = self.utils.get_pd_dualrole(self.port) 585*9c5db199SXin Li if drp == mode: 586*9c5db199SXin Li return True 587*9c5db199SXin Li 588*9c5db199SXin Li if mode == 'on': 589*9c5db199SXin Li # Setting dpr_enable on PDTester will set dualrole mode to on 590*9c5db199SXin Li return self._enable_pdtester_drp() 591*9c5db199SXin Li else: 592*9c5db199SXin Li # If desired setting is other than 'on', need to ensure that 593*9c5db199SXin Li # drp mode on PDTester is disabled. 594*9c5db199SXin Li if drp == 'on': 595*9c5db199SXin Li # This will turn off drp_enable flag and set dualmode to 'off' 596*9c5db199SXin Li return self._toggle_pdtester_drp() 597*9c5db199SXin Li # With drp_enable flag off, can set to desired setting 598*9c5db199SXin Li return self.utils.set_pd_dualrole(self.port, mode) 599*9c5db199SXin Li 600*9c5db199SXin Li def _reset(self, cmd, states_list): 601*9c5db199SXin Li """Initates a PD reset sequence 602*9c5db199SXin Li 603*9c5db199SXin Li PDTester device has state names available on the console. When 604*9c5db199SXin Li a soft reset is issued the console log is extracted and then 605*9c5db199SXin Li compared against the expected state transisitons. 606*9c5db199SXin Li 607*9c5db199SXin Li @param cmd: reset type (soft or hard) 608*9c5db199SXin Li @param states_list: list of expected PD state transitions 609*9c5db199SXin Li 610*9c5db199SXin Li @returns True if state transitions match, False otherwise 611*9c5db199SXin Li """ 612*9c5db199SXin Li # Want to grab all output until either SRC_READY or SNK_READY 613*9c5db199SXin Li reply_exp = ['(.*)(C%d)\s+[\w]+:?\s([\w]+_READY)' % self.port] 614*9c5db199SXin Li m = self.utils.send_pd_command_get_output(cmd, reply_exp) 615*9c5db199SXin Li return self._verify_state_sequence(states_list, m[0][0]) 616*9c5db199SXin Li 617*9c5db199SXin Li def soft_reset(self): 618*9c5db199SXin Li """Initates a PD soft reset sequence 619*9c5db199SXin Li 620*9c5db199SXin Li @returns True if state transitions match, False otherwise 621*9c5db199SXin Li """ 622*9c5db199SXin Li snk_reset_states = [ 623*9c5db199SXin Li 'SOFT_RESET', 624*9c5db199SXin Li 'SNK_DISCOVERY', 625*9c5db199SXin Li 'SNK_REQUESTED', 626*9c5db199SXin Li 'SNK_TRANSITION', 627*9c5db199SXin Li 'SNK_READY' 628*9c5db199SXin Li ] 629*9c5db199SXin Li 630*9c5db199SXin Li src_reset_states = [ 631*9c5db199SXin Li 'SOFT_RESET', 632*9c5db199SXin Li 'SRC_DISCOVERY', 633*9c5db199SXin Li 'SRC_NEGOCIATE', 634*9c5db199SXin Li 'SRC_ACCEPTED', 635*9c5db199SXin Li 'SRC_POWERED', 636*9c5db199SXin Li 'SRC_TRANSITION', 637*9c5db199SXin Li 'SRC_READY' 638*9c5db199SXin Li ] 639*9c5db199SXin Li 640*9c5db199SXin Li if self.is_src(): 641*9c5db199SXin Li states_list = src_reset_states 642*9c5db199SXin Li elif self.is_snk(): 643*9c5db199SXin Li states_list = snk_reset_states 644*9c5db199SXin Li else: 645*9c5db199SXin Li raise error.TestFail('Port Pair not in a connected state') 646*9c5db199SXin Li 647*9c5db199SXin Li cmd = 'pd %d soft' % self.port 648*9c5db199SXin Li return self._reset(cmd, states_list) 649*9c5db199SXin Li 650*9c5db199SXin Li def hard_reset(self): 651*9c5db199SXin Li """Initates a PD hard reset sequence 652*9c5db199SXin Li 653*9c5db199SXin Li @returns True if state transitions match, False otherwise 654*9c5db199SXin Li """ 655*9c5db199SXin Li snk_reset_states = [ 656*9c5db199SXin Li 'HARD_RESET_SEND', 657*9c5db199SXin Li 'HARD_RESET_EXECUTE', 658*9c5db199SXin Li 'SNK_HARD_RESET_RECOVER', 659*9c5db199SXin Li 'SNK_DISCOVERY', 660*9c5db199SXin Li 'SNK_REQUESTED', 661*9c5db199SXin Li 'SNK_TRANSITION', 662*9c5db199SXin Li 'SNK_READY' 663*9c5db199SXin Li ] 664*9c5db199SXin Li 665*9c5db199SXin Li src_reset_states = [ 666*9c5db199SXin Li 'HARD_RESET_SEND', 667*9c5db199SXin Li 'HARD_RESET_EXECUTE', 668*9c5db199SXin Li 'SRC_HARD_RESET_RECOVER', 669*9c5db199SXin Li 'SRC_DISCOVERY', 670*9c5db199SXin Li 'SRC_NEGOCIATE', 671*9c5db199SXin Li 'SRC_ACCEPTED', 672*9c5db199SXin Li 'SRC_POWERED', 673*9c5db199SXin Li 'SRC_TRANSITION', 674*9c5db199SXin Li 'SRC_READY' 675*9c5db199SXin Li ] 676*9c5db199SXin Li 677*9c5db199SXin Li if self.is_src(): 678*9c5db199SXin Li states_list = src_reset_states 679*9c5db199SXin Li elif self.is_snk(): 680*9c5db199SXin Li states_list = snk_reset_states 681*9c5db199SXin Li else: 682*9c5db199SXin Li raise error.TestFail('Port Pair not in a connected state') 683*9c5db199SXin Li 684*9c5db199SXin Li cmd = 'pd %d hard' % self.port 685*9c5db199SXin Li return self._reset(cmd, states_list) 686*9c5db199SXin Li 687*9c5db199SXin Li 688*9c5db199SXin Liclass PDPortPartner(object): 689*9c5db199SXin Li """Methods used to instantiate PD device objects 690*9c5db199SXin Li 691*9c5db199SXin Li This class is initalized with a list of servo consoles. It 692*9c5db199SXin Li contains methods to determine if USB PD devices are accessible 693*9c5db199SXin Li via the consoles and attempts to determine USB PD port partners. 694*9c5db199SXin Li A PD device is USB PD port specific, a single console may access 695*9c5db199SXin Li multiple PD devices. 696*9c5db199SXin Li 697*9c5db199SXin Li """ 698*9c5db199SXin Li 699*9c5db199SXin Li def __init__(self, consoles): 700*9c5db199SXin Li """Initialization method 701*9c5db199SXin Li 702*9c5db199SXin Li @param consoles: list of servo consoles 703*9c5db199SXin Li """ 704*9c5db199SXin Li self.consoles = consoles 705*9c5db199SXin Li 706*9c5db199SXin Li def __repr__(self): 707*9c5db199SXin Li """String representation of the object""" 708*9c5db199SXin Li return "<%s %r>" % (self.__class__.__name__, self.consoles) 709*9c5db199SXin Li 710*9c5db199SXin Li def _send_pd_state(self, port, console): 711*9c5db199SXin Li """Tests if PD device exists on a given port number 712*9c5db199SXin Li 713*9c5db199SXin Li @param port: USB PD port number to try 714*9c5db199SXin Li @param console: servo UART console 715*9c5db199SXin Li 716*9c5db199SXin Li @returns True if 'pd <port> state' command gives a valid 717*9c5db199SXin Li response, False otherwise 718*9c5db199SXin Li """ 719*9c5db199SXin Li cmd = 'pd %d state' % port 720*9c5db199SXin Li regex = r'(Port C\d)|(Parameter)' 721*9c5db199SXin Li m = console.send_command_get_output(cmd, [regex]) 722*9c5db199SXin Li # If PD port exists, then output will be Port C0 or C1 723*9c5db199SXin Li regex = r'Port C{0}'.format(port) 724*9c5db199SXin Li if re.search(regex, m[0][0]): 725*9c5db199SXin Li return True 726*9c5db199SXin Li return False 727*9c5db199SXin Li 728*9c5db199SXin Li def _find_num_pd_ports(self, console): 729*9c5db199SXin Li """Determine number of PD ports for a given console 730*9c5db199SXin Li 731*9c5db199SXin Li @param console: uart console accssed via servo 732*9c5db199SXin Li 733*9c5db199SXin Li @returns: number of PD ports accessible via console 734*9c5db199SXin Li """ 735*9c5db199SXin Li MAX_PORTS = 2 736*9c5db199SXin Li num_ports = 0 737*9c5db199SXin Li for port in range(MAX_PORTS): 738*9c5db199SXin Li if self._send_pd_state(port, console): 739*9c5db199SXin Li num_ports += 1 740*9c5db199SXin Li return num_ports 741*9c5db199SXin Li 742*9c5db199SXin Li def _is_pd_console(self, console): 743*9c5db199SXin Li """Check if pd option exists in console 744*9c5db199SXin Li 745*9c5db199SXin Li @param console: uart console accssed via servo 746*9c5db199SXin Li 747*9c5db199SXin Li @returns: True if 'pd' is found, False otherwise 748*9c5db199SXin Li """ 749*9c5db199SXin Li try: 750*9c5db199SXin Li m = console.send_command_get_output('help', [r'(pd)\s+']) 751*9c5db199SXin Li return True 752*9c5db199SXin Li except error.TestFail: 753*9c5db199SXin Li return False 754*9c5db199SXin Li 755*9c5db199SXin Li def _is_pdtester_console(self, console): 756*9c5db199SXin Li """Check for PDTester console 757*9c5db199SXin Li 758*9c5db199SXin Li This method looks for a console command option 'usbc_action' which 759*9c5db199SXin Li is unique to PDTester PD devices. 760*9c5db199SXin Li 761*9c5db199SXin Li @param console: uart console accssed via servo 762*9c5db199SXin Li 763*9c5db199SXin Li @returns True if usbc_action command is present, False otherwise 764*9c5db199SXin Li """ 765*9c5db199SXin Li try: 766*9c5db199SXin Li m = console.send_command_get_output('help', [r'(usbc_action)']) 767*9c5db199SXin Li return True 768*9c5db199SXin Li except error.TestFail: 769*9c5db199SXin Li return False 770*9c5db199SXin Li 771*9c5db199SXin Li def _check_port_pair(self, port1, port2): 772*9c5db199SXin Li """Check if two PD devices could be connected 773*9c5db199SXin Li 774*9c5db199SXin Li If two USB PD devices are connected, then they should be in 775*9c5db199SXin Li either the SRC_READY or SNK_READY states and have opposite 776*9c5db199SXin Li power roles. In addition, they must be on different servo 777*9c5db199SXin Li consoles. 778*9c5db199SXin Li 779*9c5db199SXin Li @param: list of two possible PD port parters 780*9c5db199SXin Li 781*9c5db199SXin Li @returns True if not the same console and both PD devices 782*9c5db199SXin Li are a plausible pair based only on their PD states. 783*9c5db199SXin Li """ 784*9c5db199SXin Li # Don't test if on the same servo console 785*9c5db199SXin Li if port1.console == port2.console: 786*9c5db199SXin Li logging.info("PD Devices are on same platform -> can't be a pair") 787*9c5db199SXin Li return False 788*9c5db199SXin Li 789*9c5db199SXin Li state1 = port1.get_pd_state() 790*9c5db199SXin Li port1_is_snk = port1.is_snk(state1) 791*9c5db199SXin Li port1_is_src = port1.is_src(state1) 792*9c5db199SXin Li 793*9c5db199SXin Li state2 = port2.get_pd_state() 794*9c5db199SXin Li port2_is_snk = port2.is_snk(state2) 795*9c5db199SXin Li port2_is_src = port2.is_src(state2) 796*9c5db199SXin Li 797*9c5db199SXin Li # Must be SRC <--> SNK or SNK <--> SRC 798*9c5db199SXin Li if (port1_is_src and port2_is_snk) or (port1_is_snk and port2_is_src): 799*9c5db199SXin Li logging.debug("SRC+SNK pair: %s (%s) <--> (%s) %s", 800*9c5db199SXin Li port1, state1, state2, port2) 801*9c5db199SXin Li return True 802*9c5db199SXin Li else: 803*9c5db199SXin Li logging.debug("Not a SRC+SNK pair: %s (%s) <--> (%s) %s", 804*9c5db199SXin Li port1, state1, state2, port2) 805*9c5db199SXin Li return False 806*9c5db199SXin Li 807*9c5db199SXin Li def _verify_pdtester_connection(self, tester_port, dut_port): 808*9c5db199SXin Li """Verify DUT to PDTester PD connection 809*9c5db199SXin Li 810*9c5db199SXin Li This method checks for a PDTester PD connection for the 811*9c5db199SXin Li given port by first verifying if a PD connection is present. 812*9c5db199SXin Li If found, then it uses a PDTester feature to force a PD disconnect. 813*9c5db199SXin Li If the port is no longer in the connected state, and following 814*9c5db199SXin Li a delay, is found to be back in the connected state, then 815*9c5db199SXin Li a DUT pd to PDTester connection is verified. 816*9c5db199SXin Li 817*9c5db199SXin Li @param dev_pair: list of two PD devices 818*9c5db199SXin Li 819*9c5db199SXin Li @returns True if DUT to PDTester pd connection is verified 820*9c5db199SXin Li """ 821*9c5db199SXin Li DISC_CHECK_TIME = 10 822*9c5db199SXin Li DISC_WAIT_TIME = 20 823*9c5db199SXin Li CONNECT_TIME = 4 824*9c5db199SXin Li 825*9c5db199SXin Li logging.info("Check: %s <--> %s", tester_port, dut_port) 826*9c5db199SXin Li 827*9c5db199SXin Li if not self._check_port_pair(tester_port, dut_port): 828*9c5db199SXin Li return False 829*9c5db199SXin Li 830*9c5db199SXin Li # Force PD disconnect 831*9c5db199SXin Li logging.debug('Disconnecting to check if devices are partners') 832*9c5db199SXin Li tester_port.cc_disconnect_connect(DISC_WAIT_TIME) 833*9c5db199SXin Li time.sleep(DISC_CHECK_TIME) 834*9c5db199SXin Li 835*9c5db199SXin Li # Verify that both devices are now disconnected 836*9c5db199SXin Li tester_state = tester_port.get_pd_state() 837*9c5db199SXin Li dut_state = dut_port.get_pd_state() 838*9c5db199SXin Li logging.debug("Recheck: %s (%s) <--> (%s) %s", 839*9c5db199SXin Li tester_port, tester_state, dut_state, dut_port) 840*9c5db199SXin Li 841*9c5db199SXin Li if not (tester_port.is_disconnected(tester_state) and 842*9c5db199SXin Li dut_port.is_disconnected(dut_state)): 843*9c5db199SXin Li logging.info("Ports did not disconnect at the same time, so" 844*9c5db199SXin Li " they aren't considered a pair.") 845*9c5db199SXin Li # Delay to allow non-pair devices to reconnect 846*9c5db199SXin Li time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME) 847*9c5db199SXin Li return False 848*9c5db199SXin Li 849*9c5db199SXin Li logging.debug('Pair disconnected. Waiting for reconnect...') 850*9c5db199SXin Li 851*9c5db199SXin Li # Allow enough time for reconnection 852*9c5db199SXin Li time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME) 853*9c5db199SXin Li if self._check_port_pair(tester_port, dut_port): 854*9c5db199SXin Li # Have verified a pd disconnect/reconnect sequence 855*9c5db199SXin Li logging.info('PDTester <--> DUT pair found') 856*9c5db199SXin Li return True 857*9c5db199SXin Li 858*9c5db199SXin Li logging.info("Ports did not reconnect at the same time, so" 859*9c5db199SXin Li " they aren't considered a pair.") 860*9c5db199SXin Li return False 861*9c5db199SXin Li 862*9c5db199SXin Li def identify_pd_devices(self): 863*9c5db199SXin Li """Instantiate PD devices present in test setup 864*9c5db199SXin Li 865*9c5db199SXin Li @return: list of 2 PD devices if a DUT <-> PDTester found. 866*9c5db199SXin Li If not found, then returns an empty list. 867*9c5db199SXin Li """ 868*9c5db199SXin Li tester_devports = [] 869*9c5db199SXin Li dut_devports = [] 870*9c5db199SXin Li 871*9c5db199SXin Li # For each possible uart console, check to see if a PD console 872*9c5db199SXin Li # is present and determine the number of PD ports. 873*9c5db199SXin Li for console in self.consoles: 874*9c5db199SXin Li if self._is_pd_console(console): 875*9c5db199SXin Li is_tester = self._is_pdtester_console(console) 876*9c5db199SXin Li num_ports = self._find_num_pd_ports(console) 877*9c5db199SXin Li # For each PD port that can be accessed via the console, 878*9c5db199SXin Li # instantiate either PDConsole or PDTester device. 879*9c5db199SXin Li for port in range(num_ports): 880*9c5db199SXin Li if is_tester: 881*9c5db199SXin Li logging.info('PDTesterDevice on %s port %d', 882*9c5db199SXin Li console.name, port) 883*9c5db199SXin Li tester_utils = pd_console.create_pd_console_utils( 884*9c5db199SXin Li console) 885*9c5db199SXin Li tester_devports.append(PDTesterDevice(console, 886*9c5db199SXin Li port, tester_utils)) 887*9c5db199SXin Li else: 888*9c5db199SXin Li logging.info('PDConsoleDevice on %s port %d', 889*9c5db199SXin Li console.name, port) 890*9c5db199SXin Li dut_utils = pd_console.create_pd_console_utils(console) 891*9c5db199SXin Li dut_devports.append(PDConsoleDevice(console, 892*9c5db199SXin Li port, dut_utils)) 893*9c5db199SXin Li 894*9c5db199SXin Li if not tester_devports: 895*9c5db199SXin Li logging.error('The specified consoles did not include any' 896*9c5db199SXin Li ' PD testers: %s', self.consoles) 897*9c5db199SXin Li 898*9c5db199SXin Li if not dut_devports: 899*9c5db199SXin Li logging.error('The specified consoles did not contain any' 900*9c5db199SXin Li ' DUTs: %s', self.consoles) 901*9c5db199SXin Li 902*9c5db199SXin Li # Determine PD port partners in the list of PD devices. Note that 903*9c5db199SXin Li # there can be PD devices which are not accessible via a uart console, 904*9c5db199SXin Li # but are connected to a PD port which is accessible. 905*9c5db199SXin Li for tester in reversed(tester_devports): 906*9c5db199SXin Li for dut in dut_devports: 907*9c5db199SXin Li if tester.console == dut.console: 908*9c5db199SXin Li # PD Devices are on same servo console -> can't be a pair 909*9c5db199SXin Li continue 910*9c5db199SXin Li if self._verify_pdtester_connection(tester, dut): 911*9c5db199SXin Li dut_devports.remove(dut) 912*9c5db199SXin Li return [tester, dut] 913*9c5db199SXin Li 914*9c5db199SXin Li return [] 915