xref: /aosp_15_r20/external/autotest/site_utils/admin_audit/servo_updater.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/env python3
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging
7
8import common
9from autotest_lib.client.common_lib import utils as client_utils
10from autotest_lib.server.cros.servo.topology import topology_constants
11
12try:
13    from autotest_lib.utils.frozen_chromite.lib import metrics
14except ImportError:
15    metrics = client_utils.metrics_mock
16
17
18class ServoFwVersionMissedError(Exception):
19    """Raised when Available version is not detected."""
20
21
22class ServoUpdaterError(Exception):
23    """Raised when detected issue with servo_updater."""
24
25
26class _BaseUpdateServoFw(object):
27    """Base class to update firmware on servo"""
28
29    # Commands to kill active servo_updater fail with timeout
30    ACTIVE_UPDATER_CORE = 'ps aux | grep -ie [s]ervo_updater |grep "%s" '
31    ACTIVE_UPDATER_PRINT = ACTIVE_UPDATER_CORE + "| awk '{print $2}' "
32    ACTIVE_UPDATER_KILL = ACTIVE_UPDATER_PRINT + "| xargs kill -9 "
33
34    # Command to update FW for servo. Always reboot servo after update.
35    UPDATER_TAIL = '-b %s -s "%s" -c %s --reboot'
36    UPDATER_CMD = 'servo_updater ' + UPDATER_TAIL
37    UPDATER_CONTAINER_CMD = 'python /update_servo_firmware.py ' + UPDATER_TAIL
38
39    # Command to get servo firmware version for requested board and channel.
40    LATEST_VERSION_CMD = 'servo_updater -p -b "%s" -c %s | grep firmware'
41
42    # Default firmware channel.
43    DEFAULT_FW_CHANNEL = 'stable'
44
45    def __init__(self, servo_host, device):
46        """Init servo-updater instance.
47
48        @params servo_host: ServoHost instance to run terminal commands
49        @params device:     ConnectedServo instance provided servo info
50        """
51        self._host = servo_host
52        self._device = device
53
54    def need_update(self, ignore_version=False, channel=None):
55        """Verify that servo_update is required.
56
57        @params ignore_version: Do not check the version on the device.
58        @params channel:        Channel for servo firmware. Supported from
59                                version R90. Possible values: stable, prev,
60                                dev, alpha.
61
62        @returns: True if update required, False if not
63        """
64        if not channel:
65            channel = self.DEFAULT_FW_CHANNEL
66        if not self._host:
67            logging.debug('Skip update as host is provided.')
68            return False
69        elif not self.get_serial_number():
70            logging.debug('Skip update as servo serial is empty.')
71            return False
72        elif not (self._host.is_labstation()
73                  or self._host.is_containerized_servod()):
74            logging.debug('Skip as we run onlu from labstation and container.')
75            return False
76        elif not ignore_version:
77            if not self._is_outdated_version(channel=channel):
78                logging.debug('Skip as version is up today')
79                return False
80        return True
81
82    def update(self, force_update=False, ignore_version=False, channel=None):
83        """Update firmware on the servo.
84
85        Steps:
86        1) Verify servo is not updated by checking the versions.
87        2) Try to get serial number for the servo.
88        3) Updating firmware.
89
90        @params force_update:   Run updater with force option.
91        @params ignore_version: Do not check the version on the device.
92        @params channel:        Channel for servo firmware. Supported from
93                                version R90. Possible values: stable, prev,
94                                dev, alpha.
95        """
96        if not channel:
97            channel = self.DEFAULT_FW_CHANNEL
98        if not self.need_update(ignore_version, channel=channel):
99            logging.info("The board %s doesn't need update.", self.get_board())
100            return
101        if not self.get_serial_number():
102            logging.info('Serial number is not detected. It means no update'
103                         ' will be performed on servo.')
104            return
105        if self._device.get_type() != self.get_board():
106            logging.info('Attempt use incorrect updater for %s. Expected: %s.',
107                         self._device.get_type(), self.get_board())
108            return
109        self._update_firmware(force_update, channel)
110
111    def get_board(self):
112        """Return servo type supported by updater."""
113        raise NotImplementedError('Please implement method to return'
114                                  ' servo type')
115
116    def get_device(self):
117        """Return ConnectedServo instance"""
118        return self._device
119
120    def get_serial_number(self):
121        """Return serial number for servo device"""
122        return self._device.get_serial_number()
123
124    def _get_updater_cmd(self, force_update, channel):
125        """Return command to run firmware updater for the servo device.
126
127        @params force_update:   Run updater with force option.
128        @params channel:        Channel for servo firmware.
129        """
130        if self._host.is_containerized_servod():
131            cmd = self.UPDATER_CONTAINER_CMD
132        else:
133            cmd = self.UPDATER_CMD
134        board = self.get_board()
135        serial_number = self.get_serial_number()
136        cmd = cmd % (board, serial_number, channel.lower())
137        if force_update:
138            cmd += ' --force '
139        return cmd
140
141    def _update_firmware(self, force_update, channel):
142        """Execute firmware updater command.
143
144        @params force_update:   Run updater with force option.
145        @params channel:        UpdateCompare version from special firmware channel
146        """
147        cmd = self._get_updater_cmd(force_update, channel)
148        logging.info('Try to update servo fw update by running: %s', cmd)
149        try:
150            res = self._host.run(cmd, timeout=120)
151            logging.debug('Servo fw update finished; %s', res.stdout.strip())
152            logging.info('Servo fw update finished')
153        finally:
154            self._kill_active_update_process()
155
156    def _kill_active_update_process(self):
157        """Kill active servo_update processes when stuck after attempt."""
158        try:
159            cmd = self.ACTIVE_UPDATER_KILL % self.get_serial_number()
160            self._host.run(cmd, timeout=30, ignore_status=True)
161        except Exception as e:
162            logging.debug('Fail kill active processes; %s', e)
163
164    def _current_version(self):
165        """Get current version on servo device"""
166        return self._device.get_version()
167
168    def _latest_version(self, channel):
169        """Get latest available version from servo_updater.
170
171        @params channel: Compare version from special firmware channel
172        """
173        cmd = self.LATEST_VERSION_CMD % (self.get_board(), channel.lower())
174        re = self._host.run(cmd, ignore_status=True)
175        if re.exit_status == 0:
176            result = re.stdout.strip().split(':')
177            if len(result) == 2:
178                return result[-1].strip()
179        return None
180
181    def _is_outdated_version(self, channel):
182        """Compare version to determine request to update the Servo or not.
183
184        @params channel: Compare version from special firmware channel
185        """
186        current_version = self._current_version()
187        logging.debug('Servo fw on the device: "%s"', current_version)
188        latest_version = self._latest_version(channel)
189        logging.debug('Latest servo fw: "%s"', latest_version)
190        if not current_version:
191            return True
192        if not latest_version:
193            raise ServoFwVersionMissedError()
194        if current_version == latest_version:
195            return False
196        return True
197
198
199class UpdateServoV4Fw(_BaseUpdateServoFw):
200    """Servo firmware updater for servo_v4."""
201
202    def get_board(self):
203        """Return servo type supported by updater"""
204        return topology_constants.ST_V4_TYPE
205
206
207class UpdateServoV4p1Fw(_BaseUpdateServoFw):
208    """Servo firmware updater for servo_v4p1."""
209
210    def get_board(self):
211        """Return servo type supported by updater"""
212        return topology_constants.ST_V4P1_TYPE
213
214
215class UpdateServoMicroFw(_BaseUpdateServoFw):
216    """Servo firmware updater for servo_micro."""
217
218    def get_board(self):
219        """Return servo type supported by updater"""
220        return topology_constants.ST_SERVO_MICRO_TYPE
221
222
223class UpdateC2D2Fw(_BaseUpdateServoFw):
224    """Servo firmware updater for c2d2."""
225
226    def get_board(self):
227        """Return servo type supported by updater"""
228        return topology_constants.ST_C2D2_TYPE
229
230
231class UpdateSweetberryFw(_BaseUpdateServoFw):
232    """Servo firmware updater for sweetberry."""
233
234    def get_board(self):
235        """Return servo type supported by updater"""
236        return topology_constants.ST_SWEETBERRY_TYPE
237
238
239# List servo firmware updaters mapped to the type
240SERVO_UPDATERS = {
241        topology_constants.ST_V4_TYPE: UpdateServoV4Fw,
242        topology_constants.ST_V4P1_TYPE: UpdateServoV4p1Fw,
243        topology_constants.ST_SERVO_MICRO_TYPE: UpdateServoMicroFw,
244        topology_constants.ST_C2D2_TYPE: UpdateC2D2Fw,
245        topology_constants.ST_SWEETBERRY_TYPE: UpdateSweetberryFw,
246}
247
248# List known, tracking issue related to servo_updater.
249SERVO_UPDATER_ISSUE_MSGS = ('Configuration not set', )
250
251
252def _run_update_attempt(updater, topology, try_count, force_update,
253                        ignore_version, channel):
254    """Run servo update attempt.
255
256    @params updater:        Servo updater instance.
257    @params topology:       ServoTopology instance to update version.
258    @params try_count:      Count of attempt to run update.
259    @params force_update:   Run updater with force option.
260    @params ignore_version: Do not check the version on the device.
261    @params channel:        Request servo firmware from special channel
262
263    @returns:   True is finished without any error, False - with error
264    """
265    board = updater.get_board()
266    success = False
267    for a in range(try_count):
268        msg = 'Starting attempt: %d (of %d) to update "%s".'
269        if force_update:
270            msg += ' with force'
271        logging.info(msg, a + 1, try_count, board)
272        try:
273            updater.update(force_update=force_update,
274                           ignore_version=ignore_version,
275                           channel=channel)
276            topology.update_servo_version(updater.get_device())
277            if not updater.need_update(ignore_version=ignore_version,
278                                       channel=channel):
279                success = True
280        except Exception as er:
281            error_message = str(er)
282            logging.debug('(Not critical) fail to update %s; %s', board,
283                          error_message)
284            for message in SERVO_UPDATER_ISSUE_MSGS:
285                if message in error_message:
286                    raise ServoUpdaterError()
287        if success:
288            break
289    return success
290
291
292def any_servo_needs_firmware_update(host):
293    """Verify if any servo requires firmware update.
294
295    @params host:   ServoHost instance to run required commands
296                    and access to topology.
297    @returns:       True if any servo requires an update.
298    """
299    if not host:
300        raise ValueError('ServoHost is not provided.')
301
302    has_servo_requires_update = False
303    for device in host.get_topology().get_list_of_devices():
304        # Verify that device can provide serial and servo_type.
305        if not device.is_good():
306            continue
307        board = device.get_type()
308        updater_type = SERVO_UPDATERS.get(board, None)
309        if not updater_type:
310            logging.debug('No specified updater for %s', board)
311            continue
312        logging.debug('Specified updater found for %s', board)
313        # Creating update instance
314        updater = updater_type(host, device)
315        if updater.need_update(ignore_version=False,
316                               channel=host.servo_fw_channel):
317            logging.info('The servo: %s requires firmware update!', board)
318            has_servo_requires_update = True
319        else:
320            logging.info('The servo: %s does not require firmware update!',
321                         board)
322    return has_servo_requires_update
323
324
325def update_servo_firmware(host,
326                          boards=None,
327                          try_attempt_count=1,
328                          force_update=False,
329                          try_force_update=False,
330                          ignore_version=False):
331    """Update firmware on servo devices.
332
333    @params host:               ServoHost instance to run required commands
334                                and access to topology.
335    @params try_attempt_count:  Count of attempts to update servo. For force
336                                option the count attempts is always 1 (one).
337    @params try_force_update:   Try force force option if fail to update in
338                                normal mode.
339    @params force_update:       Run updater with force option. Override
340                                try_force_update option.
341    @params ignore_version:     Do not check the version on the device.
342
343    @returns:                   True is all servos updated or does not need it,
344                                False if any device could not updated.
345    """
346    if boards is None:
347        boards = []
348    if ignore_version:
349        logging.info('Running servo_updater with ignore_version=True')
350
351    if not host:
352        raise ValueError('ServoHost is not provided.')
353
354    # Use force option as first attempt
355    use_force_option_as_first_attempt = False
356    # If requested to update with force then first attempt will be with force
357    # and there no second attempt.
358    if force_update:
359        try_attempt_count = 1
360        try_force_update = False
361        use_force_option_as_first_attempt = True
362    # to run updater we need make sure the servod is not running
363    host.stop_servod()
364    if host.is_containerized_servod():
365        # Starting container as servo_updated located in it.
366        # Starting without servod as it can block access to the servos.
367        host.start_containerized_servod(with_servod=False)
368
369    # Collection to count which board failed to update
370    fail_boards = []
371
372    servo_topology = host.get_topology()
373    # Get list connected servos
374    for device in servo_topology.get_list_of_devices():
375        # Verify that device can provide serial and servo_type.
376        if not device.is_good():
377            continue
378        board = device.get_type()
379        if len(boards) > 0 and board not in boards:
380            logging.info('The %s is not requested for update', board)
381            continue
382        updater_type = SERVO_UPDATERS.get(board, None)
383        if not updater_type:
384            logging.info('No specified updater for %s', board)
385            continue
386        # Creating update instance
387        updater = updater_type(host, device)
388        is_success_update = _run_update_attempt(
389                updater=updater,
390                topology=servo_topology,
391                try_count=try_attempt_count,
392                force_update=use_force_option_as_first_attempt,
393                ignore_version=ignore_version,
394                channel=host.servo_fw_channel)
395        # If fail to update and we got requested to try force option then
396        # run second time with force.
397        if not is_success_update and try_force_update:
398            is_success_update = _run_update_attempt(
399                    updater=updater,
400                    topology=servo_topology,
401                    try_count=1,
402                    force_update=True,
403                    ignore_version=ignore_version,
404                    channel=host.servo_fw_channel)
405        if not is_success_update:
406            logging.info('Fail update firmware for %s', board)
407            hostname = host.get_dut_hostname() or host.hostname
408            metrics.Counter('chromeos/autotest/servo/fw_update_fail'
409                            ).increment(fields={'host': hostname})
410            fail_boards.append(board)
411
412    # Need stop containr without servod we started above.
413    if host.is_containerized_servod():
414        host.stop_servod()
415
416    if len(fail_boards) == 0:
417        logging.info('Successfull updated all requested servos.')
418        return True
419    return False
420