xref: /aosp_15_r20/external/autotest/server/cros/servo/pd_device.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2016 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Lifrom __future__ import absolute_import
7*9c5db199SXin Lifrom __future__ import division
8*9c5db199SXin Lifrom __future__ import print_function
9*9c5db199SXin Li
10*9c5db199SXin Liimport re
11*9c5db199SXin Liimport logging
12*9c5db199SXin Lifrom six.moves import range
13*9c5db199SXin Liimport time
14*9c5db199SXin Li
15*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
16*9c5db199SXin Lifrom autotest_lib.server.cros.servo import pd_console
17*9c5db199SXin Li
18*9c5db199SXin Li
19*9c5db199SXin Liclass PDDevice(object):
20*9c5db199SXin Li    """Base clase for all PD devices
21*9c5db199SXin Li
22*9c5db199SXin Li    This class provides a set of APIs for expected Type C PD required actions
23*9c5db199SXin Li    in TypeC FAFT tests. The base class is specific for Type C devices that
24*9c5db199SXin Li    do not have any console access.
25*9c5db199SXin Li
26*9c5db199SXin Li    """
27*9c5db199SXin Li
28*9c5db199SXin Li    def is_src(self, state=None):
29*9c5db199SXin Li        """Checks if the port is connected as a source
30*9c5db199SXin Li
31*9c5db199SXin Li        """
32*9c5db199SXin Li        raise NotImplementedError(
33*9c5db199SXin Li                'is_src should be implemented in derived class')
34*9c5db199SXin Li
35*9c5db199SXin Li    def is_snk(self, state=None):
36*9c5db199SXin Li        """Checks if the port is connected as a sink
37*9c5db199SXin Li
38*9c5db199SXin Li        @returns None
39*9c5db199SXin Li        """
40*9c5db199SXin Li        raise NotImplementedError(
41*9c5db199SXin Li                'is_snk should be implemented in derived class')
42*9c5db199SXin Li
43*9c5db199SXin Li    def is_connected(self, state=None):
44*9c5db199SXin Li        """Checks if the port is connected
45*9c5db199SXin Li
46*9c5db199SXin Li        @returns True if in a connected state, False otherwise
47*9c5db199SXin Li        """
48*9c5db199SXin Li        return self.is_src(state) or self.is_snk(state)
49*9c5db199SXin Li
50*9c5db199SXin Li    def is_disconnected(self, state=None):
51*9c5db199SXin Li        """Checks if the port is disconnected
52*9c5db199SXin Li
53*9c5db199SXin Li        """
54*9c5db199SXin Li        raise NotImplementedError(
55*9c5db199SXin Li                'is_disconnected should be implemented in derived class')
56*9c5db199SXin Li
57*9c5db199SXin Li    def is_ufp(self):
58*9c5db199SXin Li        """Checks if data role is UFP
59*9c5db199SXin Li
60*9c5db199SXin Li        """
61*9c5db199SXin Li        raise NotImplementedError(
62*9c5db199SXin Li                'is_ufp should be implemented in derived class')
63*9c5db199SXin Li
64*9c5db199SXin Li    def is_dfp(self):
65*9c5db199SXin Li        """Checks if data role is DFP
66*9c5db199SXin Li
67*9c5db199SXin Li        """
68*9c5db199SXin Li        raise NotImplementedError(
69*9c5db199SXin Li                'is_dfp should be implemented in derived class')
70*9c5db199SXin Li
71*9c5db199SXin Li    def is_drp(self):
72*9c5db199SXin Li        """Checks if dual role mode is supported
73*9c5db199SXin Li
74*9c5db199SXin Li        """
75*9c5db199SXin Li        raise NotImplementedError(
76*9c5db199SXin Li                'is_drp should be implemented in derived class')
77*9c5db199SXin Li
78*9c5db199SXin Li    def dr_swap(self):
79*9c5db199SXin Li        """Attempts a data role swap
80*9c5db199SXin Li
81*9c5db199SXin Li        """
82*9c5db199SXin Li        raise NotImplementedError(
83*9c5db199SXin Li               'dr_swap should be implemented in derived class')
84*9c5db199SXin Li
85*9c5db199SXin Li    def pr_swap(self):
86*9c5db199SXin Li        """Attempts a power role swap
87*9c5db199SXin Li
88*9c5db199SXin Li        """
89*9c5db199SXin Li        raise NotImplementedError(
90*9c5db199SXin Li                'pr_swap should be implemented in derived class')
91*9c5db199SXin Li
92*9c5db199SXin Li    def vbus_request(self, voltage):
93*9c5db199SXin Li        """Requests a specific VBUS voltage from SRC
94*9c5db199SXin Li
95*9c5db199SXin Li        @param voltage: requested voltage level (5, 12, 20) in volts
96*9c5db199SXin Li        """
97*9c5db199SXin Li        raise NotImplementedError(
98*9c5db199SXin Li                'vbus_request should be implemented in derived class')
99*9c5db199SXin Li
100*9c5db199SXin Li    def soft_reset(self):
101*9c5db199SXin Li        """Initates a PD soft reset sequence
102*9c5db199SXin Li
103*9c5db199SXin Li        """
104*9c5db199SXin Li        raise NotImplementedError(
105*9c5db199SXin Li                'soft_reset should be implemented in derived class')
106*9c5db199SXin Li
107*9c5db199SXin Li    def hard_reset(self):
108*9c5db199SXin Li        """Initates a PD hard reset sequence
109*9c5db199SXin Li
110*9c5db199SXin Li        """
111*9c5db199SXin Li        raise NotImplementedError(
112*9c5db199SXin Li                'hard_reset should be implemented in derived class')
113*9c5db199SXin Li
114*9c5db199SXin Li    def drp_set(self, mode):
115*9c5db199SXin Li        """Sets dualrole mode
116*9c5db199SXin Li
117*9c5db199SXin Li        @param mode: desired dual role setting (on, off, snk, src)
118*9c5db199SXin Li        """
119*9c5db199SXin Li        raise NotImplementedError(
120*9c5db199SXin Li                'drp_set should be implemented in derived class')
121*9c5db199SXin Li
122*9c5db199SXin Li    def drp_get(self):
123*9c5db199SXin Li        """Gets dualrole mode
124*9c5db199SXin Li
125*9c5db199SXin Li        @returns one of the modes (on, off, snk, src)
126*9c5db199SXin Li        """
127*9c5db199SXin Li        raise NotImplementedError(
128*9c5db199SXin Li                'drp_set should be implemented in derived class')
129*9c5db199SXin Li
130*9c5db199SXin Li    def drp_disconnect_connect(self, disc_time_sec):
131*9c5db199SXin Li        """Force PD disconnect/connect via drp settings
132*9c5db199SXin Li
133*9c5db199SXin Li        @param disc_time_sec: Time in seconds between disconnect and reconnect
134*9c5db199SXin Li        """
135*9c5db199SXin Li        raise NotImplementedError(
136*9c5db199SXin Li                'drp_disconnect_connect should be implemented in derived class'
137*9c5db199SXin Li        )
138*9c5db199SXin Li
139*9c5db199SXin Li    def cc_disconnect_connect(self, disc_time_sec):
140*9c5db199SXin Li        """Force PD disconnect/connect
141*9c5db199SXin Li
142*9c5db199SXin Li        @param disc_time_sec: Time in seconds between disconnect and reconnect
143*9c5db199SXin Li        """
144*9c5db199SXin Li        raise NotImplementedError(
145*9c5db199SXin Li                'cc_disconnect_connect should be implemented in derived class')
146*9c5db199SXin Li
147*9c5db199SXin Li    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
148*9c5db199SXin Li        """Get the connected state after disconnect/reconnect
149*9c5db199SXin Li
150*9c5db199SXin Li        @param disc_time_sec: Time in seconds for disconnect period.
151*9c5db199SXin Li        @returns: The connected PD state.
152*9c5db199SXin Li        """
153*9c5db199SXin Li        raise NotImplementedError(
154*9c5db199SXin Li                'get_connected_state_after_cc_reconnect should be implemented'
155*9c5db199SXin Li                'in derived class')
156*9c5db199SXin Li
157*9c5db199SXin Li
158*9c5db199SXin Liclass PDConsoleDevice(PDDevice):
159*9c5db199SXin Li    """Class for PD devices that have console access
160*9c5db199SXin Li
161*9c5db199SXin Li    This class contains methods for common PD actions for any PD device which
162*9c5db199SXin Li    has UART console access. It inherits the PD device base class. In addition,
163*9c5db199SXin Li    it stores both the UART console and port for the PD device.
164*9c5db199SXin Li    """
165*9c5db199SXin Li
166*9c5db199SXin Li    def __init__(self, console, port, utils):
167*9c5db199SXin Li        """Initialization method
168*9c5db199SXin Li
169*9c5db199SXin Li        @param console: UART console object
170*9c5db199SXin Li        @param port: USB PD port number
171*9c5db199SXin Li        """
172*9c5db199SXin Li        # Save UART console
173*9c5db199SXin Li        self.console = console
174*9c5db199SXin Li        # Instantiate PD utilities used by methods in this class
175*9c5db199SXin Li        self.utils = utils
176*9c5db199SXin Li        # Save the PD port number for this device
177*9c5db199SXin Li        self.port = port
178*9c5db199SXin Li        # Not a PDTester device
179*9c5db199SXin Li        self.is_pdtester = False
180*9c5db199SXin Li
181*9c5db199SXin Li    def get_pd_state(self):
182*9c5db199SXin Li        """Get the state of the PD port"""
183*9c5db199SXin Li        return self.utils.get_pd_state(self.port)
184*9c5db199SXin Li
185*9c5db199SXin Li    def get_pd_role(self):
186*9c5db199SXin Li        """Get the current PD power role (source or sink)
187*9c5db199SXin Li
188*9c5db199SXin Li        @returns: current pd state
189*9c5db199SXin Li        """
190*9c5db199SXin Li        return self.utils.get_pd_role(self.port)
191*9c5db199SXin Li
192*9c5db199SXin Li    def is_pd_flag_set(self, key):
193*9c5db199SXin Li        """Test a bit in PD protocol state flags
194*9c5db199SXin Li
195*9c5db199SXin Li        The flag word contains various PD protocol state information.
196*9c5db199SXin Li        This method allows for a specific flag to be tested.
197*9c5db199SXin Li
198*9c5db199SXin Li        @param key: dict key to retrieve the flag bit mapping
199*9c5db199SXin Li
200*9c5db199SXin Li        @returns True if the bit to be tested is set
201*9c5db199SXin Li        """
202*9c5db199SXin Li        return self.utils.is_pd_flag_set(self.port, key)
203*9c5db199SXin Li
204*9c5db199SXin Li    def is_src(self, state=None):
205*9c5db199SXin Li        """Checks if the port is connected as a source.
206*9c5db199SXin Li
207*9c5db199SXin Li        The "state" argument allows the caller to get_pd_state() once, and then
208*9c5db199SXin Li        evaluate multiple conditions without re-getting the state.
209*9c5db199SXin Li
210*9c5db199SXin Li        @param state: the state to check (None to get current state)
211*9c5db199SXin Li        @returns True if connected as SRC, False otherwise
212*9c5db199SXin Li        """
213*9c5db199SXin Li        return self.utils.is_src_connected(self.port, state)
214*9c5db199SXin Li
215*9c5db199SXin Li    def is_snk(self, state=None):
216*9c5db199SXin Li        """Checks if the port is connected as a sink
217*9c5db199SXin Li
218*9c5db199SXin Li        The "state" argument allows the caller to get_pd_state() once, and then
219*9c5db199SXin Li        evaluate multiple conditions without re-getting the state.
220*9c5db199SXin Li
221*9c5db199SXin Li        @param state: the state to check (None to get current state)
222*9c5db199SXin Li        @returns True if connected as SNK, False otherwise
223*9c5db199SXin Li        """
224*9c5db199SXin Li        return self.utils.is_snk_connected(self.port, state)
225*9c5db199SXin Li
226*9c5db199SXin Li    def is_connected(self, state=None):
227*9c5db199SXin Li        """Checks if the port is connected
228*9c5db199SXin Li
229*9c5db199SXin Li        The "state" argument allows the caller to get_pd_state() once, and then
230*9c5db199SXin Li        evaluate multiple conditions without re-getting the state.
231*9c5db199SXin Li
232*9c5db199SXin Li        @param state: the state to check (None to get current state)
233*9c5db199SXin Li        @returns True if in a connected state, False otherwise
234*9c5db199SXin Li        """
235*9c5db199SXin Li        return self.is_snk(state) or self.is_src(state)
236*9c5db199SXin Li
237*9c5db199SXin Li    def is_disconnected(self, state=None):
238*9c5db199SXin Li        """Checks if the port is disconnected
239*9c5db199SXin Li
240*9c5db199SXin Li        @returns True if in a disconnected state, False otherwise
241*9c5db199SXin Li        """
242*9c5db199SXin Li        return self.utils.is_disconnected(self.port, state)
243*9c5db199SXin Li
244*9c5db199SXin Li    def __repr__(self):
245*9c5db199SXin Li        """String representation of the object"""
246*9c5db199SXin Li        return "<%s %r port %s>" % (
247*9c5db199SXin Li            self.__class__.__name__, self.console.name, self.port)
248*9c5db199SXin Li
249*9c5db199SXin Li    def is_drp(self):
250*9c5db199SXin Li        """Checks if dual role mode is supported
251*9c5db199SXin Li
252*9c5db199SXin Li        @returns True if dual role mode is 'on', False otherwise
253*9c5db199SXin Li        """
254*9c5db199SXin Li        return self.utils.is_pd_dual_role_enabled(self.port)
255*9c5db199SXin Li
256*9c5db199SXin Li    def drp_disconnect_connect(self, disc_time_sec):
257*9c5db199SXin Li        """Disconnect/reconnect using drp mode settings
258*9c5db199SXin Li
259*9c5db199SXin Li        A PD console device doesn't have an explicit connect/disconnect
260*9c5db199SXin Li        command. Instead, the dualrole mode setting is used to force
261*9c5db199SXin Li        disconnects in devices which support this feature. To disconnect,
262*9c5db199SXin Li        force the dualrole mode to be the opposite role of the current
263*9c5db199SXin Li        connected state.
264*9c5db199SXin Li
265*9c5db199SXin Li        @param disc_time_sec: time in seconds to wait to reconnect
266*9c5db199SXin Li
267*9c5db199SXin Li        @returns True if device disconnects, then returns to a connected
268*9c5db199SXin Li        state. False if either step fails.
269*9c5db199SXin Li        """
270*9c5db199SXin Li        # Dualrole mode must be supported
271*9c5db199SXin Li        if self.is_drp() is False:
272*9c5db199SXin Li            logging.warning('Device not DRP capable, unabled to force disconnect')
273*9c5db199SXin Li            return False
274*9c5db199SXin Li        # Force state will be the opposite of current connect state
275*9c5db199SXin Li        if self.is_src():
276*9c5db199SXin Li            drp_mode = 'snk'
277*9c5db199SXin Li            swap_state = self.utils.get_snk_connect_states()
278*9c5db199SXin Li        else:
279*9c5db199SXin Li            drp_mode = 'src'
280*9c5db199SXin Li            swap_state = self.utils.get_src_connect_states()
281*9c5db199SXin Li        # Force disconnect
282*9c5db199SXin Li        self.drp_set(drp_mode)
283*9c5db199SXin Li        # Wait for disconnect time
284*9c5db199SXin Li        time.sleep(disc_time_sec)
285*9c5db199SXin Li        # Verify that the device is disconnected
286*9c5db199SXin Li        disconnect = self.is_disconnected()
287*9c5db199SXin Li
288*9c5db199SXin Li        # If the other device is dualrole, then forcing dualrole mode will
289*9c5db199SXin Li        # only cause the disconnect to appear momentarily and reconnect
290*9c5db199SXin Li        # in the power role forced by the drp_set() call. For this case,
291*9c5db199SXin Li        # the role swap verifies that a disconnect/connect sequence occurred.
292*9c5db199SXin Li        if disconnect == False:
293*9c5db199SXin Li            time.sleep(self.utils.CONNECT_TIME)
294*9c5db199SXin Li            # Connected, verify if power role swap has occurred
295*9c5db199SXin Li            if self.utils.get_pd_state(self.port) in swap_state:
296*9c5db199SXin Li                # Restore default dualrole mode
297*9c5db199SXin Li                self.drp_set('on')
298*9c5db199SXin Li                # Restore orignal power role
299*9c5db199SXin Li                connect = self.pr_swap()
300*9c5db199SXin Li                if connect == False:
301*9c5db199SXin Li                    logging.warning('DRP on both devices, 2nd power swap failed')
302*9c5db199SXin Li                return connect
303*9c5db199SXin Li
304*9c5db199SXin Li        # Restore default dualrole mode
305*9c5db199SXin Li        self.drp_set('on')
306*9c5db199SXin Li        # Allow enough time for protocol state machine
307*9c5db199SXin Li        time.sleep(self.utils.CONNECT_TIME)
308*9c5db199SXin Li        # Check if connected
309*9c5db199SXin Li        connect = self.is_connected()
310*9c5db199SXin Li        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
311*9c5db199SXin Li        return bool(disconnect and connect)
312*9c5db199SXin Li
313*9c5db199SXin Li    def drp_set(self, mode):
314*9c5db199SXin Li        """Sets dualrole mode
315*9c5db199SXin Li
316*9c5db199SXin Li        @param mode: desired dual role setting (on, off, snk, src)
317*9c5db199SXin Li
318*9c5db199SXin Li        @returns True is set was successful, False otherwise
319*9c5db199SXin Li        """
320*9c5db199SXin Li        # Set desired dualrole mode
321*9c5db199SXin Li        self.utils.set_pd_dualrole(self.port, mode)
322*9c5db199SXin Li        # Get current setting
323*9c5db199SXin Li        current = self.utils.get_pd_dualrole(self.port)
324*9c5db199SXin Li        # Verify that setting is correct
325*9c5db199SXin Li        return bool(mode == current)
326*9c5db199SXin Li
327*9c5db199SXin Li    def drp_get(self):
328*9c5db199SXin Li        """Gets dualrole mode
329*9c5db199SXin Li
330*9c5db199SXin Li        @returns one of the modes (on, off, snk, src)
331*9c5db199SXin Li        """
332*9c5db199SXin Li        return self.utils.get_pd_dualrole(self.port)
333*9c5db199SXin Li
334*9c5db199SXin Li    def try_src(self, enable):
335*9c5db199SXin Li        """Enables/Disables Try.SRC PD protocol setting
336*9c5db199SXin Li
337*9c5db199SXin Li        @param enable: True to enable, False to disable
338*9c5db199SXin Li
339*9c5db199SXin Li        @returns True is setting was successful, False if feature not
340*9c5db199SXin Li        supported by the device, or not set as desired.
341*9c5db199SXin Li        """
342*9c5db199SXin Li        # Create Try.SRC pd command
343*9c5db199SXin Li        cmd = 'pd trysrc %d' % int(enable)
344*9c5db199SXin Li        # TCPMv1 indicates Try.SRC is on by returning 'on'
345*9c5db199SXin Li        # TCPMv2 indicates Try.SRC is on by returning 'Forced ON'
346*9c5db199SXin Li        on_vals = ('on', 'Forced ON')
347*9c5db199SXin Li        # TCPMv1 indicates Try.SRC is off by returning 'off'
348*9c5db199SXin Li        # TCPMv2 indicates Try.SRC is off by returning 'Forced OFF'
349*9c5db199SXin Li        off_vals = ('off', 'Forced OFF')
350*9c5db199SXin Li
351*9c5db199SXin Li        # Try.SRC on/off is output, if supported feature
352*9c5db199SXin Li        regex = ['Try\.SRC\s(%s)|(Parameter)' % ('|'.join(on_vals + off_vals))]
353*9c5db199SXin Li        m = self.utils.send_pd_command_get_output(cmd, regex)
354*9c5db199SXin Li
355*9c5db199SXin Li        # Determine if Try.SRC feature is supported
356*9c5db199SXin Li        if 'Try.SRC' not in m[0][0]:
357*9c5db199SXin Li            logging.warning('Try.SRC not supported on this PD device')
358*9c5db199SXin Li            return False
359*9c5db199SXin Li
360*9c5db199SXin Li        # TrySRC is supported on this PD device, verify setting.
361*9c5db199SXin Li        trysrc_val = m[0][1]
362*9c5db199SXin Li        logging.info('Try.SRC mode = %s', trysrc_val)
363*9c5db199SXin Li        if enable:
364*9c5db199SXin Li            vals = on_vals
365*9c5db199SXin Li        else:
366*9c5db199SXin Li            vals = off_vals
367*9c5db199SXin Li
368*9c5db199SXin Li        return trysrc_val in vals
369*9c5db199SXin Li
370*9c5db199SXin Li    def soft_reset(self):
371*9c5db199SXin Li        """Initates a PD soft reset sequence
372*9c5db199SXin Li
373*9c5db199SXin Li        To verify that a soft reset sequence was initiated, the
374*9c5db199SXin Li        reply message is checked to verify that the reset command
375*9c5db199SXin Li        was acknowledged by its port pair. The connect state should
376*9c5db199SXin Li        be same as it was prior to issuing the reset command.
377*9c5db199SXin Li
378*9c5db199SXin Li        @returns True if the port pair acknowledges the the reset message
379*9c5db199SXin Li        and if following the command, the device returns to the same
380*9c5db199SXin Li        connected state. False otherwise.
381*9c5db199SXin Li        """
382*9c5db199SXin Li        RESET_DELAY = 0.5
383*9c5db199SXin Li        cmd = 'pd %d soft' % self.port
384*9c5db199SXin Li        state_before = self.utils.get_pd_state(self.port)
385*9c5db199SXin Li        reply = self.utils.send_pd_command_get_reply_msg(cmd)
386*9c5db199SXin Li        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
387*9c5db199SXin Li            return False
388*9c5db199SXin Li        time.sleep(RESET_DELAY)
389*9c5db199SXin Li        state_after = self.utils.get_pd_state(self.port)
390*9c5db199SXin Li        return state_before == state_after
391*9c5db199SXin Li
392*9c5db199SXin Li    def hard_reset(self):
393*9c5db199SXin Li        """Initates a PD hard reset sequence
394*9c5db199SXin Li
395*9c5db199SXin Li        To verify that a hard reset sequence was initiated, the
396*9c5db199SXin Li        console ouput is scanned for HARD RST TX. In addition, the connect
397*9c5db199SXin Li        state should be same as it was prior to issuing the reset command.
398*9c5db199SXin Li
399*9c5db199SXin Li        @returns True if the port pair acknowledges that hard reset was
400*9c5db199SXin Li        initiated and if following the command, the device returns to the same
401*9c5db199SXin Li        connected state. False otherwise.
402*9c5db199SXin Li        """
403*9c5db199SXin Li        RESET_DELAY = 1.0
404*9c5db199SXin Li        cmd = 'pd %d hard' % self.port
405*9c5db199SXin Li        state_before = self.utils.get_pd_state(self.port)
406*9c5db199SXin Li        self.utils.enable_pd_console_debug()
407*9c5db199SXin Li        try:
408*9c5db199SXin Li            tcpmv1_pattern = '.*(HARD\sRST\sTX)'
409*9c5db199SXin Li            tcpmv2_pattern = '.*(PE_SNK_Hard_Reset)|.*(PE_SRC_Hard_Reset)'
410*9c5db199SXin Li            pattern = '|'.join((tcpmv1_pattern, tcpmv2_pattern))
411*9c5db199SXin Li            self.utils.send_pd_command_get_output(cmd, [pattern])
412*9c5db199SXin Li        except error.TestFail:
413*9c5db199SXin Li            logging.warning('HARD RST TX not found')
414*9c5db199SXin Li            return False
415*9c5db199SXin Li        finally:
416*9c5db199SXin Li            self.utils.disable_pd_console_debug()
417*9c5db199SXin Li
418*9c5db199SXin Li        time.sleep(RESET_DELAY)
419*9c5db199SXin Li        state_after = self.utils.get_pd_state(self.port)
420*9c5db199SXin Li        return state_before == state_after
421*9c5db199SXin Li
422*9c5db199SXin Li    def pr_swap(self):
423*9c5db199SXin Li        """Attempts a power role swap
424*9c5db199SXin Li
425*9c5db199SXin Li        In order to attempt a power role swap the device must be
426*9c5db199SXin Li        connected and support dualrole mode. Once these two criteria
427*9c5db199SXin Li        are checked a power role command is issued. Following a delay
428*9c5db199SXin Li        to allow for a reconnection the new power role is checked
429*9c5db199SXin Li        against the power role prior to issuing the command.
430*9c5db199SXin Li
431*9c5db199SXin Li        @returns True if the device has swapped power roles, False otherwise.
432*9c5db199SXin Li        """
433*9c5db199SXin Li        # Get starting state
434*9c5db199SXin Li        if not self.is_drp():
435*9c5db199SXin Li            logging.warning('Dualrole Mode not enabled!')
436*9c5db199SXin Li            return False
437*9c5db199SXin Li        if self.is_connected() == False:
438*9c5db199SXin Li            logging.warning('PD contract not established!')
439*9c5db199SXin Li            return False
440*9c5db199SXin Li        current_pr = self.utils.get_pd_state(self.port)
441*9c5db199SXin Li        swap_cmd = 'pd %d swap power' % self.port
442*9c5db199SXin Li        self.utils.send_pd_command(swap_cmd)
443*9c5db199SXin Li        time.sleep(self.utils.CONNECT_TIME)
444*9c5db199SXin Li        new_pr = self.utils.get_pd_state(self.port)
445*9c5db199SXin Li        logging.info('Power swap: %s -> %s', current_pr, new_pr)
446*9c5db199SXin Li        if self.is_connected() == False:
447*9c5db199SXin Li            logging.warning('Device not connected following PR swap attempt.')
448*9c5db199SXin Li            return False
449*9c5db199SXin Li        return current_pr != new_pr
450*9c5db199SXin Li
451*9c5db199SXin Li
452*9c5db199SXin Liclass PDTesterDevice(PDConsoleDevice):
453*9c5db199SXin Li    """Class for PDTester devices
454*9c5db199SXin Li
455*9c5db199SXin Li    This class contains methods for PD funtions which are unique to the
456*9c5db199SXin Li    PDTester board, e.g. Plankton or Servo v4. It inherits all the methods
457*9c5db199SXin Li    for PD console devices.
458*9c5db199SXin Li    """
459*9c5db199SXin Li
460*9c5db199SXin Li    def __init__(self, console, port, utils):
461*9c5db199SXin Li        """Initialization method
462*9c5db199SXin Li
463*9c5db199SXin Li        @param console: UART console for this device
464*9c5db199SXin Li        @param port: USB PD port number
465*9c5db199SXin Li        """
466*9c5db199SXin Li        # Instantiate the PD console object
467*9c5db199SXin Li        super(PDTesterDevice, self).__init__(console, port, utils)
468*9c5db199SXin Li        # Indicate this is PDTester device
469*9c5db199SXin Li        self.is_pdtester = True
470*9c5db199SXin Li
471*9c5db199SXin Li    def _toggle_pdtester_drp(self):
472*9c5db199SXin Li        """Issue 'usbc_action drp' PDTester command
473*9c5db199SXin Li
474*9c5db199SXin Li        @returns value of drp_enable in PDTester FW
475*9c5db199SXin Li        """
476*9c5db199SXin Li        drp_cmd = 'usbc_action drp'
477*9c5db199SXin Li        drp_re = ['DRP\s=\s(\d)']
478*9c5db199SXin Li        # Send DRP toggle command to PDTester and get value of 'drp_enable'
479*9c5db199SXin Li        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
480*9c5db199SXin Li        return int(m[0][1])
481*9c5db199SXin Li
482*9c5db199SXin Li    def _enable_pdtester_drp(self):
483*9c5db199SXin Li        """Enable DRP mode on PDTester
484*9c5db199SXin Li
485*9c5db199SXin Li        DRP mode can only be toggled and is not able to be explicitly
486*9c5db199SXin Li        enabled/disabled via the console. Therefore, this method will
487*9c5db199SXin Li        toggle DRP mode until the console reply indicates that this
488*9c5db199SXin Li        mode is enabled. The toggle happens a maximum of two times
489*9c5db199SXin Li        in case this is called when it's already enabled.
490*9c5db199SXin Li
491*9c5db199SXin Li        @returns True when DRP mode is enabled, False if not successful
492*9c5db199SXin Li        """
493*9c5db199SXin Li        for attempt in range(2):
494*9c5db199SXin Li            if self._toggle_pdtester_drp() == True:
495*9c5db199SXin Li                logging.info('PDTester DRP mode enabled')
496*9c5db199SXin Li                return True
497*9c5db199SXin Li        logging.error('PDTester DRP mode set failure')
498*9c5db199SXin Li        return False
499*9c5db199SXin Li
500*9c5db199SXin Li    def _verify_state_sequence(self, states_list, console_log):
501*9c5db199SXin Li        """Compare PD state transitions to expected values
502*9c5db199SXin Li
503*9c5db199SXin Li        @param states_list: list of expected PD state transitions
504*9c5db199SXin Li        @param console_log: console output which contains state names
505*9c5db199SXin Li
506*9c5db199SXin Li        @returns True if the sequence matches, False otherwise
507*9c5db199SXin Li        """
508*9c5db199SXin Li        # For each state in the expected state transiton table, build
509*9c5db199SXin Li        # the regexp and search for it in the state transition log.
510*9c5db199SXin Li        for state in states_list:
511*9c5db199SXin Li            state_regx = r'C{0}\s+[\w]+:?\s({1})'.format(self.port,
512*9c5db199SXin Li                                                         state)
513*9c5db199SXin Li            if re.search(state_regx, console_log) is None:
514*9c5db199SXin Li                return False
515*9c5db199SXin Li        return True
516*9c5db199SXin Li
517*9c5db199SXin Li    def cc_disconnect_connect(self, disc_time_sec):
518*9c5db199SXin Li        """Disconnect/reconnect using PDTester
519*9c5db199SXin Li
520*9c5db199SXin Li        PDTester supports a feature which simulates a USB Type C disconnect
521*9c5db199SXin Li        and reconnect.
522*9c5db199SXin Li
523*9c5db199SXin Li        @param disc_time_sec: Time in seconds for disconnect period.
524*9c5db199SXin Li        """
525*9c5db199SXin Li        DISC_DELAY = 100
526*9c5db199SXin Li        disc_cmd = 'fakedisconnect %d  %d' % (DISC_DELAY,
527*9c5db199SXin Li                                              disc_time_sec * 1000)
528*9c5db199SXin Li        self.utils.send_pd_command(disc_cmd)
529*9c5db199SXin Li
530*9c5db199SXin Li    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
531*9c5db199SXin Li        """Get the connected state after disconnect/reconnect using PDTester
532*9c5db199SXin Li
533*9c5db199SXin Li        PDTester supports a feature which simulates a USB Type C disconnect
534*9c5db199SXin Li        and reconnect. It returns the first connected state (either source or
535*9c5db199SXin Li        sink) after reconnect.
536*9c5db199SXin Li
537*9c5db199SXin Li        @param disc_time_sec: Time in seconds for disconnect period.
538*9c5db199SXin Li        @returns: The connected PD state.
539*9c5db199SXin Li        """
540*9c5db199SXin Li        DISC_DELAY = 100
541*9c5db199SXin Li        disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, disc_time_sec * 1000)
542*9c5db199SXin Li        state_exp = '(C%d)\s+[\w]+:?\s(%s)'
543*9c5db199SXin Li
544*9c5db199SXin Li        disconnected_tuple = self.utils.get_disconnected_states()
545*9c5db199SXin Li        disconnected_states = '|'.join(disconnected_tuple)
546*9c5db199SXin Li        disconnected_exp = state_exp % (self.port, disconnected_states)
547*9c5db199SXin Li
548*9c5db199SXin Li        src_connected_tuple = self.utils.get_src_connect_states()
549*9c5db199SXin Li        snk_connected_tuple = self.utils.get_snk_connect_states()
550*9c5db199SXin Li        connected_states = '|'.join(src_connected_tuple + snk_connected_tuple)
551*9c5db199SXin Li        connected_exp = state_exp % (self.port, connected_states)
552*9c5db199SXin Li
553*9c5db199SXin Li        m = self.utils.send_pd_command_get_output(disc_cmd, [disconnected_exp,
554*9c5db199SXin Li            connected_exp])
555*9c5db199SXin Li        return m[1][2]
556*9c5db199SXin Li
557*9c5db199SXin Li    def drp_disconnect_connect(self, disc_time_sec):
558*9c5db199SXin Li        """Disconnect/reconnect using PDTester
559*9c5db199SXin Li
560*9c5db199SXin Li        Utilize PDTester disconnect/connect utility and verify
561*9c5db199SXin Li        that both disconnect and reconnect actions were successful.
562*9c5db199SXin Li
563*9c5db199SXin Li        @param disc_time_sec: Time in seconds for disconnect period.
564*9c5db199SXin Li
565*9c5db199SXin Li        @returns True if device disconnects, then returns to a connected
566*9c5db199SXin Li        state. False if either step fails.
567*9c5db199SXin Li        """
568*9c5db199SXin Li        self.cc_disconnect_connect(disc_time_sec)
569*9c5db199SXin Li        time.sleep(disc_time_sec / 2)
570*9c5db199SXin Li        disconnect = self.is_disconnected()
571*9c5db199SXin Li        time.sleep(disc_time_sec / 2 + self.utils.CONNECT_TIME)
572*9c5db199SXin Li        connect = self.is_connected()
573*9c5db199SXin Li        return disconnect and connect
574*9c5db199SXin Li
575*9c5db199SXin Li    def drp_set(self, mode):
576*9c5db199SXin Li        """Sets dualrole mode
577*9c5db199SXin Li
578*9c5db199SXin Li        @param mode: desired dual role setting (on, off, snk, src)
579*9c5db199SXin Li
580*9c5db199SXin Li        @returns True if dualrole mode matches the requested value or
581*9c5db199SXin Li        is successfully set to that value. False, otherwise.
582*9c5db199SXin Li        """
583*9c5db199SXin Li        # Get current value of dualrole
584*9c5db199SXin Li        drp = self.utils.get_pd_dualrole(self.port)
585*9c5db199SXin Li        if drp == mode:
586*9c5db199SXin Li            return True
587*9c5db199SXin Li
588*9c5db199SXin Li        if mode == 'on':
589*9c5db199SXin Li            # Setting dpr_enable on PDTester will set dualrole mode to on
590*9c5db199SXin Li            return self._enable_pdtester_drp()
591*9c5db199SXin Li        else:
592*9c5db199SXin Li            # If desired setting is other than 'on', need to ensure that
593*9c5db199SXin Li            # drp mode on PDTester is disabled.
594*9c5db199SXin Li            if drp == 'on':
595*9c5db199SXin Li                # This will turn off drp_enable flag and set dualmode to 'off'
596*9c5db199SXin Li                return self._toggle_pdtester_drp()
597*9c5db199SXin Li            # With drp_enable flag off, can set to desired setting
598*9c5db199SXin Li            return self.utils.set_pd_dualrole(self.port, mode)
599*9c5db199SXin Li
600*9c5db199SXin Li    def _reset(self, cmd, states_list):
601*9c5db199SXin Li        """Initates a PD reset sequence
602*9c5db199SXin Li
603*9c5db199SXin Li        PDTester device has state names available on the console. When
604*9c5db199SXin Li        a soft reset is issued the console log is extracted and then
605*9c5db199SXin Li        compared against the expected state transisitons.
606*9c5db199SXin Li
607*9c5db199SXin Li        @param cmd: reset type (soft or hard)
608*9c5db199SXin Li        @param states_list: list of expected PD state transitions
609*9c5db199SXin Li
610*9c5db199SXin Li        @returns True if state transitions match, False otherwise
611*9c5db199SXin Li        """
612*9c5db199SXin Li        # Want to grab all output until either SRC_READY or SNK_READY
613*9c5db199SXin Li        reply_exp = ['(.*)(C%d)\s+[\w]+:?\s([\w]+_READY)' % self.port]
614*9c5db199SXin Li        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
615*9c5db199SXin Li        return self._verify_state_sequence(states_list, m[0][0])
616*9c5db199SXin Li
617*9c5db199SXin Li    def soft_reset(self):
618*9c5db199SXin Li        """Initates a PD soft reset sequence
619*9c5db199SXin Li
620*9c5db199SXin Li        @returns True if state transitions match, False otherwise
621*9c5db199SXin Li        """
622*9c5db199SXin Li        snk_reset_states = [
623*9c5db199SXin Li            'SOFT_RESET',
624*9c5db199SXin Li            'SNK_DISCOVERY',
625*9c5db199SXin Li            'SNK_REQUESTED',
626*9c5db199SXin Li            'SNK_TRANSITION',
627*9c5db199SXin Li            'SNK_READY'
628*9c5db199SXin Li        ]
629*9c5db199SXin Li
630*9c5db199SXin Li        src_reset_states = [
631*9c5db199SXin Li            'SOFT_RESET',
632*9c5db199SXin Li            'SRC_DISCOVERY',
633*9c5db199SXin Li            'SRC_NEGOCIATE',
634*9c5db199SXin Li            'SRC_ACCEPTED',
635*9c5db199SXin Li            'SRC_POWERED',
636*9c5db199SXin Li            'SRC_TRANSITION',
637*9c5db199SXin Li            'SRC_READY'
638*9c5db199SXin Li        ]
639*9c5db199SXin Li
640*9c5db199SXin Li        if self.is_src():
641*9c5db199SXin Li            states_list = src_reset_states
642*9c5db199SXin Li        elif self.is_snk():
643*9c5db199SXin Li            states_list = snk_reset_states
644*9c5db199SXin Li        else:
645*9c5db199SXin Li            raise error.TestFail('Port Pair not in a connected state')
646*9c5db199SXin Li
647*9c5db199SXin Li        cmd = 'pd %d soft' % self.port
648*9c5db199SXin Li        return self._reset(cmd, states_list)
649*9c5db199SXin Li
650*9c5db199SXin Li    def hard_reset(self):
651*9c5db199SXin Li        """Initates a PD hard reset sequence
652*9c5db199SXin Li
653*9c5db199SXin Li        @returns True if state transitions match, False otherwise
654*9c5db199SXin Li        """
655*9c5db199SXin Li        snk_reset_states = [
656*9c5db199SXin Li            'HARD_RESET_SEND',
657*9c5db199SXin Li            'HARD_RESET_EXECUTE',
658*9c5db199SXin Li            'SNK_HARD_RESET_RECOVER',
659*9c5db199SXin Li            'SNK_DISCOVERY',
660*9c5db199SXin Li            'SNK_REQUESTED',
661*9c5db199SXin Li            'SNK_TRANSITION',
662*9c5db199SXin Li            'SNK_READY'
663*9c5db199SXin Li        ]
664*9c5db199SXin Li
665*9c5db199SXin Li        src_reset_states = [
666*9c5db199SXin Li            'HARD_RESET_SEND',
667*9c5db199SXin Li            'HARD_RESET_EXECUTE',
668*9c5db199SXin Li            'SRC_HARD_RESET_RECOVER',
669*9c5db199SXin Li            'SRC_DISCOVERY',
670*9c5db199SXin Li            'SRC_NEGOCIATE',
671*9c5db199SXin Li            'SRC_ACCEPTED',
672*9c5db199SXin Li            'SRC_POWERED',
673*9c5db199SXin Li            'SRC_TRANSITION',
674*9c5db199SXin Li            'SRC_READY'
675*9c5db199SXin Li        ]
676*9c5db199SXin Li
677*9c5db199SXin Li        if self.is_src():
678*9c5db199SXin Li            states_list = src_reset_states
679*9c5db199SXin Li        elif self.is_snk():
680*9c5db199SXin Li            states_list = snk_reset_states
681*9c5db199SXin Li        else:
682*9c5db199SXin Li            raise error.TestFail('Port Pair not in a connected state')
683*9c5db199SXin Li
684*9c5db199SXin Li        cmd = 'pd %d hard' % self.port
685*9c5db199SXin Li        return self._reset(cmd, states_list)
686*9c5db199SXin Li
687*9c5db199SXin Li
688*9c5db199SXin Liclass PDPortPartner(object):
689*9c5db199SXin Li    """Methods used to instantiate PD device objects
690*9c5db199SXin Li
691*9c5db199SXin Li    This class is initalized with a list of servo consoles. It
692*9c5db199SXin Li    contains methods to determine if USB PD devices are accessible
693*9c5db199SXin Li    via the consoles and attempts to determine USB PD port partners.
694*9c5db199SXin Li    A PD device is USB PD port specific, a single console may access
695*9c5db199SXin Li    multiple PD devices.
696*9c5db199SXin Li
697*9c5db199SXin Li    """
698*9c5db199SXin Li
699*9c5db199SXin Li    def __init__(self, consoles):
700*9c5db199SXin Li        """Initialization method
701*9c5db199SXin Li
702*9c5db199SXin Li        @param consoles: list of servo consoles
703*9c5db199SXin Li        """
704*9c5db199SXin Li        self.consoles = consoles
705*9c5db199SXin Li
706*9c5db199SXin Li    def __repr__(self):
707*9c5db199SXin Li        """String representation of the object"""
708*9c5db199SXin Li        return "<%s %r>" % (self.__class__.__name__, self.consoles)
709*9c5db199SXin Li
710*9c5db199SXin Li    def _send_pd_state(self, port, console):
711*9c5db199SXin Li        """Tests if PD device exists on a given port number
712*9c5db199SXin Li
713*9c5db199SXin Li        @param port: USB PD port number to try
714*9c5db199SXin Li        @param console: servo UART console
715*9c5db199SXin Li
716*9c5db199SXin Li        @returns True if 'pd <port> state' command gives a valid
717*9c5db199SXin Li        response, False otherwise
718*9c5db199SXin Li        """
719*9c5db199SXin Li        cmd = 'pd %d state' % port
720*9c5db199SXin Li        regex = r'(Port C\d)|(Parameter)'
721*9c5db199SXin Li        m = console.send_command_get_output(cmd, [regex])
722*9c5db199SXin Li        # If PD port exists, then output will be Port C0 or C1
723*9c5db199SXin Li        regex = r'Port C{0}'.format(port)
724*9c5db199SXin Li        if re.search(regex, m[0][0]):
725*9c5db199SXin Li            return True
726*9c5db199SXin Li        return False
727*9c5db199SXin Li
728*9c5db199SXin Li    def _find_num_pd_ports(self, console):
729*9c5db199SXin Li        """Determine number of PD ports for a given console
730*9c5db199SXin Li
731*9c5db199SXin Li        @param console: uart console accssed via servo
732*9c5db199SXin Li
733*9c5db199SXin Li        @returns: number of PD ports accessible via console
734*9c5db199SXin Li        """
735*9c5db199SXin Li        MAX_PORTS = 2
736*9c5db199SXin Li        num_ports = 0
737*9c5db199SXin Li        for port in range(MAX_PORTS):
738*9c5db199SXin Li            if self._send_pd_state(port, console):
739*9c5db199SXin Li                num_ports += 1
740*9c5db199SXin Li        return num_ports
741*9c5db199SXin Li
742*9c5db199SXin Li    def _is_pd_console(self, console):
743*9c5db199SXin Li        """Check if pd option exists in console
744*9c5db199SXin Li
745*9c5db199SXin Li        @param console: uart console accssed via servo
746*9c5db199SXin Li
747*9c5db199SXin Li        @returns: True if 'pd' is found, False otherwise
748*9c5db199SXin Li        """
749*9c5db199SXin Li        try:
750*9c5db199SXin Li            m = console.send_command_get_output('help', [r'(pd)\s+'])
751*9c5db199SXin Li            return True
752*9c5db199SXin Li        except error.TestFail:
753*9c5db199SXin Li            return False
754*9c5db199SXin Li
755*9c5db199SXin Li    def _is_pdtester_console(self, console):
756*9c5db199SXin Li        """Check for PDTester console
757*9c5db199SXin Li
758*9c5db199SXin Li        This method looks for a console command option 'usbc_action' which
759*9c5db199SXin Li        is unique to PDTester PD devices.
760*9c5db199SXin Li
761*9c5db199SXin Li        @param console: uart console accssed via servo
762*9c5db199SXin Li
763*9c5db199SXin Li        @returns True if usbc_action command is present, False otherwise
764*9c5db199SXin Li        """
765*9c5db199SXin Li        try:
766*9c5db199SXin Li            m = console.send_command_get_output('help', [r'(usbc_action)'])
767*9c5db199SXin Li            return True
768*9c5db199SXin Li        except error.TestFail:
769*9c5db199SXin Li            return False
770*9c5db199SXin Li
771*9c5db199SXin Li    def _check_port_pair(self, port1, port2):
772*9c5db199SXin Li        """Check if two PD devices could be connected
773*9c5db199SXin Li
774*9c5db199SXin Li        If two USB PD devices are connected, then they should be in
775*9c5db199SXin Li        either the SRC_READY or SNK_READY states and have opposite
776*9c5db199SXin Li        power roles. In addition, they must be on different servo
777*9c5db199SXin Li        consoles.
778*9c5db199SXin Li
779*9c5db199SXin Li        @param: list of two possible PD port parters
780*9c5db199SXin Li
781*9c5db199SXin Li        @returns True if not the same console and both PD devices
782*9c5db199SXin Li        are a plausible pair based only on their PD states.
783*9c5db199SXin Li        """
784*9c5db199SXin Li        # Don't test if on the same servo console
785*9c5db199SXin Li        if port1.console == port2.console:
786*9c5db199SXin Li            logging.info("PD Devices are on same platform -> can't be a pair")
787*9c5db199SXin Li            return False
788*9c5db199SXin Li
789*9c5db199SXin Li        state1 = port1.get_pd_state()
790*9c5db199SXin Li        port1_is_snk = port1.is_snk(state1)
791*9c5db199SXin Li        port1_is_src = port1.is_src(state1)
792*9c5db199SXin Li
793*9c5db199SXin Li        state2 = port2.get_pd_state()
794*9c5db199SXin Li        port2_is_snk = port2.is_snk(state2)
795*9c5db199SXin Li        port2_is_src = port2.is_src(state2)
796*9c5db199SXin Li
797*9c5db199SXin Li        # Must be SRC <--> SNK or SNK <--> SRC
798*9c5db199SXin Li        if (port1_is_src and port2_is_snk) or (port1_is_snk and port2_is_src):
799*9c5db199SXin Li            logging.debug("SRC+SNK pair: %s (%s) <--> (%s) %s",
800*9c5db199SXin Li                          port1, state1, state2, port2)
801*9c5db199SXin Li            return True
802*9c5db199SXin Li        else:
803*9c5db199SXin Li            logging.debug("Not a SRC+SNK pair: %s (%s) <--> (%s) %s",
804*9c5db199SXin Li                          port1, state1, state2, port2)
805*9c5db199SXin Li            return False
806*9c5db199SXin Li
807*9c5db199SXin Li    def _verify_pdtester_connection(self, tester_port, dut_port):
808*9c5db199SXin Li        """Verify DUT to PDTester PD connection
809*9c5db199SXin Li
810*9c5db199SXin Li        This method checks for a PDTester PD connection for the
811*9c5db199SXin Li        given port by first verifying if a PD connection is present.
812*9c5db199SXin Li        If found, then it uses a PDTester feature to force a PD disconnect.
813*9c5db199SXin Li        If the port is no longer in the connected state, and following
814*9c5db199SXin Li        a delay, is found to be back in the connected state, then
815*9c5db199SXin Li        a DUT pd to PDTester connection is verified.
816*9c5db199SXin Li
817*9c5db199SXin Li        @param dev_pair: list of two PD devices
818*9c5db199SXin Li
819*9c5db199SXin Li        @returns True if DUT to PDTester pd connection is verified
820*9c5db199SXin Li        """
821*9c5db199SXin Li        DISC_CHECK_TIME = 10
822*9c5db199SXin Li        DISC_WAIT_TIME = 20
823*9c5db199SXin Li        CONNECT_TIME = 4
824*9c5db199SXin Li
825*9c5db199SXin Li        logging.info("Check: %s <--> %s", tester_port, dut_port)
826*9c5db199SXin Li
827*9c5db199SXin Li        if not self._check_port_pair(tester_port, dut_port):
828*9c5db199SXin Li            return False
829*9c5db199SXin Li
830*9c5db199SXin Li        # Force PD disconnect
831*9c5db199SXin Li        logging.debug('Disconnecting to check if devices are partners')
832*9c5db199SXin Li        tester_port.cc_disconnect_connect(DISC_WAIT_TIME)
833*9c5db199SXin Li        time.sleep(DISC_CHECK_TIME)
834*9c5db199SXin Li
835*9c5db199SXin Li        # Verify that both devices are now disconnected
836*9c5db199SXin Li        tester_state = tester_port.get_pd_state()
837*9c5db199SXin Li        dut_state = dut_port.get_pd_state()
838*9c5db199SXin Li        logging.debug("Recheck: %s (%s) <--> (%s) %s",
839*9c5db199SXin Li                      tester_port, tester_state, dut_state, dut_port)
840*9c5db199SXin Li
841*9c5db199SXin Li        if not (tester_port.is_disconnected(tester_state) and
842*9c5db199SXin Li                dut_port.is_disconnected(dut_state)):
843*9c5db199SXin Li            logging.info("Ports did not disconnect at the same time, so"
844*9c5db199SXin Li                         " they aren't considered a pair.")
845*9c5db199SXin Li            # Delay to allow non-pair devices to reconnect
846*9c5db199SXin Li            time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME)
847*9c5db199SXin Li            return False
848*9c5db199SXin Li
849*9c5db199SXin Li        logging.debug('Pair disconnected.  Waiting for reconnect...')
850*9c5db199SXin Li
851*9c5db199SXin Li        # Allow enough time for reconnection
852*9c5db199SXin Li        time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME)
853*9c5db199SXin Li        if self._check_port_pair(tester_port, dut_port):
854*9c5db199SXin Li            # Have verified a pd disconnect/reconnect sequence
855*9c5db199SXin Li            logging.info('PDTester <--> DUT pair found')
856*9c5db199SXin Li            return True
857*9c5db199SXin Li
858*9c5db199SXin Li        logging.info("Ports did not reconnect at the same time, so"
859*9c5db199SXin Li                     " they aren't considered a pair.")
860*9c5db199SXin Li        return False
861*9c5db199SXin Li
862*9c5db199SXin Li    def identify_pd_devices(self):
863*9c5db199SXin Li        """Instantiate PD devices present in test setup
864*9c5db199SXin Li
865*9c5db199SXin Li        @return: list of 2 PD devices if a DUT <-> PDTester found.
866*9c5db199SXin Li                 If not found, then returns an empty list.
867*9c5db199SXin Li        """
868*9c5db199SXin Li        tester_devports = []
869*9c5db199SXin Li        dut_devports = []
870*9c5db199SXin Li
871*9c5db199SXin Li        # For each possible uart console, check to see if a PD console
872*9c5db199SXin Li        # is present and determine the number of PD ports.
873*9c5db199SXin Li        for console in self.consoles:
874*9c5db199SXin Li            if self._is_pd_console(console):
875*9c5db199SXin Li                is_tester = self._is_pdtester_console(console)
876*9c5db199SXin Li                num_ports = self._find_num_pd_ports(console)
877*9c5db199SXin Li                # For each PD port that can be accessed via the console,
878*9c5db199SXin Li                # instantiate either PDConsole or PDTester device.
879*9c5db199SXin Li                for port in range(num_ports):
880*9c5db199SXin Li                    if is_tester:
881*9c5db199SXin Li                        logging.info('PDTesterDevice on %s port %d',
882*9c5db199SXin Li                                     console.name, port)
883*9c5db199SXin Li                        tester_utils = pd_console.create_pd_console_utils(
884*9c5db199SXin Li                                       console)
885*9c5db199SXin Li                        tester_devports.append(PDTesterDevice(console,
886*9c5db199SXin Li                                                    port, tester_utils))
887*9c5db199SXin Li                    else:
888*9c5db199SXin Li                        logging.info('PDConsoleDevice on %s port %d',
889*9c5db199SXin Li                                     console.name, port)
890*9c5db199SXin Li                        dut_utils = pd_console.create_pd_console_utils(console)
891*9c5db199SXin Li                        dut_devports.append(PDConsoleDevice(console,
892*9c5db199SXin Li                                                    port, dut_utils))
893*9c5db199SXin Li
894*9c5db199SXin Li        if not tester_devports:
895*9c5db199SXin Li            logging.error('The specified consoles did not include any'
896*9c5db199SXin Li                          ' PD testers: %s', self.consoles)
897*9c5db199SXin Li
898*9c5db199SXin Li        if not dut_devports:
899*9c5db199SXin Li            logging.error('The specified consoles did not contain any'
900*9c5db199SXin Li                          ' DUTs: %s', self.consoles)
901*9c5db199SXin Li
902*9c5db199SXin Li        # Determine PD port partners in the list of PD devices. Note that
903*9c5db199SXin Li        # there can be PD devices which are not accessible via a uart console,
904*9c5db199SXin Li        # but are connected to a PD port which is accessible.
905*9c5db199SXin Li        for tester in reversed(tester_devports):
906*9c5db199SXin Li            for dut in dut_devports:
907*9c5db199SXin Li                if tester.console == dut.console:
908*9c5db199SXin Li                    # PD Devices are on same servo console -> can't be a pair
909*9c5db199SXin Li                    continue
910*9c5db199SXin Li                if self._verify_pdtester_connection(tester, dut):
911*9c5db199SXin Li                    dut_devports.remove(dut)
912*9c5db199SXin Li                    return [tester, dut]
913*9c5db199SXin Li
914*9c5db199SXin Li        return []
915