xref: /aosp_15_r20/external/autotest/client/cros/ec.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2015 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Lifrom __future__ import absolute_import
6*9c5db199SXin Lifrom __future__ import division
7*9c5db199SXin Lifrom __future__ import print_function
8*9c5db199SXin Li
9*9c5db199SXin Liimport logging
10*9c5db199SXin Liimport os
11*9c5db199SXin Liimport re
12*9c5db199SXin Lifrom six.moves import range
13*9c5db199SXin Liimport time
14*9c5db199SXin Li
15*9c5db199SXin Lifrom autotest_lib.client.bin import utils
16*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
17*9c5db199SXin Li
18*9c5db199SXin Li# en-US key matrix (from "kb membrane pin matrix.pdf")
19*9c5db199SXin LiKEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
20*9c5db199SXin Li             '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
21*9c5db199SXin Li             '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
22*9c5db199SXin Li             'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
23*9c5db199SXin Li             'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
24*9c5db199SXin Li             '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
25*9c5db199SXin Li             'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
26*9c5db199SXin Li             ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
27*9c5db199SXin Li             'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
28*9c5db199SXin Li             '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
29*9c5db199SXin Li             '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
30*9c5db199SXin Li             '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
31*9c5db199SXin Li             '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
32*9c5db199SXin Li             '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
33*9c5db199SXin Li             '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
34*9c5db199SXin Li             '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
35*9c5db199SXin Li             '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
36*9c5db199SXin Li             '<left>': (7, 12)}
37*9c5db199SXin Li
38*9c5db199SXin Li
39*9c5db199SXin Lidef has_ectool():
40*9c5db199SXin Li    """Determine if ectool shell command is present.
41*9c5db199SXin Li
42*9c5db199SXin Li    Returns:
43*9c5db199SXin Li        boolean true if avail, false otherwise.
44*9c5db199SXin Li    """
45*9c5db199SXin Li    cmd = 'which ectool'
46*9c5db199SXin Li    return (utils.system(cmd, ignore_status=True) == 0)
47*9c5db199SXin Li
48*9c5db199SXin Li
49*9c5db199SXin Lidef has_cros_ec():
50*9c5db199SXin Li    """Check whether DUT has chromium ec or not.
51*9c5db199SXin Li
52*9c5db199SXin Li    Returns:
53*9c5db199SXin Li        boolean whether device has ec or not.
54*9c5db199SXin Li    """
55*9c5db199SXin Li    return os.path.exists('/dev/cros_ec')
56*9c5db199SXin Li
57*9c5db199SXin Li
58*9c5db199SXin Liclass ECError(Exception):
59*9c5db199SXin Li    """Base class for a failure when communicating with EC."""
60*9c5db199SXin Li    pass
61*9c5db199SXin Li
62*9c5db199SXin Li
63*9c5db199SXin Liclass EC_Common(object):
64*9c5db199SXin Li    """Class for EC common.
65*9c5db199SXin Li
66*9c5db199SXin Li    This incredibly brief base class is intended to encapsulate common elements
67*9c5db199SXin Li    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
68*9c5db199SXin Li    that includes only the use of ectool.
69*9c5db199SXin Li    """
70*9c5db199SXin Li
71*9c5db199SXin Li    def __init__(self, target='cros_ec'):
72*9c5db199SXin Li        """Constructor.
73*9c5db199SXin Li
74*9c5db199SXin Li        @param target: target name of ec to communicate with.
75*9c5db199SXin Li        """
76*9c5db199SXin Li        if not has_ectool():
77*9c5db199SXin Li            ec_info = utils.system_output("mosys ec info",
78*9c5db199SXin Li                                          ignore_status=True)
79*9c5db199SXin Li            logging.warning("Ectool absent on this platform ( %s )",
80*9c5db199SXin Li                         ec_info)
81*9c5db199SXin Li            raise error.TestNAError("Platform doesn't support ectool")
82*9c5db199SXin Li        self._target = target
83*9c5db199SXin Li
84*9c5db199SXin Li    def ec_command(self, cmd, **kwargs):
85*9c5db199SXin Li        """Executes ec command and returns results.
86*9c5db199SXin Li
87*9c5db199SXin Li        @param cmd: string of command to execute.
88*9c5db199SXin Li        @param kwargs: optional params passed to utils.system_output
89*9c5db199SXin Li
90*9c5db199SXin Li        @returns: string of results from ec command.
91*9c5db199SXin Li        """
92*9c5db199SXin Li        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
93*9c5db199SXin Li        logging.debug('Command: %s', full_cmd)
94*9c5db199SXin Li        result = utils.system_output(full_cmd, **kwargs)
95*9c5db199SXin Li        logging.debug('Result: %s', result)
96*9c5db199SXin Li        return result
97*9c5db199SXin Li
98*9c5db199SXin Li
99*9c5db199SXin Liclass EC(EC_Common):
100*9c5db199SXin Li    """Class for CrOS embedded controller (EC)."""
101*9c5db199SXin Li    HELLO_RE = "EC says hello"
102*9c5db199SXin Li    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
103*9c5db199SXin Li    SET_FANSPEED_RE = "Fan target RPM set."
104*9c5db199SXin Li    TEMP_SENSOR_TEMP_RE = "([0-9]+) K"
105*9c5db199SXin Li    # <sensor idx>: <sensor type> <sensor name>
106*9c5db199SXin Li    TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)"
107*9c5db199SXin Li    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
108*9c5db199SXin Li    # For battery, check we can see a non-zero capacity value.
109*9c5db199SXin Li    BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
110*9c5db199SXin Li    LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
111*9c5db199SXin Li
112*9c5db199SXin Li    def __init__(self):
113*9c5db199SXin Li        """Constructor."""
114*9c5db199SXin Li        super(EC, self).__init__()
115*9c5db199SXin Li        self._temperature_dict = None
116*9c5db199SXin Li
117*9c5db199SXin Li    def hello(self, **kwargs):
118*9c5db199SXin Li        """Test EC hello command.
119*9c5db199SXin Li
120*9c5db199SXin Li        @param kwargs: optional params passed to utils.system_output
121*9c5db199SXin Li
122*9c5db199SXin Li        @returns True if success False otherwise.
123*9c5db199SXin Li        """
124*9c5db199SXin Li        response = self.ec_command('hello', **kwargs)
125*9c5db199SXin Li        return (re.search(self.HELLO_RE, response) is not None)
126*9c5db199SXin Li
127*9c5db199SXin Li    def auto_fan_ctrl(self):
128*9c5db199SXin Li        """Turns auto fan ctrl on.
129*9c5db199SXin Li
130*9c5db199SXin Li        @returns True if success False otherwise.
131*9c5db199SXin Li        """
132*9c5db199SXin Li        response = self.ec_command('autofanctrl')
133*9c5db199SXin Li        logging.info('Turned on auto fan control.')
134*9c5db199SXin Li        return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
135*9c5db199SXin Li
136*9c5db199SXin Li    def get_fanspeed(self):
137*9c5db199SXin Li        """Gets fanspeed.
138*9c5db199SXin Li
139*9c5db199SXin Li        @raises error.TestError if regexp fails to match.
140*9c5db199SXin Li
141*9c5db199SXin Li        @returns integer of fan speed RPM.
142*9c5db199SXin Li        """
143*9c5db199SXin Li        response = self.ec_command('pwmgetfanrpm')
144*9c5db199SXin Li        match = re.search(self.GET_FANSPEED_RE, response)
145*9c5db199SXin Li        if not match:
146*9c5db199SXin Li            raise error.TestError('Unable to read fan speed')
147*9c5db199SXin Li
148*9c5db199SXin Li        rpm = int(match.group(1))
149*9c5db199SXin Li        logging.info('Fan speed: %d', rpm)
150*9c5db199SXin Li        return rpm
151*9c5db199SXin Li
152*9c5db199SXin Li    def set_fanspeed(self, rpm):
153*9c5db199SXin Li        """Sets fan speed.
154*9c5db199SXin Li
155*9c5db199SXin Li        @param rpm: integer of fan speed RPM to set
156*9c5db199SXin Li
157*9c5db199SXin Li        @returns True if success False otherwise.
158*9c5db199SXin Li        """
159*9c5db199SXin Li        response = self.ec_command('pwmsetfanrpm %d' % rpm)
160*9c5db199SXin Li        logging.info('Set fan speed: %d', rpm)
161*9c5db199SXin Li        return (re.search(self.SET_FANSPEED_RE, response) is not None)
162*9c5db199SXin Li
163*9c5db199SXin Li    def _get_temperature_dict(self):
164*9c5db199SXin Li        """Read EC temperature name and idx into a dict.
165*9c5db199SXin Li
166*9c5db199SXin Li        @returns dict where key=<sensor name>, value =<sensor idx>
167*9c5db199SXin Li        """
168*9c5db199SXin Li        # The sensor (name, idx) mapping does not change.
169*9c5db199SXin Li        if self._temperature_dict:
170*9c5db199SXin Li            return self._temperature_dict
171*9c5db199SXin Li
172*9c5db199SXin Li        temperature_dict = {}
173*9c5db199SXin Li        response = self.ec_command('tempsinfo all')
174*9c5db199SXin Li        for rline in response.split('\n'):
175*9c5db199SXin Li            match = re.search(self.TEMP_SENSOR_INFO_RE, rline)
176*9c5db199SXin Li            if match:
177*9c5db199SXin Li                temperature_dict[match.group(3)] = int(match.group(1))
178*9c5db199SXin Li
179*9c5db199SXin Li        self._temperature_dict = temperature_dict
180*9c5db199SXin Li        return temperature_dict
181*9c5db199SXin Li
182*9c5db199SXin Li    def get_temperature(self, idx=None, name=None):
183*9c5db199SXin Li        """Gets temperature from idx sensor.
184*9c5db199SXin Li
185*9c5db199SXin Li        Reads temperature either directly if idx is provided or by discovering
186*9c5db199SXin Li        idx using name.
187*9c5db199SXin Li
188*9c5db199SXin Li        @param idx:  integer of temp sensor to read.  Default=None
189*9c5db199SXin Li        @param name: string of temp sensor to read.  Default=None.
190*9c5db199SXin Li            For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro
191*9c5db199SXin Li
192*9c5db199SXin Li        @raises ECError if fails to find idx of name.
193*9c5db199SXin Li        @raises error.TestError if fails to read sensor or fails to identify
194*9c5db199SXin Li        sensor to read from idx & name param.
195*9c5db199SXin Li
196*9c5db199SXin Li        @returns integer of temperature reading in degrees Kelvin.
197*9c5db199SXin Li        """
198*9c5db199SXin Li        if idx is None:
199*9c5db199SXin Li            temperature_dict = self._get_temperature_dict()
200*9c5db199SXin Li            if name in temperature_dict:
201*9c5db199SXin Li                idx = temperature_dict[name]
202*9c5db199SXin Li            else:
203*9c5db199SXin Li                raise ECError('Finding temp idx for name %s' % name)
204*9c5db199SXin Li
205*9c5db199SXin Li        response = self.ec_command('temps %d' % idx)
206*9c5db199SXin Li        match = re.search(self.TEMP_SENSOR_TEMP_RE, response)
207*9c5db199SXin Li        if not match:
208*9c5db199SXin Li            raise error.TestError('Reading temperature idx %d' % idx)
209*9c5db199SXin Li
210*9c5db199SXin Li        return int(match.group(1))
211*9c5db199SXin Li
212*9c5db199SXin Li    def get_battery(self):
213*9c5db199SXin Li        """Get battery presence (design capacity found).
214*9c5db199SXin Li
215*9c5db199SXin Li        @returns True if success False otherwise.
216*9c5db199SXin Li        """
217*9c5db199SXin Li        try:
218*9c5db199SXin Li            response = self.ec_command('battery')
219*9c5db199SXin Li        except error.CmdError:
220*9c5db199SXin Li            raise ECError('calling EC battery command')
221*9c5db199SXin Li
222*9c5db199SXin Li        return (re.search(self.BATTERY_RE, response) is not None)
223*9c5db199SXin Li
224*9c5db199SXin Li    def get_lightbar(self):
225*9c5db199SXin Li        """Test lightbar.
226*9c5db199SXin Li
227*9c5db199SXin Li        @returns True if success False otherwise.
228*9c5db199SXin Li        """
229*9c5db199SXin Li        self.ec_command('lightbar on')
230*9c5db199SXin Li        self.ec_command('lightbar init')
231*9c5db199SXin Li        self.ec_command('lightbar 4 255 255 255')
232*9c5db199SXin Li        response = self.ec_command('lightbar')
233*9c5db199SXin Li        self.ec_command('lightbar off')
234*9c5db199SXin Li        return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
235*9c5db199SXin Li
236*9c5db199SXin Li    def key_press(self, key):
237*9c5db199SXin Li        """Emit key down and up signal of the keyboard.
238*9c5db199SXin Li
239*9c5db199SXin Li        @param key: name of a key defined in KEYMATRIX.
240*9c5db199SXin Li        """
241*9c5db199SXin Li        self.key_down(key)
242*9c5db199SXin Li        self.key_up(key)
243*9c5db199SXin Li
244*9c5db199SXin Li    def _key_action(self, key, action_type):
245*9c5db199SXin Li        if not key in KEYMATRIX:
246*9c5db199SXin Li            raise error.TestError('Unknown key: ' + key)
247*9c5db199SXin Li        row, col = KEYMATRIX[key]
248*9c5db199SXin Li        self.ec_command('kbpress %d %d %d' % (row, col, action_type))
249*9c5db199SXin Li
250*9c5db199SXin Li    def key_down(self, key):
251*9c5db199SXin Li        """Emit key down signal of the keyboard.
252*9c5db199SXin Li
253*9c5db199SXin Li        @param key: name of a key defined in KEYMATRIX.
254*9c5db199SXin Li        """
255*9c5db199SXin Li        self._key_action(key, 1)
256*9c5db199SXin Li
257*9c5db199SXin Li    def key_up(self, key):
258*9c5db199SXin Li        """Emit key up signal of the keyboard.
259*9c5db199SXin Li
260*9c5db199SXin Li        @param key: name of a key defined in KEYMATRIX.
261*9c5db199SXin Li        """
262*9c5db199SXin Li        self._key_action(key, 0)
263*9c5db199SXin Li
264*9c5db199SXin Li
265*9c5db199SXin Liclass EC_USBPD_Port(EC_Common):
266*9c5db199SXin Li    """Class for CrOS embedded controller for USB-PD Port.
267*9c5db199SXin Li
268*9c5db199SXin Li    Public attributes:
269*9c5db199SXin Li        index: integer of USB type-C port index.
270*9c5db199SXin Li
271*9c5db199SXin Li    Public Methods:
272*9c5db199SXin Li        is_dfp: Determine if data role is Downstream Facing Port (DFP).
273*9c5db199SXin Li        is_amode_supported: Check if alternate mode is supported by port.
274*9c5db199SXin Li        is_amode_entered: Check if alternate mode is entered.
275*9c5db199SXin Li        set_amode: Set an alternate mode.
276*9c5db199SXin Li
277*9c5db199SXin Li    Private attributes:
278*9c5db199SXin Li        _port: integer of USB type-C port id.
279*9c5db199SXin Li        _port_info: holds usbpd protocol info.
280*9c5db199SXin Li        _amodes: holds alternate mode info.
281*9c5db199SXin Li
282*9c5db199SXin Li    Private methods:
283*9c5db199SXin Li        _invalidate_port_data: Remove port data to force re-eval.
284*9c5db199SXin Li        _get_port_info: Get USB-PD port info.
285*9c5db199SXin Li        _get_amodes: parse and return port's svid info.
286*9c5db199SXin Li    """
287*9c5db199SXin Li    def __init__(self, index):
288*9c5db199SXin Li        """Constructor.
289*9c5db199SXin Li
290*9c5db199SXin Li        @param index: integer of USB type-C port index.
291*9c5db199SXin Li        """
292*9c5db199SXin Li        self.index = index
293*9c5db199SXin Li        # TODO(crosbug.com/p/38133) target= only works for samus
294*9c5db199SXin Li        super(EC_USBPD_Port, self).__init__(target='cros_pd')
295*9c5db199SXin Li
296*9c5db199SXin Li        # Interrogate port at instantiation.  Use invalidate to force re-eval.
297*9c5db199SXin Li        self._port_info = self._get_port_info()
298*9c5db199SXin Li        self._amodes = self._get_amodes()
299*9c5db199SXin Li
300*9c5db199SXin Li    def _invalidate_port_data(self):
301*9c5db199SXin Li        """Remove port data to force re-eval."""
302*9c5db199SXin Li        self._port_info = None
303*9c5db199SXin Li        self._amodes = None
304*9c5db199SXin Li
305*9c5db199SXin Li    def _get_port_info(self):
306*9c5db199SXin Li        """Get USB-PD port info.
307*9c5db199SXin Li
308*9c5db199SXin Li        ectool command usbpd provides the following information about the port:
309*9c5db199SXin Li          - Enabled/Disabled
310*9c5db199SXin Li          - Power & Data Role
311*9c5db199SXin Li          - Polarity
312*9c5db199SXin Li          - Protocol State
313*9c5db199SXin Li
314*9c5db199SXin Li        At time of authoring it looks like:
315*9c5db199SXin Li          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
316*9c5db199SXin Li
317*9c5db199SXin Li        @raises error.TestError if ...
318*9c5db199SXin Li          port info not parseable.
319*9c5db199SXin Li
320*9c5db199SXin Li        @returns dictionary for <port> with keyval pairs:
321*9c5db199SXin Li          enabled: True | False | None
322*9c5db199SXin Li          power_role: sink | source | None
323*9c5db199SXin Li          data_role: UFP | DFP | None
324*9c5db199SXin Li          is_reversed: True | False | None
325*9c5db199SXin Li          state: various strings | None
326*9c5db199SXin Li        """
327*9c5db199SXin Li        PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
328*9c5db199SXin Li                       'Polarity:CC(\d+)\s+State:(\w+)'
329*9c5db199SXin Li
330*9c5db199SXin Li        match = re.search(PORT_INFO_RE,
331*9c5db199SXin Li                          self.ec_command("usbpd %s" % (self.index)))
332*9c5db199SXin Li        if not match or int(match.group(1)) != self.index:
333*9c5db199SXin Li            raise error.TestError('Unable to determine port %d info' %
334*9c5db199SXin Li                                  self.index)
335*9c5db199SXin Li
336*9c5db199SXin Li        pinfo = dict(enabled=None, power_role=None, data_role=None,
337*9c5db199SXin Li                    is_reversed=None, state=None)
338*9c5db199SXin Li        pinfo['enabled'] = match.group(2) == 'enabled'
339*9c5db199SXin Li        pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
340*9c5db199SXin Li        pinfo['data_role'] = match.group(4)
341*9c5db199SXin Li        pinfo['is_reversed'] = True if match.group(5) == '2' else False
342*9c5db199SXin Li        pinfo['state'] = match.group(6)
343*9c5db199SXin Li        logging.debug('port_info = %s', pinfo)
344*9c5db199SXin Li        return pinfo
345*9c5db199SXin Li
346*9c5db199SXin Li    def _get_amodes(self):
347*9c5db199SXin Li        """Parse alternate modes from pdgetmode.
348*9c5db199SXin Li
349*9c5db199SXin Li        Looks like ...
350*9c5db199SXin Li          *SVID:0xff01 *0x00000485  0x00000000 ...
351*9c5db199SXin Li          SVID:0x18d1   0x00000001  0x00000000 ...
352*9c5db199SXin Li
353*9c5db199SXin Li        @returns dictionary of format:
354*9c5db199SXin Li          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
355*9c5db199SXin Li            where:
356*9c5db199SXin Li              <svid>        : USB-IF Standard or vendor id as
357*9c5db199SXin Li                              hex string (i.e. 0xff01)
358*9c5db199SXin Li              <config_list> : list of uint32_t configs
359*9c5db199SXin Li              <opos>        : integer of active object position.
360*9c5db199SXin Li                              Note, this is the config list index + 1
361*9c5db199SXin Li        """
362*9c5db199SXin Li        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
363*9c5db199SXin Li        svids = dict()
364*9c5db199SXin Li        cmd = 'pdgetmode %d' % self.index
365*9c5db199SXin Li        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
366*9c5db199SXin Li            if line.strip() == '':
367*9c5db199SXin Li                continue
368*9c5db199SXin Li            logging.debug('pdgetmode line: %s', line)
369*9c5db199SXin Li            match = re.search(SVID_RE, line)
370*9c5db199SXin Li            if not match:
371*9c5db199SXin Li                logging.warning("Unable to parse SVID line %s", line)
372*9c5db199SXin Li                continue
373*9c5db199SXin Li            active = match.group(1) == '*'
374*9c5db199SXin Li            svid = match.group(2)
375*9c5db199SXin Li            configs_str = match.group(3)
376*9c5db199SXin Li            configs = list()
377*9c5db199SXin Li            opos = None
378*9c5db199SXin Li            for i,config in enumerate(configs_str.split(), 1):
379*9c5db199SXin Li                if config.startswith('*'):
380*9c5db199SXin Li                    opos = i
381*9c5db199SXin Li                    config = config[1:]
382*9c5db199SXin Li                config = int(config, 16)
383*9c5db199SXin Li                # ignore unpopulated configs
384*9c5db199SXin Li                if config == 0:
385*9c5db199SXin Li                    continue
386*9c5db199SXin Li                configs.append(config)
387*9c5db199SXin Li            svids[svid] = dict(active=active, configs=configs, opos=opos)
388*9c5db199SXin Li
389*9c5db199SXin Li        logging.debug("Port %d svids = %s", self.index, svids)
390*9c5db199SXin Li        return svids
391*9c5db199SXin Li
392*9c5db199SXin Li    def is_dfp(self):
393*9c5db199SXin Li        """Determine if data role is Downstream Facing Port (DFP).
394*9c5db199SXin Li
395*9c5db199SXin Li        @returns True if DFP False otherwise.
396*9c5db199SXin Li        """
397*9c5db199SXin Li        if self._port_info is None:
398*9c5db199SXin Li            self._port_info = self._get_port_info()
399*9c5db199SXin Li
400*9c5db199SXin Li        return self._port_info['data_role'] == 'DFP'
401*9c5db199SXin Li
402*9c5db199SXin Li    def is_amode_supported(self, svid):
403*9c5db199SXin Li        """Check if alternate mode is supported by port partner.
404*9c5db199SXin Li
405*9c5db199SXin Li        @param svid: alternate mode SVID hexstring (i.e. 0xff01)
406*9c5db199SXin Li        """
407*9c5db199SXin Li        if self._amodes is None:
408*9c5db199SXin Li            self._amodes = self._get_amodes()
409*9c5db199SXin Li
410*9c5db199SXin Li        if svid in self._amodes.keys():
411*9c5db199SXin Li            return True
412*9c5db199SXin Li        return False
413*9c5db199SXin Li
414*9c5db199SXin Li    def is_amode_entered(self, svid, opos):
415*9c5db199SXin Li        """Check if alternate mode is entered.
416*9c5db199SXin Li
417*9c5db199SXin Li        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
418*9c5db199SXin Li        @param opos: object position of config to act on.
419*9c5db199SXin Li
420*9c5db199SXin Li        @returns True if entered False otherwise
421*9c5db199SXin Li        """
422*9c5db199SXin Li        if self._amodes is None:
423*9c5db199SXin Li            self._amodes = self._get_amodes()
424*9c5db199SXin Li
425*9c5db199SXin Li        if not self.is_amode_supported(svid):
426*9c5db199SXin Li            return False
427*9c5db199SXin Li
428*9c5db199SXin Li        if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
429*9c5db199SXin Li            return True
430*9c5db199SXin Li
431*9c5db199SXin Li        return False
432*9c5db199SXin Li
433*9c5db199SXin Li    def set_amode(self, svid, opos, enter, delay_secs=2):
434*9c5db199SXin Li        """Set alternate mode.
435*9c5db199SXin Li
436*9c5db199SXin Li        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
437*9c5db199SXin Li        @param opos: object position of config to act on.
438*9c5db199SXin Li        @param enter: Boolean of whether to enter mode.
439*9c5db199SXin Li
440*9c5db199SXin Li        @raises error.TestError if ...
441*9c5db199SXin Li           mode not supported.
442*9c5db199SXin Li           opos is > number of configs.
443*9c5db199SXin Li
444*9c5db199SXin Li        @returns True if successful False otherwise
445*9c5db199SXin Li        """
446*9c5db199SXin Li        if self._amodes is None:
447*9c5db199SXin Li            self._amodes = self._get_amodes()
448*9c5db199SXin Li
449*9c5db199SXin Li        if svid not in self._amodes.keys():
450*9c5db199SXin Li            raise error.TestError("SVID %s not supported" % svid)
451*9c5db199SXin Li
452*9c5db199SXin Li        if opos > len(self._amodes[svid]['configs']):
453*9c5db199SXin Li            raise error.TestError("opos > available configs")
454*9c5db199SXin Li
455*9c5db199SXin Li        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
456*9c5db199SXin Li                                         1 if enter else 0)
457*9c5db199SXin Li        self.ec_command(cmd, ignore_status=True)
458*9c5db199SXin Li        self._invalidate_port_data()
459*9c5db199SXin Li
460*9c5db199SXin Li        # allow some time for mode entry/exit
461*9c5db199SXin Li        time.sleep(delay_secs)
462*9c5db199SXin Li        return self.is_amode_entered(svid, opos) == enter
463*9c5db199SXin Li
464*9c5db199SXin Li    def get_flash_info(self):
465*9c5db199SXin Li        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
466*9c5db199SXin Li        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
467*9c5db199SXin Li        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
468*9c5db199SXin Li                                    'dev_minor', 'rw_hash', 'image_status'])
469*9c5db199SXin Li
470*9c5db199SXin Li        cmd = 'infopddev %d' % self.index
471*9c5db199SXin Li
472*9c5db199SXin Li        tries = 3
473*9c5db199SXin Li        while (tries):
474*9c5db199SXin Li            res = self.ec_command(cmd, ignore_status=True)
475*9c5db199SXin Li            if not 'has no discovered device' in res:
476*9c5db199SXin Li                break
477*9c5db199SXin Li
478*9c5db199SXin Li            tries -= 1
479*9c5db199SXin Li            time.sleep(1)
480*9c5db199SXin Li
481*9c5db199SXin Li        for ln in res.split('\n'):
482*9c5db199SXin Li            mat1 = re.match(mat1_re, ln)
483*9c5db199SXin Li            if mat1:
484*9c5db199SXin Li                flash_dict['ptype'] = int(mat1.group(1))
485*9c5db199SXin Li                flash_dict['vid'] = mat1.group(2)
486*9c5db199SXin Li                flash_dict['pid'] = mat1.group(3)
487*9c5db199SXin Li                continue
488*9c5db199SXin Li
489*9c5db199SXin Li            mat2 = re.match(mat2_re, ln)
490*9c5db199SXin Li            if mat2:
491*9c5db199SXin Li                flash_dict['dev_major'] = int(mat2.group(1))
492*9c5db199SXin Li                flash_dict['dev_minor'] = int(mat2.group(2))
493*9c5db199SXin Li                flash_dict['rw_hash'] = mat2.group(3)
494*9c5db199SXin Li                flash_dict['image_status'] = mat2.group(4)
495*9c5db199SXin Li                break
496*9c5db199SXin Li
497*9c5db199SXin Li        return flash_dict
498*9c5db199SXin Li
499*9c5db199SXin Li
500*9c5db199SXin Liclass EC_USBPD(EC_Common):
501*9c5db199SXin Li    """Class for CrOS embedded controller for USB-PD.
502*9c5db199SXin Li
503*9c5db199SXin Li    Public attributes:
504*9c5db199SXin Li        ports: list EC_USBPD_Port instances
505*9c5db199SXin Li
506*9c5db199SXin Li    Public Methods:
507*9c5db199SXin Li        get_num_ports: get number of USB-PD ports device has.
508*9c5db199SXin Li
509*9c5db199SXin Li    Private attributes:
510*9c5db199SXin Li        _num_ports: integer number of USB-PD ports device has.
511*9c5db199SXin Li    """
512*9c5db199SXin Li    def __init__(self, num_ports=None):
513*9c5db199SXin Li        """Constructor.
514*9c5db199SXin Li
515*9c5db199SXin Li        @param num_ports: total number of USB-PD ports on device.  This is an
516*9c5db199SXin Li          override.  If left 'None' will try to determine.
517*9c5db199SXin Li        """
518*9c5db199SXin Li        self._num_ports = num_ports
519*9c5db199SXin Li        self.ports = list()
520*9c5db199SXin Li
521*9c5db199SXin Li        # TODO(crosbug.com/p/38133) target= only works for samus
522*9c5db199SXin Li        super(EC_USBPD, self).__init__(target='cros_pd')
523*9c5db199SXin Li
524*9c5db199SXin Li        if (self.get_num_ports() == 0):
525*9c5db199SXin Li            raise error.TestNAError("Device has no USB-PD ports")
526*9c5db199SXin Li
527*9c5db199SXin Li        for i in range(self._num_ports):
528*9c5db199SXin Li            self.ports.append(EC_USBPD_Port(i))
529*9c5db199SXin Li
530*9c5db199SXin Li    def get_num_ports(self):
531*9c5db199SXin Li        """Determine the number of ports for device.
532*9c5db199SXin Li
533*9c5db199SXin Li        Uses ectool's usbpdpower command which in turn makes host command call
534*9c5db199SXin Li        to EC_CMD_USB_PD_PORTS to determine the number of ports.
535*9c5db199SXin Li
536*9c5db199SXin Li        TODO(tbroch) May want to consider adding separate ectool command to
537*9c5db199SXin Li        surface the number of ports directly instead of via usbpdpower
538*9c5db199SXin Li
539*9c5db199SXin Li        @returns number of ports.
540*9c5db199SXin Li        """
541*9c5db199SXin Li        if (self._num_ports is not None):
542*9c5db199SXin Li            return self._num_ports
543*9c5db199SXin Li
544*9c5db199SXin Li        self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
545*9c5db199SXin Li        return self._num_ports
546