xref: /aosp_15_r20/external/autotest/server/cros/repair/mac_address_helper.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2020 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6
7import re
8
9import common
10from autotest_lib.client.common_lib import error
11
12
class MacAddressHelper():
    """Verify and update the cached NIC mac address on servo.

    The servo_v4 is plugged into the DUT and provides a NIC for it. The mac
    address of that NIC is cached on the servod side (the "macaddr" control)
    to make debugging easier.
    """

    # HUB and NIC VID/PID.
    # Values presented as the string of the hex without 0x to match
    # representation in sysfs (idVendor/idProduct).
    HUB_VID = '04b4'
    HUB_PID = '6502'
    NIC_VID = '0bda'
    NIC_PID = '8153'

    # Regex to check mac address format.
    # eg: f4:f5:e8:50:e9:45
    RE_MACADDR = re.compile('^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$')

    def is_supported(self, host):
        """Verify if the setup supports a cached NIC mac address on servo.

        @param host:    CrosHost instance

        @returns: True if the servo host is a labstation and the servo
                  exposes the "macaddr" control; False otherwise.
        """
        if not host._servo_host.is_labstation():
            logging.info('Only servo_v4 has NIC; Skipping the action')
            return False
        if not host.servo.has_control('macaddr'):
            logging.info('"macaddr" control not supported; '
                         'Skipping the action')
            return False
        return True

    def update_if_needed(self, host):
        """Update the cached NIC mac address on servo.

        The mac address read from the DUT is compared with the value cached
        on servo; the cache is written only when it is missing or differs.
        Nothing is done when the setup is not supported.

        @param host:    CrosHost instance

        @raises Exception: if the HUB or the NIC cannot be located, the mac
                           address cannot be read from the DUT, or the servo
                           "macaddr" control cannot be updated.
        """

        if not self.is_supported(host):
            return

        servo = host.servo
        # Path to the NIC has to be located in the HUB.
        # eg.
        # HUB: /sys/bus/usb/devices/1-1
        # NIC: /sys/bus/usb/devices/1-1.1
        hub_path = self._get_device_path(host, None, self.HUB_VID,
                                         self.HUB_PID)
        # '.' is what `dirname` prints for a path without a directory part.
        if not hub_path or hub_path == '.':
            raise Exception('The servo_v4 HUB not detected from DUT.')
        logging.debug('Path to the servo_v4 HUB device: %s', hub_path)
        nic_path = self._get_device_path(host, hub_path, self.NIC_VID,
                                         self.NIC_PID)
        if not nic_path or nic_path == '.':
            raise Exception('The servo_v4 NIC not detected in HUB folder.')
        logging.debug('Path to the servo_v4 NIC device: %s', nic_path)
        if hub_path == nic_path or not nic_path.startswith(hub_path):
            raise Exception('The servo_v4 NIC was detect out of servo_v4 HUB')

        macaddr = self._get_mac_address(host, nic_path)
        if not macaddr:
            raise Exception('Failed to extract mac address from host.')

        cached_mac = self._get_cached_mac_address(host)
        if not cached_mac or macaddr != cached_mac:
            try:
                servo.set('macaddr', macaddr)
                logging.info('Successfully updated the servo "macaddr"!')
            except error.TestFail as e:
                logging.debug('Fail to update macaddr value; %s', e)
                raise Exception('Fail to update the "macaddr" value!')
        else:
            logging.info('The servo "macaddr" doe not need update.')

    def _run_cmd(self, host, cmd):
        """Run a command on the host, tolerating failures and timeouts.

        @param host:    CrosHost instance
        @param cmd:     Shell command line to execute on the host.

        @returns: the result of host.run; callers treat a falsy result as
                  a timeout (ignore_timeout=True is passed to host.run).
        """
        return host.run(cmd,
                        timeout=30,
                        ignore_status=True,
                        ignore_timeout=True)

    def _get_cached_mac_address(self, host):
        """Get the NIC mac address from the servo cache.

        @param host:    CrosHost instance

        @returns: value of the servo "macaddr" control, or None if reading
                  the control failed with error.TestFail.
        """
        try:
            return host.servo.get('macaddr')
        except error.TestFail as e:
            logging.debug('(Non-critical) Fail to get macaddr: %s', e)
            return None

    def _get_mac_address(self, host, nic_path):
        """Get the NIC mac address from the host.

        @param host:        CrosHost instance
        @param nic_path:    Path to network device on the host

        @returns: mac address string matching RE_MACADDR, or None if the
                  address file cannot be found uniquely, cannot be read, or
                  holds a value in an unexpected format.
        """
        find_cmd = r'find %s/ | grep /net/ | grep /address' % nic_path
        res = self._run_cmd(host, find_cmd)
        if not res:
            logging.info('Timeout during retriving NIC address files.')
            return None
        addrs = res.stdout.splitlines()
        if not addrs:
            logging.info('No NIC address file found.')
            return None
        if len(addrs) > 1:
            logging.info('More than one NIC address file found.')
            return None
        addr_file = addrs[0]
        logging.info('Found NIC address file: %s', addr_file)
        res = self._run_cmd(host, r'cat %s' % addr_file)
        if not res:
            logging.info('Timeout during attemp read NIC address file: %s',
                         addr_file)
            return None
        mac_addr = res.stdout.strip()
        if not self.RE_MACADDR.match(mac_addr):
            logging.info('incorrect format of the mac address: %s', mac_addr)
            return None
        logging.info('Servo_v4 NIC mac address from DUT side: %s', mac_addr)
        return mac_addr

    def _get_device_path(self, host, base_path, vid, pid):
        """Find a device by VID/PID under a particular path.

        1) Get path to the unique idVendor file with VID
        2) Get path to the unique idProduct file with PID
        3) Get the directories of both files and compare them

        @param host:        CrosHost instance
        @param base_path:   Path prefix of the directories to look into;
                            all USB devices in sysfs are searched if empty.
        @param vid:         Vendor ID of the device to look for.
        @param pid:         Product ID of the device to look for.

        @returns: path to the folder of the device, or None if the device
                  was not found uniquely or a command timed out.
        """

        def _single_line(cmd):
            """Return the only stdout line of cmd, or None otherwise."""
            res = self._run_cmd(host, cmd)
            # A timed out command yields no result object; report it as
            # "not found" instead of failing on the missing stdout.
            if not res:
                return None
            lines = res.stdout.splitlines()
            if len(lines) != 1:
                return None
            return lines[0]

        if not base_path:
            base_path = '/sys/bus/usb/devices/*/'
        else:
            # No separator is inserted on purpose: the glob matches every
            # directory whose path starts with base_path (eg. 1-1 -> 1-1.1).
            base_path += '*/'
        cmd_template = 'grep -l %s $(find %s -maxdepth 1 -name %s)'
        vid_path = _single_line(cmd_template % (vid, base_path, 'idVendor'))
        if not vid_path:
            return None

        pid_path = _single_line(cmd_template % (pid, base_path, 'idProduct'))
        if not pid_path:
            return None

        # Check that both files are located in the same folder; `comm -12`
        # prints only the lines common to both inputs. Relies on the shell
        # on the host supporting process substitution `<(...)`.
        return _single_line('LC_ALL=C comm -12 <(dirname %s) <(dirname %s)' %
                            (vid_path, pid_path))
178