xref: /aosp_15_r20/external/autotest/server/cros/servo/pd_console.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2015 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Lifrom __future__ import absolute_import
7*9c5db199SXin Lifrom __future__ import division
8*9c5db199SXin Lifrom __future__ import print_function
9*9c5db199SXin Li
10*9c5db199SXin Liimport re
11*9c5db199SXin Liimport logging
12*9c5db199SXin Liimport six
13*9c5db199SXin Lifrom six.moves import range
14*9c5db199SXin Liimport time
15*9c5db199SXin Li
16*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
17*9c5db199SXin Li
18*9c5db199SXin Li
19*9c5db199SXin Liclass PDConsoleUtils(object):
20*9c5db199SXin Li    """Base clase for all PD console utils
21*9c5db199SXin Li
22*9c5db199SXin Li    This class provides a set of APIs for expected Type C PD required actions
23*9c5db199SXin Li    in TypeC FAFT tests. The base class is specific for Type C console access.
24*9c5db199SXin Li
25*9c5db199SXin Li    """
26*9c5db199SXin Li    def __init__(self, console):
27*9c5db199SXin Li        """Console can be either usbpd, ec, or pdtester UART
28*9c5db199SXin Li
29*9c5db199SXin Li        This object with then be used by the class which creates
30*9c5db199SXin Li        the PDConsoleUtils class to send/receive commands to UART
31*9c5db199SXin Li        """
32*9c5db199SXin Li        # save console for UART access functions
33*9c5db199SXin Li        self.console = console
34*9c5db199SXin Li
35*9c5db199SXin Li    def send_pd_command(self, cmd):
36*9c5db199SXin Li        """Send command to PD console UART
37*9c5db199SXin Li
38*9c5db199SXin Li        @param cmd: pd command string
39*9c5db199SXin Li        """
40*9c5db199SXin Li        self.console.send_command(cmd)
41*9c5db199SXin Li
42*9c5db199SXin Li    def send_pd_command_get_output(self, cmd, regexp, debug_on=True):
43*9c5db199SXin Li        """Send command to PD console, wait for response
44*9c5db199SXin Li
45*9c5db199SXin Li        @param cmd: pd command string
46*9c5db199SXin Li        @param regexp: regular expression for desired output
47*9c5db199SXin Li        """
48*9c5db199SXin Li        # Enable PD console debug mode to show control messages
49*9c5db199SXin Li        if debug_on:
50*9c5db199SXin Li            self.enable_pd_console_debug()
51*9c5db199SXin Li        output = self.console.send_command_get_output(cmd, regexp)
52*9c5db199SXin Li        if debug_on:
53*9c5db199SXin Li            self.disable_pd_console_debug()
54*9c5db199SXin Li        return output
55*9c5db199SXin Li
56*9c5db199SXin Li    def send_pd_command_get_reply_msg(self, cmd):
57*9c5db199SXin Li        """Send PD protocol msg, get PD control msg reply
58*9c5db199SXin Li
59*9c5db199SXin Li        The PD console debug mode is enabled prior to sending
60*9c5db199SXin Li        a pd protocol message. This allows the
61*9c5db199SXin Li        control message reply to be extracted. The debug mode
62*9c5db199SXin Li        is disabled prior to exiting.
63*9c5db199SXin Li
64*9c5db199SXin Li        @param cmd: pd command to issue to the UART console
65*9c5db199SXin Li
66*9c5db199SXin Li        @returns: PD control header message
67*9c5db199SXin Li        """
68*9c5db199SXin Li        m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)\W'])
69*9c5db199SXin Li        ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
70*9c5db199SXin Li        return ctrl_msg
71*9c5db199SXin Li
72*9c5db199SXin Li    def verify_pd_console(self):
73*9c5db199SXin Li        """Verify that PD commands exist on UART console
74*9c5db199SXin Li
75*9c5db199SXin Li        Send 'help' command to UART console
76*9c5db199SXin Li
77*9c5db199SXin Li        @returns: True if 'pd' is found, False if not
78*9c5db199SXin Li        """
79*9c5db199SXin Li
80*9c5db199SXin Li        l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)'])
81*9c5db199SXin Li        if l[0][1] == 'pd':
82*9c5db199SXin Li            return True
83*9c5db199SXin Li        else:
84*9c5db199SXin Li            return False
85*9c5db199SXin Li
86*9c5db199SXin Li    def get_pd_version(self):
87*9c5db199SXin Li        """Get the version of the PD stack
88*9c5db199SXin Li
89*9c5db199SXin Li        @returns: version of PD stack, one of (1, 2)
90*9c5db199SXin Li        """
91*9c5db199SXin Li        # Match a number or an error ("Wrong number of params")
92*9c5db199SXin Li        matches = self.console.send_command_get_output('pd version',
93*9c5db199SXin Li                                                       [r'\s+(\d+|Wrong.*)'])
94*9c5db199SXin Li        if matches:
95*9c5db199SXin Li            result = matches[0][1]
96*9c5db199SXin Li            if result[0].isdigit():
97*9c5db199SXin Li                return int(result)
98*9c5db199SXin Li        return 1
99*9c5db199SXin Li
100*9c5db199SXin Li    def execute_pd_state_cmd(self, port):
101*9c5db199SXin Li        """Get PD state for specified channel
102*9c5db199SXin Li
103*9c5db199SXin Li        pd 0/1 state command gives produces 5 fields. The full response
104*9c5db199SXin Li        line is captured and then parsed to extract each field to fill
105*9c5db199SXin Li        the dict containing port, polarity, role, pd_state, and flags.
106*9c5db199SXin Li
107*9c5db199SXin Li        @param port: Type C PD port 0 or 1
108*9c5db199SXin Li
109*9c5db199SXin Li        @returns: A dict with the 5 fields listed above
110*9c5db199SXin Li        @raises: TestFail if any field not found
111*9c5db199SXin Li        """
112*9c5db199SXin Li        cmd = 'pd'
113*9c5db199SXin Li        subcmd = 'state'
114*9c5db199SXin Li        pd_cmd = cmd +" " + str(port) + " " + subcmd
115*9c5db199SXin Li        time.sleep(self.CURRENT_STATE_PROBE_DELAY)
116*9c5db199SXin Li        # Two FW versions for this command, get full line.
117*9c5db199SXin Li        m = self.send_pd_command_get_output(pd_cmd, ['(Port.*) - (Role:.*)\n'],
118*9c5db199SXin Li                                            debug_on=False)
119*9c5db199SXin Li
120*9c5db199SXin Li        # Extract desired values from result string
121*9c5db199SXin Li        state_result = {}
122*9c5db199SXin Li        pd_state_dict = self.PD_STATE_DICT
123*9c5db199SXin Li
124*9c5db199SXin Li        for key, regexp in six.iteritems(pd_state_dict):
125*9c5db199SXin Li            value = re.search(regexp, m[0][0])
126*9c5db199SXin Li            if value:
127*9c5db199SXin Li                state_result[key] = value.group(1)
128*9c5db199SXin Li            else:
129*9c5db199SXin Li                raise error.TestFail('pd %d state: %r value not found' %
130*9c5db199SXin Li                                     (port, key))
131*9c5db199SXin Li
132*9c5db199SXin Li        return state_result
133*9c5db199SXin Li
134*9c5db199SXin Li    def get_pd_state(self, port):
135*9c5db199SXin Li        """Get the current PD state
136*9c5db199SXin Li
137*9c5db199SXin Li        """
138*9c5db199SXin Li        raise NotImplementedError(
139*9c5db199SXin Li            'should be implemented in derived class')
140*9c5db199SXin Li
141*9c5db199SXin Li    def get_pd_port(self, port):
142*9c5db199SXin Li        """Get the current PD port
143*9c5db199SXin Li
144*9c5db199SXin Li        @param port: Type C PD port 0/1
145*9c5db199SXin Li        @returns: current pd state
146*9c5db199SXin Li        """
147*9c5db199SXin Li        pd_dict = self.execute_pd_state_cmd(port)
148*9c5db199SXin Li        return pd_dict['port']
149*9c5db199SXin Li
150*9c5db199SXin Li    def get_pd_role(self, port):
151*9c5db199SXin Li        """Get the current PD power role (source or sink)
152*9c5db199SXin Li
153*9c5db199SXin Li        @param port: Type C PD port 0/1
154*9c5db199SXin Li        @returns: current pd state
155*9c5db199SXin Li        """
156*9c5db199SXin Li        pd_dict = self.execute_pd_state_cmd(port)
157*9c5db199SXin Li        return pd_dict['role']
158*9c5db199SXin Li
159*9c5db199SXin Li    def get_pd_flags(self, port):
160*9c5db199SXin Li        """Get the current PD flags
161*9c5db199SXin Li
162*9c5db199SXin Li        @param port: Type C PD port 0/1
163*9c5db199SXin Li        @returns: current pd state
164*9c5db199SXin Li        """
165*9c5db199SXin Li        pd_dict = self.execute_pd_state_cmd(port)
166*9c5db199SXin Li        return pd_dict['flags']
167*9c5db199SXin Li
168*9c5db199SXin Li    def get_pd_dualrole(self, port):
169*9c5db199SXin Li        """Get the current PD dualrole setting
170*9c5db199SXin Li
171*9c5db199SXin Li        @param port: Type C PD port 0/1
172*9c5db199SXin Li        @returns: current PD dualrole setting, one of (on, off, snk, src)
173*9c5db199SXin Li        """
174*9c5db199SXin Li
175*9c5db199SXin Li        if self.per_port_dualrole_setting is True:
176*9c5db199SXin Li            cmd = 'pd %d dualrole' % port
177*9c5db199SXin Li        elif self.per_port_dualrole_setting is False:
178*9c5db199SXin Li            cmd = 'pd dualrole'
179*9c5db199SXin Li        else:
180*9c5db199SXin Li            try:
181*9c5db199SXin Li                self.per_port_dualrole_setting = True
182*9c5db199SXin Li                return self.get_pd_dualrole(port)
183*9c5db199SXin Li            except:
184*9c5db199SXin Li                self.per_port_dualrole_setting = False
185*9c5db199SXin Li                return self.get_pd_dualrole(port)
186*9c5db199SXin Li
187*9c5db199SXin Li        dualrole_values = self.DUALROLE_VALUES
188*9c5db199SXin Li
189*9c5db199SXin Li        m = self.send_pd_command_get_output(
190*9c5db199SXin Li                cmd, ['dual-role toggling:\s+([\w ]+)[\r\n]'], debug_on=False)
191*9c5db199SXin Li        # Find the index according to the output of "pd dualrole" command
192*9c5db199SXin Li        dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1])
193*9c5db199SXin Li        # Map to a string which is the output of this method
194*9c5db199SXin Li        return dualrole_values[dual_index]
195*9c5db199SXin Li
196*9c5db199SXin Li    def set_pd_dualrole(self, port, value):
197*9c5db199SXin Li        """Set pd dualrole
198*9c5db199SXin Li
199*9c5db199SXin Li        """
200*9c5db199SXin Li        raise NotImplementedError(
201*9c5db199SXin Li            'should be implemented in derived class')
202*9c5db199SXin Li
203*9c5db199SXin Li    def query_pd_connection(self):
204*9c5db199SXin Li        """Determine if PD connection is present
205*9c5db199SXin Li
206*9c5db199SXin Li        Try the 'pd 0/1 state' command and see if it's in either
207*9c5db199SXin Li        expected state of a connection. Record the port number
208*9c5db199SXin Li        that has an active connection
209*9c5db199SXin Li
210*9c5db199SXin Li        @returns: dict with params port, connect, and state
211*9c5db199SXin Li        """
212*9c5db199SXin Li        status = {}
213*9c5db199SXin Li        port = 0;
214*9c5db199SXin Li        status['connect'] = False
215*9c5db199SXin Li        status['port'] = port
216*9c5db199SXin Li        state = self.get_pd_state(port)
217*9c5db199SXin Li        # Check port 0 first
218*9c5db199SXin Li
219*9c5db199SXin Li        if self.is_pd_connected(port):
220*9c5db199SXin Li            status['connect'] = True
221*9c5db199SXin Li            status['role'] = state
222*9c5db199SXin Li        else:
223*9c5db199SXin Li            port = 1
224*9c5db199SXin Li            status['port'] = port
225*9c5db199SXin Li            state = self.get_pd_state(port)
226*9c5db199SXin Li            logging.info('CHECK PORT 1: %s', state)
227*9c5db199SXin Li            # Check port 1
228*9c5db199SXin Li            if self.is_pd_connected(port):
229*9c5db199SXin Li                status['connect'] = True
230*9c5db199SXin Li                status['role'] = state
231*9c5db199SXin Li
232*9c5db199SXin Li        return status
233*9c5db199SXin Li
234*9c5db199SXin Li    def swap_power_role(self, port):
235*9c5db199SXin Li        """Attempt a power role swap
236*9c5db199SXin Li
237*9c5db199SXin Li        This method attempts to execute a power role swap. A check
238*9c5db199SXin Li        is made to ensure that dualrole mode is enabled and that
239*9c5db199SXin Li        a PD contract is currently established. If both checks pass,
240*9c5db199SXin Li        then the power role swap command is issued. After a delay,
241*9c5db199SXin Li        if a PD contract is established and the current state does
242*9c5db199SXin Li        not equal the starting state, then it was successful.
243*9c5db199SXin Li
244*9c5db199SXin Li        @param port: pd port number
245*9c5db199SXin Li
246*9c5db199SXin Li        @returns: True if power swap is successful, False otherwise.
247*9c5db199SXin Li        """
248*9c5db199SXin Li        # Get starting state
249*9c5db199SXin Li        if self.is_pd_dual_role_enabled(port) == False:
250*9c5db199SXin Li            logging.info('Dualrole Mode not enabled!')
251*9c5db199SXin Li            return False
252*9c5db199SXin Li        if self.is_pd_connected(port) == False:
253*9c5db199SXin Li            logging.info('PD contract not established!')
254*9c5db199SXin Li            return False
255*9c5db199SXin Li        current_pr = self.get_pd_state(port)
256*9c5db199SXin Li        swap_cmd = 'pd %d swap power' % port
257*9c5db199SXin Li        self.send_pd_command(swap_cmd)
258*9c5db199SXin Li        time.sleep(self.CONNECT_TIME)
259*9c5db199SXin Li        new_pr = self.get_pd_state(port)
260*9c5db199SXin Li        logging.info('Power swap: %s -> %s', current_pr, new_pr)
261*9c5db199SXin Li        if self.is_pd_connected(port) == False:
262*9c5db199SXin Li            return False
263*9c5db199SXin Li        return bool(current_pr != new_pr)
264*9c5db199SXin Li
265*9c5db199SXin Li    def disable_pd_console_debug(self):
266*9c5db199SXin Li        """Turn off PD console debug
267*9c5db199SXin Li
268*9c5db199SXin Li        """
269*9c5db199SXin Li        cmd = 'pd dump 0'
270*9c5db199SXin Li        self.send_pd_command(cmd)
271*9c5db199SXin Li
272*9c5db199SXin Li    def enable_pd_console_debug(self):
273*9c5db199SXin Li        """Enable PD console debug level 1
274*9c5db199SXin Li
275*9c5db199SXin Li        """
276*9c5db199SXin Li        cmd = 'pd dump 2'
277*9c5db199SXin Li        self.send_pd_command(cmd)
278*9c5db199SXin Li
279*9c5db199SXin Li    def is_pd_flag_set(self, port, key):
280*9c5db199SXin Li        """Test a bit in PD protocol state flags
281*9c5db199SXin Li
282*9c5db199SXin Li        The flag word contains various PD protocol state information.
283*9c5db199SXin Li        This method allows for a specific flag to be tested.
284*9c5db199SXin Li
285*9c5db199SXin Li        @param port: Port which has the active PD connection
286*9c5db199SXin Li        @param key: dict key to retrieve the flag bit mapping
287*9c5db199SXin Li
288*9c5db199SXin Li        @returns True if the bit to be tested is set
289*9c5db199SXin Li        """
290*9c5db199SXin Li        pd_flags = self.get_pd_flags(port)
291*9c5db199SXin Li        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))
292*9c5db199SXin Li
293*9c5db199SXin Li    def is_pd_connected(self, port):
294*9c5db199SXin Li        """Check if a PD connection is active
295*9c5db199SXin Li
296*9c5db199SXin Li        @param port: port to be used for pd console commands
297*9c5db199SXin Li
298*9c5db199SXin Li        @returns True if port is in connected state
299*9c5db199SXin Li        """
300*9c5db199SXin Li        return self.is_src_connected(port) or self.is_snk_connected(port)
301*9c5db199SXin Li
302*9c5db199SXin Li    def is_pd_dual_role_enabled(self, port):
303*9c5db199SXin Li        """Check if a PD device is in dualrole mode
304*9c5db199SXin Li
305*9c5db199SXin Li        @param port: Type C PD port 0/1
306*9c5db199SXin Li
307*9c5db199SXin Li        @returns True is dualrole mode is active, false otherwise
308*9c5db199SXin Li        """
309*9c5db199SXin Li        drp = self.get_pd_dualrole(port)
310*9c5db199SXin Li        return drp == 'on'
311*9c5db199SXin Li
312*9c5db199SXin Li    def is_src_connected(self, port, state=None):
313*9c5db199SXin Li        """Checks if the port is connected as a source
314*9c5db199SXin Li
315*9c5db199SXin Li        @param port: Type C PD port 0/1
316*9c5db199SXin Li        @param state: the state to check (None to get current state)
317*9c5db199SXin Li
318*9c5db199SXin Li        @returns True if connected as SRC, False otherwise
319*9c5db199SXin Li        """
320*9c5db199SXin Li        if state is None:
321*9c5db199SXin Li            state = self.get_pd_state(port)
322*9c5db199SXin Li        return state in self.get_src_connect_states()
323*9c5db199SXin Li
324*9c5db199SXin Li    def is_snk_connected(self, port, state=None):
325*9c5db199SXin Li        """Checks if the port is connected as a sink
326*9c5db199SXin Li
327*9c5db199SXin Li        @param port: Type C PD port 0/1
328*9c5db199SXin Li        @param state: the state to check (None to get current state)
329*9c5db199SXin Li
330*9c5db199SXin Li        @returns True if connected as SNK, False otherwise
331*9c5db199SXin Li        """
332*9c5db199SXin Li        if state is None:
333*9c5db199SXin Li            state = self.get_pd_state(port)
334*9c5db199SXin Li        return state in self.get_snk_connect_states()
335*9c5db199SXin Li
336*9c5db199SXin Li    def is_disconnected(self, port, state=None):
337*9c5db199SXin Li        """Checks if the port is disconnected
338*9c5db199SXin Li
339*9c5db199SXin Li        @param port: Type C PD port 0/1
340*9c5db199SXin Li        @param state: the state to check (None to get current state)
341*9c5db199SXin Li
342*9c5db199SXin Li        @return True if disconnected
343*9c5db199SXin Li        """
344*9c5db199SXin Li        if state is None:
345*9c5db199SXin Li            state = self.get_pd_state(port)
346*9c5db199SXin Li        return state in self.get_disconnected_states()
347*9c5db199SXin Li
348*9c5db199SXin Li    def get_src_connect_states(self):
349*9c5db199SXin Li        """Returns the name of the SRC state
350*9c5db199SXin Li
351*9c5db199SXin Li        """
352*9c5db199SXin Li        raise NotImplementedError(
353*9c5db199SXin Li            'should be implemented in derived class')
354*9c5db199SXin Li
355*9c5db199SXin Li    def get_snk_connect_states(self):
356*9c5db199SXin Li        """Returns the name of the SNK state
357*9c5db199SXin Li
358*9c5db199SXin Li        """
359*9c5db199SXin Li        raise NotImplementedError(
360*9c5db199SXin Li            'should be implemented in derived class')
361*9c5db199SXin Li
362*9c5db199SXin Li    def get_disconnected_states(self):
363*9c5db199SXin Li        """Returns the names of the disconnected states
364*9c5db199SXin Li
365*9c5db199SXin Li        """
366*9c5db199SXin Li        return self.DISCONNECTED_STATES
367*9c5db199SXin Li
368*9c5db199SXin Li    def is_snk_discovery_state(self, port):
369*9c5db199SXin Li        """Returns true if in snk discovery state, else false
370*9c5db199SXin Li
371*9c5db199SXin Li        @param port: Type C PD port 0/1
372*9c5db199SXin Li
373*9c5db199SXin Li        @return: True if in SNK Discovery state
374*9c5db199SXin Li        """
375*9c5db199SXin Li        raise NotImplementedError(
376*9c5db199SXin Li            'should be implemented in derived class')
377*9c5db199SXin Li
378*9c5db199SXin Liclass TCPMv1ConsoleUtils(PDConsoleUtils):
379*9c5db199SXin Li    """ Provides a set of methods common to USB PD TCPMv1 FAFT tests
380*9c5db199SXin Li
381*9c5db199SXin Li    Each instance of this class is associated with a particular
382*9c5db199SXin Li    servo UART console. USB PD tests will typically use the console
383*9c5db199SXin Li    command 'pd' and its subcommands to control/monitor Type C PD
384*9c5db199SXin Li    connections. The servo object used for UART operations is
385*9c5db199SXin Li    passed in and stored when this object is created.
386*9c5db199SXin Li
387*9c5db199SXin Li    """
388*9c5db199SXin Li    SRC_CONNECT = ('SRC_READY',)
389*9c5db199SXin Li    SNK_CONNECT = ('SNK_READY',)
390*9c5db199SXin Li    SRC_DISC = 'SRC_DISCONNECTED'
391*9c5db199SXin Li    SNK_DISC = 'SNK_DISCONNECTED'
392*9c5db199SXin Li    SNK_DISCOVERY = 'SNK_DISCOVERY'
393*9c5db199SXin Li    DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE'
394*9c5db199SXin Li    DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE)
395*9c5db199SXin Li
396*9c5db199SXin Li    PD_MAX_PORTS = 2
397*9c5db199SXin Li    CONNECT_TIME = 4
398*9c5db199SXin Li
399*9c5db199SXin Li    CURRENT_STATE_PROBE_DELAY = 2
400*9c5db199SXin Li    DUALROLE_QUERY_DELAY = 1
401*9c5db199SXin Li    # Dualrole input/output values of methods in this class.
402*9c5db199SXin Li    DUALROLE_VALUES = ['on', 'off', 'snk', 'src']
403*9c5db199SXin Li    # Strings passing to the console command "pd dualrole"
404*9c5db199SXin Li    DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source']
405*9c5db199SXin Li    # Strings returned from the console command "pd dualrole"
406*9c5db199SXin Li    DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source']
407*9c5db199SXin Li
408*9c5db199SXin Li    # Some old firmware uses a single dualrole setting for all ports; while
409*9c5db199SXin Li    # some new firmware uses a per port dualrole setting. This flag will be
410*9c5db199SXin Li    # initialized to True or False.
411*9c5db199SXin Li    # TODO: Remove this flag when the old setting phases out
412*9c5db199SXin Li    per_port_dualrole_setting = None
413*9c5db199SXin Li
414*9c5db199SXin Li    # Dictionary for 'pd 0/1 state' parsing
415*9c5db199SXin Li    PD_STATE_DICT = {
416*9c5db199SXin Li        'port': 'Port\s+([\w]+)',
417*9c5db199SXin Li        'role': 'Role:\s+([\w]+-[\w]+)',
418*9c5db199SXin Li        'pd_state': 'State:\s+([\w()]+)',
419*9c5db199SXin Li        'flags': 'Flags:\s+([\w]+)',
420*9c5db199SXin Li        'polarity': '(CC\d)'
421*9c5db199SXin Li    }
422*9c5db199SXin Li
423*9c5db199SXin Li    # Regex to match PD state name; work for both old and new formats
424*9c5db199SXin Li    RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?"
425*9c5db199SXin Li    # Copied from ec repo: common/usb_pd_protocol.c
426*9c5db199SXin Li    PD_STATE_NAMES = [
427*9c5db199SXin Li        "DISABLED",                   # index: 0
428*9c5db199SXin Li        "SUSPENDED",
429*9c5db199SXin Li        "SNK_DISCONNECTED",
430*9c5db199SXin Li        "SNK_DISCONNECTED_DEBOUNCE",
431*9c5db199SXin Li        "SNK_HARD_RESET_RECOVER",
432*9c5db199SXin Li        "SNK_DISCOVERY",              # index: 5
433*9c5db199SXin Li        "SNK_REQUESTED",
434*9c5db199SXin Li        "SNK_TRANSITION",
435*9c5db199SXin Li        "SNK_READY",
436*9c5db199SXin Li        "SNK_SWAP_INIT",
437*9c5db199SXin Li        "SNK_SWAP_SNK_DISABLE",       # index: 10
438*9c5db199SXin Li        "SNK_SWAP_SRC_DISABLE",
439*9c5db199SXin Li        "SNK_SWAP_STANDBY",
440*9c5db199SXin Li        "SNK_SWAP_COMPLETE",
441*9c5db199SXin Li        "SRC_DISCONNECTED",
442*9c5db199SXin Li        "SRC_DISCONNECTED_DEBOUNCE",  # index: 15
443*9c5db199SXin Li        "SRC_HARD_RESET_RECOVER",
444*9c5db199SXin Li        "SRC_STARTUP",
445*9c5db199SXin Li        "SRC_DISCOVERY",
446*9c5db199SXin Li        "SRC_NEGOCIATE",
447*9c5db199SXin Li        "SRC_ACCEPTED",               # index: 20
448*9c5db199SXin Li        "SRC_POWERED",
449*9c5db199SXin Li        "SRC_TRANSITION",
450*9c5db199SXin Li        "SRC_READY",
451*9c5db199SXin Li        "SRC_GET_SNK_CAP",
452*9c5db199SXin Li        "DR_SWAP",                    # index: 25
453*9c5db199SXin Li        "SRC_SWAP_INIT",
454*9c5db199SXin Li        "SRC_SWAP_SNK_DISABLE",
455*9c5db199SXin Li        "SRC_SWAP_SRC_DISABLE",
456*9c5db199SXin Li        "SRC_SWAP_STANDBY",
457*9c5db199SXin Li        "VCONN_SWAP_SEND",            # index: 30
458*9c5db199SXin Li        "VCONN_SWAP_INIT",
459*9c5db199SXin Li        "VCONN_SWAP_READY",
460*9c5db199SXin Li        "SOFT_RESET",
461*9c5db199SXin Li        "HARD_RESET_SEND",
462*9c5db199SXin Li        "HARD_RESET_EXECUTE",         # index: 35
463*9c5db199SXin Li        "BIST_RX",
464*9c5db199SXin Li        "BIST_TX",
465*9c5db199SXin Li        "DRP_AUTO_TOGGLE",
466*9c5db199SXin Li    ]
467*9c5db199SXin Li
468*9c5db199SXin Li    # Dictionary for PD control message types
469*9c5db199SXin Li    PD_CONTROL_MSG_MASK = 0x1f
470*9c5db199SXin Li    PD_CONTROL_MSG_DICT = {
471*9c5db199SXin Li        'GoodCRC': 1,
472*9c5db199SXin Li        'GotoMin': 2,
473*9c5db199SXin Li        'Accept': 3,
474*9c5db199SXin Li        'Reject': 4,
475*9c5db199SXin Li        'Ping': 5,
476*9c5db199SXin Li        'PS_RDY': 6,
477*9c5db199SXin Li        'Get_Source_Cap': 7,
478*9c5db199SXin Li        'Get_Sink_Cap': 8,
479*9c5db199SXin Li        'DR_Swap': 9,
480*9c5db199SXin Li        'PR_Swap': 10,
481*9c5db199SXin Li        'VCONN_Swap': 11,
482*9c5db199SXin Li        'Wait': 12,
483*9c5db199SXin Li        'Soft_Reset': 13
484*9c5db199SXin Li    }
485*9c5db199SXin Li
486*9c5db199SXin Li    # Dictionary for PD firmware state flags
487*9c5db199SXin Li    PD_STATE_FLAGS_DICT = {
488*9c5db199SXin Li        'power_swap': 1 << 1,
489*9c5db199SXin Li        'data_swap': 1 << 2,
490*9c5db199SXin Li        'data_swap_active': 1 << 3,
491*9c5db199SXin Li        'vconn_on': 1 << 12
492*9c5db199SXin Li    }
493*9c5db199SXin Li
494*9c5db199SXin Li    def _normalize_pd_state(self, state):
495*9c5db199SXin Li        """Normalize the PD state name which handles both old and new formats.
496*9c5db199SXin Li
497*9c5db199SXin Li        The old format is like: "SNK_READY"
498*9c5db199SXin Li        The new format is like: "8()" if debug_level == 0, or
499*9c5db199SXin Li                                "8(SNK_READY)" if debug_level > 0
500*9c5db199SXin Li
501*9c5db199SXin Li        This method will convert the new format to the old one.
502*9c5db199SXin Li
503*9c5db199SXin Li        @param state: The raw PD state text
504*9c5db199SXin Li
505*9c5db199SXin Li        @returns: The normalized PD state name
506*9c5db199SXin Li        @raises: TestFail if unexpected PD state format
507*9c5db199SXin Li        """
508*9c5db199SXin Li        m = re.match(self.RE_PD_STATE, state)
509*9c5db199SXin Li        if m and any(m.groups()):
510*9c5db199SXin Li            state_index, state_name = m.groups()
511*9c5db199SXin Li            if state_index is None:
512*9c5db199SXin Li                # The old format: return the name
513*9c5db199SXin Li                return state_name
514*9c5db199SXin Li            # The new format: map the index to a name
515*9c5db199SXin Li            mapped_name = self.PD_STATE_NAMES[int(state_index)]
516*9c5db199SXin Li            if state_name is not None:
517*9c5db199SXin Li                assert mapped_name == state_name
518*9c5db199SXin Li            return mapped_name
519*9c5db199SXin Li        else:
520*9c5db199SXin Li            raise error.TestFail('Unexpected PD state format: %s' % state)
521*9c5db199SXin Li
522*9c5db199SXin Li    def get_pd_state(self, port):
523*9c5db199SXin Li        """Get the current PD state
524*9c5db199SXin Li
525*9c5db199SXin Li        @param port: Type C PD port 0/1
526*9c5db199SXin Li        @returns: current pd state
527*9c5db199SXin Li        """
528*9c5db199SXin Li
529*9c5db199SXin Li        pd_dict = self.execute_pd_state_cmd(port)
530*9c5db199SXin Li        return self._normalize_pd_state(pd_dict['pd_state'])
531*9c5db199SXin Li
532*9c5db199SXin Li    def set_pd_dualrole(self, port, value):
533*9c5db199SXin Li        """Set pd dualrole
534*9c5db199SXin Li
535*9c5db199SXin Li        It can be set to either:
536*9c5db199SXin Li        1. on
537*9c5db199SXin Li        2. off
538*9c5db199SXin Li        3. snk (force sink mode)
539*9c5db199SXin Li        4. src (force source mode)
540*9c5db199SXin Li        After setting, the current value is read to confirm that it
541*9c5db199SXin Li        was set properly.
542*9c5db199SXin Li
543*9c5db199SXin Li        @param port: Type C PD port 0/1
544*9c5db199SXin Li        @param value: One of the 4 options listed
545*9c5db199SXin Li        """
546*9c5db199SXin Li        dualrole_values = self.DUALROLE_VALUES
547*9c5db199SXin Li        # If the dualrole setting is not initialized, call the get method to
548*9c5db199SXin Li        # initialize it.
549*9c5db199SXin Li        if self.per_port_dualrole_setting is None:
550*9c5db199SXin Li            self.get_pd_dualrole(port)
551*9c5db199SXin Li
552*9c5db199SXin Li        # Get string required for console command
553*9c5db199SXin Li        dual_index = dualrole_values.index(value)
554*9c5db199SXin Li        # Create console command
555*9c5db199SXin Li        if self.per_port_dualrole_setting is True:
556*9c5db199SXin Li            cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index])
557*9c5db199SXin Li        elif self.per_port_dualrole_setting is False:
558*9c5db199SXin Li            cmd = 'pd dualrole %s' % (self.DUALROLE_CMD_ARGS[dual_index])
559*9c5db199SXin Li        else:
560*9c5db199SXin Li            raise error.TestFail("dualrole error")
561*9c5db199SXin Li
562*9c5db199SXin Li        self.console.send_command(cmd)
563*9c5db199SXin Li        time.sleep(self.DUALROLE_QUERY_DELAY)
564*9c5db199SXin Li        # Get current setting to verify that command was successful
565*9c5db199SXin Li        dual = self.get_pd_dualrole(port)
566*9c5db199SXin Li        # If it doesn't match, then raise error
567*9c5db199SXin Li        if dual != value:
568*9c5db199SXin Li            raise error.TestFail("dualrole error: " + value + " != " + dual)
569*9c5db199SXin Li
570*9c5db199SXin Li    def get_src_connect_states(self):
571*9c5db199SXin Li        """Returns the name of the SRC state
572*9c5db199SXin Li
573*9c5db199SXin Li        """
574*9c5db199SXin Li        return self.SRC_CONNECT
575*9c5db199SXin Li
576*9c5db199SXin Li    def get_snk_connect_states(self):
577*9c5db199SXin Li        """Returns the name of the SRC state
578*9c5db199SXin Li
579*9c5db199SXin Li        """
580*9c5db199SXin Li        return self.SNK_CONNECT
581*9c5db199SXin Li
582*9c5db199SXin Li    def is_snk_discovery_state(self, port):
583*9c5db199SXin Li        """Returns true if in snk discovery state, else false
584*9c5db199SXin Li
585*9c5db199SXin Li        @param port: Type C PD port 0/1
586*9c5db199SXin Li
587*9c5db199SXin Li        @return: True if in SNK Discovery state
588*9c5db199SXin Li        """
589*9c5db199SXin Li        state = self.get_pd_state(port)
590*9c5db199SXin Li        return state == self.SNK_DISCOVERY
591*9c5db199SXin Li
592*9c5db199SXin Liclass TCPMv2ConsoleUtils(PDConsoleUtils):
593*9c5db199SXin Li    """ Provides a set of methods common to USB PD TCPMv1 FAFT tests
594*9c5db199SXin Li
595*9c5db199SXin Li    Each instance of this class is associated with a particular
596*9c5db199SXin Li    servo UART console. USB PD tests will typically use the console
597*9c5db199SXin Li    command 'pd' and its subcommands to control/monitor Type C PD
598*9c5db199SXin Li    connections. The servo object used for UART operations is
599*9c5db199SXin Li    passed in and stored when this object is created.
600*9c5db199SXin Li
601*9c5db199SXin Li    """
602*9c5db199SXin Li    SRC_CONNECT = ('Attached.SRC', 'UnorientedDebugAccessory.SRC')
603*9c5db199SXin Li    SNK_CONNECT = ('Attached.SNK', 'DebugAccessory.SNK')
604*9c5db199SXin Li    SRC_DISC = 'Unattached.SRC'
605*9c5db199SXin Li    SNK_DISC = 'Unattached.SNK'
606*9c5db199SXin Li    DRP_AUTO_TOGGLE = 'DRPAutoToggle'
607*9c5db199SXin Li    LOW_POWER_MODE = 'LowPowerMode'
608*9c5db199SXin Li    SNK_DISCOVERY = 'PE_SNK_DISCOVERY'
609*9c5db199SXin Li    DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE, LOW_POWER_MODE)
610*9c5db199SXin Li
611*9c5db199SXin Li    PD_MAX_PORTS = 2
612*9c5db199SXin Li    CONNECT_TIME = 4
613*9c5db199SXin Li
614*9c5db199SXin Li    CURRENT_STATE_PROBE_DELAY = 2
615*9c5db199SXin Li    DUALROLE_QUERY_DELAY = 1
616*9c5db199SXin Li    # Dualrole input/output values of methods in this class.
617*9c5db199SXin Li    DUALROLE_VALUES = ['on', 'off', 'sink', 'source']
618*9c5db199SXin Li    # Strings passing to the console command "pd dualrole"
619*9c5db199SXin Li    DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source']
620*9c5db199SXin Li    # Strings returned from the console command "pd dualrole"
621*9c5db199SXin Li    DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source']
622*9c5db199SXin Li
623*9c5db199SXin Li    # Some old firmware uses a single dualrole setting for all ports; while
624*9c5db199SXin Li    # some new firmware uses a per port dualrole setting. This flag will be
625*9c5db199SXin Li    # initialized to True or False.
626*9c5db199SXin Li    # TODO: Remove this flag when the old setting phases out
627*9c5db199SXin Li    per_port_dualrole_setting = None
628*9c5db199SXin Li
629*9c5db199SXin Li    # Dictionary for 'pd 0/1 state' parsing
630*9c5db199SXin Li    PD_STATE_DICT = {
631*9c5db199SXin Li        'port': 'Port\s+([\w]+)',
632*9c5db199SXin Li        'role': 'Role:\s+([\w]+-[\w]+)',
633*9c5db199SXin Li        'pd_state': 'TC State:\s+([\w().]+)',
634*9c5db199SXin Li        'flags': 'Flags:\s+([\w]+)',
635*9c5db199SXin Li        'pe_state': 'PE State:\s+(\w*)',
636*9c5db199SXin Li        'polarity': '(CC\d)'
637*9c5db199SXin Li    }
638*9c5db199SXin Li
639*9c5db199SXin Li    # Regex to match PD state name; work for both old and new formats
640*9c5db199SXin Li    RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?"
641*9c5db199SXin Li
642*9c5db199SXin Li    # Dictionary for PD control message types
643*9c5db199SXin Li    PD_CONTROL_MSG_MASK = 0x1f
644*9c5db199SXin Li    PD_CONTROL_MSG_DICT = {
645*9c5db199SXin Li        'GoodCRC': 1,
646*9c5db199SXin Li        'GotoMin': 2,
647*9c5db199SXin Li        'Accept': 3,
648*9c5db199SXin Li        'Reject': 4,
649*9c5db199SXin Li        'Ping': 5,
650*9c5db199SXin Li        'PS_RDY': 6,
651*9c5db199SXin Li        'Get_Source_Cap': 7,
652*9c5db199SXin Li        'Get_Sink_Cap': 8,
653*9c5db199SXin Li        'DR_Swap': 9,
654*9c5db199SXin Li        'PR_Swap': 10,
655*9c5db199SXin Li        'VCONN_Swap': 11,
656*9c5db199SXin Li        'Wait': 12,
657*9c5db199SXin Li        'Soft_Reset': 13
658*9c5db199SXin Li    }
659*9c5db199SXin Li
660*9c5db199SXin Li    # Dictionary for PD firmware state flags
661*9c5db199SXin Li    PD_STATE_FLAGS_DICT = {
662*9c5db199SXin Li        'power_swap': 1 << 1,
663*9c5db199SXin Li        'data_swap': 1 << 2,
664*9c5db199SXin Li        'data_swap_active': 1 << 3,
665*9c5db199SXin Li        'vconn_on': 1 << 12
666*9c5db199SXin Li    }
667*9c5db199SXin Li
668*9c5db199SXin Li    def get_pe_state(self, port):
669*9c5db199SXin Li        """Get the current Policy Engine state
670*9c5db199SXin Li
671*9c5db199SXin Li        @param port: Type C PD port 0/1
672*9c5db199SXin Li        @returns: current pe state
673*9c5db199SXin Li        """
674*9c5db199SXin Li
675*9c5db199SXin Li        pd_dict = self.execute_pd_state_cmd(port)
676*9c5db199SXin Li        return pd_dict['pe_state']
677*9c5db199SXin Li
678*9c5db199SXin Li    def get_pd_state(self, port):
679*9c5db199SXin Li        """Get the current PD state
680*9c5db199SXin Li
681*9c5db199SXin Li        @param port: Type C PD port 0/1
682*9c5db199SXin Li        @returns: current pd state
683*9c5db199SXin Li        """
684*9c5db199SXin Li
685*9c5db199SXin Li        pd_dict = self.execute_pd_state_cmd(port)
686*9c5db199SXin Li        return pd_dict['pd_state']
687*9c5db199SXin Li
688*9c5db199SXin Li    def set_pd_dualrole(self, port, value):
689*9c5db199SXin Li        """Set pd dualrole
690*9c5db199SXin Li
691*9c5db199SXin Li        It can be set to either:
692*9c5db199SXin Li        1. on
693*9c5db199SXin Li        2. off
694*9c5db199SXin Li        3. snk (force sink mode)
695*9c5db199SXin Li        4. src (force source mode)
696*9c5db199SXin Li        After setting, the current value is read to confirm that it
697*9c5db199SXin Li        was set properly.
698*9c5db199SXin Li
699*9c5db199SXin Li        @param port: Type C PD port 0/1
700*9c5db199SXin Li        @param value: One of the 4 options listed
701*9c5db199SXin Li        """
702*9c5db199SXin Li        dualrole_values = self.DUALROLE_VALUES
703*9c5db199SXin Li
704*9c5db199SXin Li        if value == 'src':
705*9c5db199SXin Li            value = 'source'
706*9c5db199SXin Li        elif value == 'snk':
707*9c5db199SXin Li            value = 'sink'
708*9c5db199SXin Li
709*9c5db199SXin Li        # Get string required for console command
710*9c5db199SXin Li        dual_index = dualrole_values.index(value)
711*9c5db199SXin Li        # Create console command
712*9c5db199SXin Li        cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index])
713*9c5db199SXin Li        self.console.send_command(cmd)
714*9c5db199SXin Li        time.sleep(self.DUALROLE_QUERY_DELAY)
715*9c5db199SXin Li        # Get current setting to verify that command was successful
716*9c5db199SXin Li        dual = self.get_pd_dualrole(port)
717*9c5db199SXin Li        # If it doesn't match, then raise error
718*9c5db199SXin Li        if dual != value:
719*9c5db199SXin Li            raise error.TestFail("dualrole error: " + value + " != " + dual)
720*9c5db199SXin Li
721*9c5db199SXin Li    def get_src_connect_states(self):
722*9c5db199SXin Li        """Returns the name of the SRC states
723*9c5db199SXin Li
724*9c5db199SXin Li        @returns: List of connected source state names
725*9c5db199SXin Li        """
726*9c5db199SXin Li        return self.SRC_CONNECT
727*9c5db199SXin Li
728*9c5db199SXin Li    def get_snk_connect_states(self):
729*9c5db199SXin Li        """Returns the name of the SRC states
730*9c5db199SXin Li
731*9c5db199SXin Li        @returns: List of connected sink state names
732*9c5db199SXin Li        """
733*9c5db199SXin Li        return self.SNK_CONNECT
734*9c5db199SXin Li
735*9c5db199SXin Li    def is_snk_discovery_state(self, port):
736*9c5db199SXin Li        """Returns true if in snk discovery state, else false
737*9c5db199SXin Li
738*9c5db199SXin Li        @param port: Type C PD port 0/1
739*9c5db199SXin Li
740*9c5db199SXin Li        @return: True if in SNK Discovery state
741*9c5db199SXin Li        """
742*9c5db199SXin Li        state = self.get_pe_state(port)
743*9c5db199SXin Li        return state == self.SNK_DISCOVERY
744*9c5db199SXin Li
745*9c5db199SXin Liclass PDConnectionUtils(PDConsoleUtils):
746*9c5db199SXin Li    """Provides a set of methods common to USB PD FAFT tests
747*9c5db199SXin Li
748*9c5db199SXin Li    This class is used for PD utility methods that require access
749*9c5db199SXin Li    to both PDTester and DUT PD consoles.
750*9c5db199SXin Li
751*9c5db199SXin Li    """
752*9c5db199SXin Li
753*9c5db199SXin Li    def __init__(self, dut_console, pdtester_console):
754*9c5db199SXin Li        """
755*9c5db199SXin Li        @param dut_console: PD console object for DUT
756*9c5db199SXin Li        @param pdtester_console: PD console object for PDTester
757*9c5db199SXin Li        """
758*9c5db199SXin Li        # save console for DUT PD UART access functions
759*9c5db199SXin Li        self.dut_console = dut_console
760*9c5db199SXin Li        # save console for PDTester UART access functions
761*9c5db199SXin Li        self.pdtester_console = pdtester_console
762*9c5db199SXin Li
763*9c5db199SXin Li    def _verify_pdtester_connection(self, port):
764*9c5db199SXin Li        """Verify DUT to PDTester PD connection
765*9c5db199SXin Li
766*9c5db199SXin Li        This method checks for a PDTester PD connection for the
767*9c5db199SXin Li        given port by first verifying if a PD connection is present.
768*9c5db199SXin Li        If found, then it uses a PDTester feature to force a PD disconnect.
769*9c5db199SXin Li        If the port is no longer in the connected state, and following
770*9c5db199SXin Li        a delay, is found to be back in the connected state, then
771*9c5db199SXin Li        a DUT pd to PDTester connection is verified.
772*9c5db199SXin Li
773*9c5db199SXin Li        @param port: DUT pd port to test
774*9c5db199SXin Li
775*9c5db199SXin Li        @returns True if DUT to PDTester pd connection is verified
776*9c5db199SXin Li        """
777*9c5db199SXin Li        DISCONNECT_CHECK_TIME = 2
778*9c5db199SXin Li        DISCONNECT_TIME_SEC = 10
779*9c5db199SXin Li        # pdtester console command to force PD disconnect
780*9c5db199SXin Li        disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
781*9c5db199SXin Li        # Only check for PDTester if DUT has active PD connection
782*9c5db199SXin Li        if self.dut_console.is_pd_connected(port):
783*9c5db199SXin Li            # Attempt to force PD disconnection
784*9c5db199SXin Li            self.pdtester_console.send_pd_command(disc_cmd)
785*9c5db199SXin Li            time.sleep(DISCONNECT_CHECK_TIME)
786*9c5db199SXin Li            # Verify that DUT PD port is no longer connected
787*9c5db199SXin Li            if self.dut_console.is_pd_connected(port) == False:
788*9c5db199SXin Li                # Wait for disconnect timer and give time to reconnect
789*9c5db199SXin Li                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
790*9c5db199SXin Li                if self.dut_console.is_pd_connected(port):
791*9c5db199SXin Li                    logging.info('PDTester connection verified on port %d',
792*9c5db199SXin Li                                 port)
793*9c5db199SXin Li                    return True
794*9c5db199SXin Li            else:
795*9c5db199SXin Li                # Could have disconnected other port, allow it to reconnect
796*9c5db199SXin Li                # before exiting.
797*9c5db199SXin Li                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
798*9c5db199SXin Li        return False
799*9c5db199SXin Li
800*9c5db199SXin Li    def find_dut_to_pdtester_connection(self):
801*9c5db199SXin Li        """Find the PD port which is connected to PDTester
802*9c5db199SXin Li
803*9c5db199SXin Li        @returns DUT pd port number if found, None otherwise
804*9c5db199SXin Li        """
805*9c5db199SXin Li        for port in range(self.dut_console.PD_MAX_PORTS):
806*9c5db199SXin Li            # Check for DUT to PDTester connection on port
807*9c5db199SXin Li            if self._verify_pdtester_connection(port):
808*9c5db199SXin Li                # PDTester PD connection found so exit
809*9c5db199SXin Li                return port
810*9c5db199SXin Li        return None
811*9c5db199SXin Li
812*9c5db199SXin Lidef create_pd_console_utils(console):
813*9c5db199SXin Li    """Factory that detects the proper PDConsole Utils to use for DUT
814*9c5db199SXin Li
815*9c5db199SXin Li    @param console: DUT PD console
816*9c5db199SXin Li
817*9c5db199SXin Li    @returns: An instance of TCPMv1ConsoleUtils or TCPMv2ConsoleUtils
818*9c5db199SXin Li    """
819*9c5db199SXin Li    pd_console_utils = {
820*9c5db199SXin Li        1: TCPMv1ConsoleUtils,
821*9c5db199SXin Li        2: TCPMv2ConsoleUtils,
822*9c5db199SXin Li    }
823*9c5db199SXin Li
824*9c5db199SXin Li    version = PDConsoleUtils(console).get_pd_version()
825*9c5db199SXin Li    logging.debug('%s is TCPM v%s', console, version)
826*9c5db199SXin Li    cls = pd_console_utils[version]
827*9c5db199SXin Li    return cls(console)
828