xref: /aosp_15_r20/external/autotest/client/common_lib/cros/kernel_utils.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2020 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import glob
6import logging
7import os
8import re
9
10from autotest_lib.client.bin import utils
11
12_KERNEL_A = {'name': 'KERN-A', 'kernel': 2, 'root': 3}
13_KERNEL_B = {'name': 'KERN-B', 'kernel': 4, 'root': 5}
14_MINIOS_A = 'A'
15_MINIOS_B = 'B'
16
17# Time to wait for new kernel to be marked successful after auto update.
18_KERNEL_UPDATE_TIMEOUT = 120
19
20_BOOT_ERR_MSG = 'The active image slot did not change after the update.'
21
22def _run(cmd, host=None):
23    """
24    Function to execue commands.
25
26    This allows the util to be used by client and server tests.
27
28    @param cmd: The command to execute
29    @param host: The host to use in a server test. None to use utils.run.
30
31    """
32    if host is not None:
33        return host.run(cmd)
34    else:
35        return utils.run(cmd)
36
37def _cgpt(flag, kernel, host=None):
38    """
39    Helper function to return numeric cgpt value.
40
41    @param flag: Additional flag to pass to cgpt
42    @param kernel: The kernel we want to interact with.
43    @param host: The DUT to execute the command on. None to execute locally.
44
45    """
46    rootdev = _run(['rootdev','-s', '-d'], host).stdout.strip()
47    return int(_run(['cgpt', 'show', '-n', '-i', str(kernel['kernel']), flag,
48                     rootdev], host).stdout.strip())
49
50def get_kernel_state(host=None):
51    """
52    Returns the (<active>, <inactive>) kernel state as a pair.
53
54    @param host: The DUT to execute the command on. None to execute locally.
55
56    """
57    rootdev = _run(['rootdev','-s'], host).stdout.strip()
58    active_root = int(re.findall('\d+\Z', rootdev)[0])
59    if active_root == _KERNEL_A['root']:
60        return _KERNEL_A, _KERNEL_B
61    elif active_root == _KERNEL_B['root']:
62        return _KERNEL_B, _KERNEL_A
63    else:
64        raise Exception('Encountered unknown root partition: %s' % active_root)
65
66def get_next_kernel(host=None):
67    """
68    Return the kernel that has priority for the next boot.
69
70    @param host: The DUT to execute the command on. None to execute locally.
71
72    """
73    priority_a = _cgpt('-P', _KERNEL_A, host)
74    priority_b = _cgpt('-P', _KERNEL_B, host)
75    return _KERNEL_A if priority_a > priority_b else _KERNEL_B
76
77def get_kernel_success(kernel, host=None):
78    """
79    Return boolean success flag for the specified kernel.
80
81    @param kernel: Information of the given kernel, either _KERNEL_A
82                   or _KERNEL_B.
83    @param host: The DUT to execute the command on. None to execute locally.
84
85    """
86    return _cgpt('-S', kernel, host) != 0
87
88def get_kernel_tries(kernel, host=None):
89    """Return tries count for the specified kernel.
90
91    @param kernel: Information of the given kernel, either _KERNEL_A
92                   or _KERNEL_B.
93    @param host: The DUT to execute the command on. None to execute locally.
94
95    """
96    return _cgpt('-T', kernel, host)
97
98
99def verify_kernel_state_after_update(host=None, inactive_kernel=True):
100    """
101    Ensure the next kernel to boot is the expected kernel.
102
103    This is useful for checking after completing an update.
104
105    @param host: The DUT to execute the command on. None to execute locally.
106    @param inactive_kernel: Indicates if the expected kernel is the inactive
107                            kernel (True) or the active kernel (False).
108    @returns the next kernel.
109
110    """
111    expected_kernel = get_kernel_state(host)[1 if inactive_kernel else 0]
112    next_kernel = get_next_kernel(host)
113    if next_kernel != expected_kernel:
114        raise Exception(
115                'The kernel for next boot is %s, but %s was expected.' %
116                (next_kernel['name'], expected_kernel['name']))
117    return next_kernel
118
119
120def verify_boot_expectations(expected_kernel, error_message=_BOOT_ERR_MSG,
121                             host=None):
122    """Verifies that we fully booted into the expected kernel state.
123
124    This method both verifies that we booted using the correct kernel
125    state and that the OS has marked the kernel as good.
126
127    @param expected_kernel: kernel that we are verifying with,
128                            eg I expect to be booted onto partition 4.
129    @param error_message: string include in except message text
130                          if we booted with the wrong partition.
131    @param host: The DUT to execute the command on. None to execute locally.
132
133    """
134    # Figure out the newly active kernel.
135    active_kernel = get_kernel_state(host)[0]
136
137    if active_kernel != expected_kernel:
138        # Kernel crash reports should be wiped between test runs, but
139        # may persist from earlier parts of the test, or from problems
140        # with provisioning.
141        #
142        # Kernel crash reports will NOT be present if the crash happened
143        # before encrypted stateful is mounted.
144        kernel_crashes = glob.glob('/var/spool/crash/kernel.*.kcrash')
145        if kernel_crashes:
146            error_message += ': kernel_crash'
147            logging.debug('Found %d kernel crash reports:',
148                          len(kernel_crashes))
149            # The crash names contain timestamps that may be useful:
150            #   kernel.20131207.005945.12345.0.kcrash
151            for crash in kernel_crashes:
152                logging.debug('  %s', os.path.basename(crash))
153
154        # Print out some information to make it easier to debug
155        # the rollback.
156        logging.debug('Dumping partition table.')
157        _run('cgpt show $(rootdev -s -d)', host)
158        logging.debug('Dumping crossystem for firmware debugging.')
159        _run('crossystem --all', host)
160        raise Exception(error_message)
161
162    # Make sure chromeos-setgoodkernel runs.
163    try:
164        utils.poll_for_condition(
165            lambda: (get_kernel_tries(active_kernel, host) == 0
166                     and get_kernel_success(active_kernel, host)),
167            timeout=_KERNEL_UPDATE_TIMEOUT, sleep_interval=5)
168    except Exception:
169        services_status = _run(['status', 'system-services'], host).stdout
170        if services_status != 'system-services start/running\n':
171            raise Exception('Chrome failed to reach login screen')
172        else:
173            raise Exception('update-engine failed to call '
174                            'chromeos-setgoodkernel')
175
176
177def get_minios_priority(host=None):
178    """
179    Returns the (<active>, <inactive>) MiniOS partition as a pair.
180
181    @param host: The DUT to execute the command on. None to execute locally.
182
183    """
184    active = _run(['crossystem', 'minios_priority'], host).stdout.strip()
185    if active != _MINIOS_A and active != _MINIOS_B:
186        raise Exception('Encountered unknown MiniOS partition: %s' % active)
187    return (active, _MINIOS_B if active == _MINIOS_A else _MINIOS_A)
188
189
190def verify_minios_priority_after_update(host=None, expected=None):
191    """
192    Ensure the next MiniOS to boot is the expected one.
193
194    This is useful for checking after completing an update.
195
196    @param host: The DUT to execute the command on. None to execute locally.
197    @param expected: The expected MiniOS partition for the next boot.
198    """
199    active = _run(['crossystem', 'minios_priority'], host).stdout.strip()
200    if active != expected:
201        raise Exception(
202                'The MiniOS partition for next boot is %s, but %s was expected.'
203                % (active, expected))
204    else:
205        logging.debug(
206                'The MiniOS partition for next boot is %s, matches expected.',
207                active)
208