xref: /aosp_15_r20/external/autotest/server/hosts/cros_label.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""This class defines the CrosHost Label class."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import collections
13import configparser
14import logging
15import re
16
17import common
18
19from autotest_lib.client.bin import utils
20from autotest_lib.client.cros.audio import cras_utils
21from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
22from autotest_lib.server.hosts import base_label
23from autotest_lib.server.hosts import common_label
24from autotest_lib.server.hosts import servo_constants
25from six.moves import zip
26
27# pylint: disable=missing-docstring
# Pair returned by _parse_lsb_output(): `unibuild` is a boolean flag and
# `board` is the CHROMEOS_RELEASE_BOARD value read from /etc/lsb-release.
LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board'])

# Repair and Deploy taskName
# These are the values compared against the `task_name` argument in the
# labels' update_for_task() methods.
REPAIR_TASK_NAME = 'repair'
DEPLOY_TASK_NAME = 'deploy'
33
34
def _parse_lsb_output(host):
    """Read /etc/lsb-release on the host and pull out the labeling fields.

    @param host: Host whose `run` method executes the command.
    @returns: LsbOutput with the unibuild flag (True only when
              CHROMEOS_RELEASE_UNIBUILD is '1') and the release board.
    """
    lsb_fields = utils.parse_cmd_output('cat /etc/lsb-release',
                                        run_method=host.run)

    is_unibuild = lsb_fields.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
    # The board key is mandatory: a missing entry raises KeyError.
    board_name = lsb_fields['CHROMEOS_RELEASE_BOARD']
    return LsbOutput(is_unibuild, board_name)
46
47
class DeviceSkuLabel(base_label.StringPrefixLabel):
    """Produce the device_sku label value for a host.

    A SKU already recorded in the host info store takes precedence;
    otherwise the host is queried with `mosys platform sku`.
    """

    _NAME = ds_constants.DEVICE_SKU_LABEL

    def generate_labels(self, host):
        """Return the SKU as a one-element list, or [] when unavailable.

        @param host: Host providing `host_info_store` and `run`.
        @returns: [sku] from the info store or from mosys, else [].
        """
        stored_sku = host.host_info_store.get().device_sku
        if stored_sku:
            return [stored_sku]

        sku_query = host.run(command='mosys platform sku',
                             ignore_status=True)
        if sku_query.exit_status != 0:
            return []
        return [sku_query.stdout.strip()]

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the lab config.

        @param task_name: Name of the running task; may be ''.
        @returns: True for deploy, repair, or an empty task name.
        """
        updating_tasks = (DEPLOY_TASK_NAME, REPAIR_TASK_NAME, '')
        return task_name in updating_tasks
68
69
class BrandCodeLabel(base_label.StringPrefixLabel):
    """Produce the brand_code (aka RLZ-code) label value for a host.

    A brand code already recorded in the host info store takes precedence;
    otherwise the host is queried with `cros_config / brand-code`.
    """

    _NAME = ds_constants.BRAND_CODE_LABEL

    def generate_labels(self, host):
        """Return the brand code as a one-element list, or [] if unknown.

        @param host: Host providing `host_info_store` and `run`.
        @returns: [brand_code] from the info store or cros_config, else [].
        """
        stored_code = host.host_info_store.get().brand_code
        if stored_code:
            return [stored_code]

        code_query = host.run(command='cros_config / brand-code',
                              ignore_status=True)
        if code_query.exit_status != 0:
            return []
        return [code_query.stdout.strip()]
86
87
class BluetoothPeerLabel(base_label.StringPrefixLabel):
    """Return the Bluetooth peer labels.

    The working_bluetooth_btpeer label is applied when a Raspberry Pi
    Bluetooth peer is detected. There can be up to 4 Bluetooth peers; labels
    working_bluetooth_btpeer:[1-4] are assigned depending on how many peers
    respond.

    """

    _NAME = 'working_bluetooth_btpeer'

    def exists(self, host):
        """Return True when the host has at least one btpeer host entry."""
        peer_hosts = host._btpeer_host_list
        return len(peer_hosts) > 0

    def generate_labels(self, host):
        """Return ['1', ..., 'k'] where k is the number of working peers.

        @param host: Host providing `btpeer_list` and `_btpeer_host_list`.
        @returns: List of consecutive counters as strings, one per peer whose
                  serial connection check passed.
        """
        working = []

        peers = zip(host.btpeer_list, host._btpeer_host_list)
        for btpeer, btpeer_host in peers:
            try:
                # Initialize one device type to make sure the peer is working
                hid_mouse = btpeer.get_bluetooth_hid_mouse()
                serial_ok = hid_mouse.CheckSerialConnection()
            except Exception as e:
                logging.error('Error with initializing bt_hid_mouse on '
                              'btpeer %s %s', btpeer_host.hostname, e)
                continue
            if serial_ok:
                # Labels are numbered by how many peers have passed so far.
                working.append(str(len(working) + 1))

        logging.info('Bluetooth Peer labels are %s', working)
        return working

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config.

        @param task_name: Name of the running task; may be ''.
        @returns: True for repair or an empty task name.
        """
        updating_tasks = (REPAIR_TASK_NAME, '')
        return task_name in updating_tasks
126
127
class Cr50Label(base_label.StringPrefixLabel):
    """Label indicating the cr50 image type.

    exists() runs `gsctool -a -f` and stores the result in self.ver;
    generate_labels() parses that stored output, so it relies on exists()
    having been called first.
    """

    _NAME = 'cr50'

    def __init__(self):
        # Result of the most recent `gsctool -a -f` run; set by exists().
        self.ver = None

    def exists(self, host):
        """Run gsctool on the host and report whether it succeeded.

        @param host: Host used to run the command.
        @returns: True if `gsctool -a -f` exited with status 0.
        """
        # Make sure the gsctool version command runs ok
        self.ver = host.run('gsctool -a -f', ignore_status=True)
        return self.ver.exit_status == 0

    def _get_version(self, region):
        """Get the version number of the given region.

        @param region: Region name preceding the version in the gsctool
                       output, e.g. 'RW'.
        @returns: The 'N.N.N' version string that follows the region name.
        @raises AttributeError: if no version is found for the region
                                (re.search returns None).
        """
        # Raw string: '\d' and '\.' are invalid escape sequences in a plain
        # string literal and trigger warnings on newer Python versions.
        version_pattern = region + r' (\d+\.\d+\.\d+)'
        return re.search(version_pattern, self.ver.stdout).group(1)

    def generate_labels(self, host):
        """Return ['pvt'] or ['prepvt'] based on the RW version.

        @param host: Unused; the output stored by exists() is parsed.
        @returns: One-element list with the image type.
        """
        # Check the major version to determine prePVT vs PVT. The value used
        # is the second dotted component of the RW version string.
        version = self._get_version('RW')
        major_version = int(version.split('.')[1])
        # PVT images have a odd major version prePVT have even
        return ['pvt' if (major_version % 2) else 'prepvt']

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config, so only repair tasks update
        it or when no task name is mentioned.

        @param task_name: Name of the running task; may be ''.
        @returns: True for repair or an empty task name.
        """
        return task_name in (REPAIR_TASK_NAME, '')
156
157
class Cr50RWKeyidLabel(Cr50Label):
    """Label indicating the cr50 RW key id and key type.

    Subclasses select a different region by overriding _REGION.
    """
    _REGION = 'RW'
    _NAME = 'cr50-rw-keyid'

    def _get_keyid_info(self, region):
        """Get the keyid of the given region.

        @param region: Region name to look up on the 'keyids:' line of the
                       stored gsctool output.
        @returns: [keyid, 'prod'] when bit 2 of the hex keyid is set,
                  otherwise [keyid, 'dev'].
        @raises AttributeError: if the output has no keyid for the region
                                (re.search returns None).
        """
        # Raw string: '\S' is an invalid escape sequence in a plain string
        # literal and triggers warnings on newer Python versions.
        match = re.search(r'keyids:.*%s (\S+)' % region, self.ver.stdout)
        # The captured token may carry a trailing comma separating regions.
        keyid = match.group(1).rstrip(',')
        is_prod = int(keyid, 16) & (1 << 2)
        return [keyid, 'prod' if is_prod else 'dev']

    def generate_labels(self, host):
        """Get the key id and key type for this class's region.

        @param host: Unused; the output stored by exists() is parsed.
        @returns: Two-element list, see _get_keyid_info().
        """
        return self._get_keyid_info(self._REGION)
173
174
class Cr50ROKeyidLabel(Cr50RWKeyidLabel):
    """Key id and key type label for the cr50 RO region.

    Reuses the parent implementation with the region switched to 'RO'.
    """
    _NAME = 'cr50-ro-keyid'
    _REGION = 'RO'
179
180
class ChameleonLabel(base_label.BaseLabel):
    """Determine if a Chameleon is connected to this host."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Return True when the host has a chameleon host object.

        @param host: Host whose `_chameleon_host` attribute is inspected.
        @returns: True if `host._chameleon_host` is not None.
        """
        # See crbug.com/1004500#2 for details.
        has_chameleon = host._chameleon_host is not None
        # TODO(crbug.com/995900) -- debug why chameleon label is flipping
        try:
            logging.info("has_chameleon %s", has_chameleon)
            logging.info("_chameleon_host %s",
                         getattr(host, "_chameleon_host", "NO_ATTRIBUTE"))
            logging.info("chameleon %s",
                         getattr(host, "chameleon", "NO_ATTRIBUTE"))
        except Exception:
            # The debug logging is best effort and must never change the
            # result. Catch Exception rather than using a bare except so that
            # KeyboardInterrupt/SystemExit still propagate.
            logging.debug('Failed to log chameleon debug info',
                          exc_info=True)
        return has_chameleon

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config, so only repair tasks update
        it or when no task name is mentioned.

        @param task_name: Name of the running task; may be ''.
        @returns: True for repair or an empty task name.
        """
        return task_name in (REPAIR_TASK_NAME, '')
204
205
class ChameleonConnectionLabel(base_label.StringPrefixLabel):
    """Produce the Chameleon connection label for a host."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Return True when the host has a chameleon host object."""
        chameleon_host = host._chameleon_host
        return chameleon_host is not None

    def generate_labels(self, host):
        """Return a one-element list with the label reported by the chameleon.

        @param host: Host providing the `chameleon` object.
        """
        connection_label = host.chameleon.get_label()
        return [connection_label]

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the lab config.

        @param task_name: Name of the running task; may be ''.
        @returns: True for deploy or an empty task name.
        """
        updating_tasks = (DEPLOY_TASK_NAME, '')
        return task_name in updating_tasks
221
222
class AudioConfigLabel(base_label.StringPrefixLabel):
    """Determine the label of CRAS configuration for the device.

    It parses config keys from the board.ini file content as the example below:

    [hotword]
    pause_at_suspend=1
    [processing]
    nc_supported=1

    """

    _NAME = 'audio'

    def generate_labels(self, host):
        """Derive audio labels from the model's CRAS board.ini.

        Any failure (a command exiting non-zero, or board.ini content that
        cannot be parsed) is logged and results in no labels instead of an
        exception.

        @param host: Host used to run the commands.
        @returns: ['has_noise_cancellation'] when processing:nc_supported is
                  true, otherwise [].
        """
        # Get model name for determining the board.ini file path.
        cros_config_cmd = 'cros_config / name'
        result = host.run(command=cros_config_cmd, ignore_status=True)
        if result.exit_status != 0:
            logging.error('Failed to run command: %s', cros_config_cmd)
            return []

        model = result.stdout.strip()
        cras_config_cmd = 'cat /etc/cras/{}/board.ini'.format(model)
        result = host.run(command=cras_config_cmd, ignore_status=True)
        if result.exit_status != 0:
            logging.error('Failed to run command: %s', cras_config_cmd)
            return []

        config = configparser.ConfigParser()
        try:
            config.read_string(result.stdout)
            # fallback covers a missing section or option; a value that is
            # not a valid boolean still raises ValueError.
            nc_supported = config.getboolean('processing', 'nc_supported',
                                             fallback=False)
        except (configparser.Error, ValueError) as e:
            # Treat an unparsable board.ini like the command failures above.
            logging.error('Failed to parse board.ini of model %s: %s',
                          model, e)
            return []

        labels = []
        # Generate "has_noise_cancellation" from "processing:nc_supported".
        if nc_supported:
            labels.append('has_noise_cancellation')

        return labels
260
261
class AudioLoopbackDongleLabel(base_label.BaseLabel):
    """Report whether an audio loopback dongle is plugged in."""

    _NAME = 'audio_loopback_dongle'

    def exists(self, host):
        """Return True if the label is already stored or detected on the DUT.

        Based on crbug.com/991285, AudioLoopbackDongle sometimes flips, so
        once the label has been recorded it is reported as present without
        re-running the detection command.

        @param host: Host providing `host_info_store` and `run`.
        """
        # A stored label short-circuits: the DUT is only queried when the
        # current state is not already True.
        return self._cached_exists(host) or self._host_run_exists(host)

    def _cached_exists(self, host):
        """Get the state of AudioLoopbackDongle in the data store"""
        stored_labels = host.host_info_store.get().labels
        return any(label.startswith(self._NAME) for label in stored_labels)

    def _host_run_exists(self, host):
        """Detect presence of audio_loopback_dongle by physically
        running a command on the DUT."""
        cras_nodes = host.run(command=cras_utils.get_cras_nodes_cmd(),
                              ignore_status=True).stdout
        # Both a headphone node and a mic node must be plugged.
        if not cras_utils.node_type_is_plugged('HEADPHONE', cras_nodes):
            return False
        if not cras_utils.node_type_is_plugged('MIC', cras_nodes):
            return False
        return True

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config.

        @param task_name: Name of the running task; may be ''.
        @returns: True for repair or an empty task name.
        """
        updating_tasks = (REPAIR_TASK_NAME, '')
        return task_name in updating_tasks
302
303
class ServoTypeLabel(base_label.StringPrefixLabel):
    """Produce the servo type label for a host.

    A servo type already present in the stored labels is reused; otherwise
    the servo attached to the host is asked for its version.
    """
    _NAME = servo_constants.SERVO_TYPE_LABEL_PREFIX

    def generate_labels(self, host):
        """Return the servo type as a one-element list, or [] if unknown.

        @param host: Host providing `host_info_store` and `servo`.
        @returns: [servo_type] from the stored labels or from the servo,
                  else [].
        """
        stored_type = self._get_from_labels(host.host_info_store.get())
        if stored_type != '':
            logging.info("Using servo_type: %s from cache!", stored_type)
            return [stored_type]

        if host.servo is None:
            return []

        try:
            detected_type = host.servo.get_servo_version()
            if detected_type != '':
                return [detected_type]
            logging.warning('Cannot collect servo_type from servo'
                            ' by `dut-control servo_type`! Please file a bug'
                            ' and inform infra team as we are not expected '
                            ' to reach this point.')
        except Exception as e:
            # We don't want fail the label and break DUTs here just
            # because of servo issue.
            logging.error("Failed to update servo_type, %s", str(e))
        return []

    def _get_from_labels(self, info):
        """Return the value of the first stored servo type label, or ''.

        @param info: Host info object exposing a `labels` iterable of strings.
        """
        prefix = self._NAME + ':'
        for stored_label in info.labels:
            if stored_label.startswith(prefix):
                # Everything after '<name>:' is the label value.
                return stored_label[len(prefix):]
        return ''

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the lab config.
        NOTE(review): earlier documentation of this method mentioned repair
        tasks as well, but only deploy and '' are accepted -- confirm intent.

        @param task_name: Name of the running task; may be ''.
        @returns: True for deploy or an empty task name.
        """
        updating_tasks = (DEPLOY_TASK_NAME, '')
        return task_name in updating_tasks
343
344
345def _parse_hwid_labels(hwid_info_list):
346    if len(hwid_info_list) == 0:
347        return hwid_info_list
348
349    res = []
350    # See crbug.com/997816#c7 for details of two potential formats of returns
351    # from HWID server.
352    if isinstance(hwid_info_list[0], dict):
353        # Format of hwid_info:
354        # [{u'name': u'sku', u'value': u'xxx'}, ..., ]
355        for hwid_info in hwid_info_list:
356            value = hwid_info.get('value', '')
357            name = hwid_info.get('name', '')
358            # There should always be a name but just in case there is not.
359            if name:
360                new_label = name if not value else '%s:%s' % (name, value)
361                res.append(new_label)
362    else:
363        # Format of hwid_info:
364        # [<DUTLabel name: 'sku' value: u'xxx'>, ..., ]
365        for hwid_info in hwid_info_list:
366            new_label = str(hwid_info)
367            logging.info('processing hwid label: %s', new_label)
368            res.append(new_label)
369
370    return res
371
372
# Label instances for CrOS hosts (presumably consumed by the host labeling
# code elsewhere -- confirm against callers). The trailing STATECONFIG /
# LABCONFIG tags match the "stored in the state config" / "stored in the lab
# config" notes in the corresponding classes' update_for_task() methods.
# NOTE(review): BrandCodeLabel and Cr50RWKeyidLabel are defined in this file
# but are not instantiated here -- confirm that is intentional.
CROS_LABELS = [
    AudioConfigLabel(),
    AudioLoopbackDongleLabel(), #STATECONFIG
    BluetoothPeerLabel(), #STATECONFIG
    ChameleonConnectionLabel(), #LABCONFIG
    ChameleonLabel(), #STATECONFIG
    common_label.OSLabel(),
    DeviceSkuLabel(), #LABCONFIG
    ServoTypeLabel(), #LABCONFIG
    # Temporarily add back as there's no way to reference cr50 configs.
    # See crbug.com/1057145 for the root cause.
    # See crbug.com/1057719 for future tracking.
    Cr50Label(),
    Cr50ROKeyidLabel(),
]

# Label instances for labstations (going by the name -- confirm against
# callers): only the OS label.
LABSTATION_LABELS = [
    common_label.OSLabel(),
]
392