xref: /aosp_15_r20/external/autotest/client/cros/faft/utils/kernel_handler.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A module containing kernel handler class used by SAFT."""
5
6import hashlib
7import os
8import re
9
10# Types of kernel modifications.
11KERNEL_BODY_MOD = 1
12KERNEL_VERSION_MOD = 2
13KERNEL_RESIGN_MOD = 3
14
15
16class KernelHandlerError(Exception):
17    """KernelHandler-specific exception."""
18    pass
19
20
21class KernelHandler(object):
22    """An object to provide ChromeOS kernel related actions.
23
24    Mostly it allows to corrupt and restore a particular kernel partition
25    (designated by the partition name, A or B.
26
27    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
28    @param is_minios: True if it is a MiniOS kernel; otherwise, False.
29    """
30
31    # This value is used to alter contents of a byte in the appropriate kernel
32    # image. First added to corrupt the image, then subtracted to restore the
33    # image.
34    DELTA = 1
35
36    # The maximum kernel size in MB.
37    KERNEL_SIZE_MB = 16
38
39    def __init__(self, os_if, is_minios=False):
40        self.os_if = os_if
41        self.dump_file_name = None
42        self.partition_map = {}
43        self.root_dev = None
44        self.initialized = False
45        if is_minios:
46            self.kernel_type = 'MINIOS'
47            self.data_key = 'minios_kernel_data_key.vbprivk'
48            self.keyblock = 'minios_kernel.keyblock'
49        else:
50            self.kernel_type = 'KERN'
51            self.data_key = 'kernel_data_key.vbprivk'
52            self.keyblock = 'kernel.keyblock'
53        self.tmp_file_name = 'kernel_header_dump_%s' % self.kernel_type
54
55    def _get_version(self, device):
56        """Get version of the kernel hosted on the passed in partition."""
57        # 16 K should be enough to include headers and keys
58        data = self.os_if.read_partition(device, 0x4000)
59        return self.os_if.retrieve_body_version(data)
60
61    def _get_datakey_version(self, device):
62        """Get datakey version of kernel hosted on the passed in partition."""
63        # 16 K should be enought to include headers and keys
64        data = self.os_if.read_partition(device, 0x4000)
65        return self.os_if.retrieve_datakey_version(data)
66
67    def _get_partition_map(self, internal_disk=True):
68        """Scan `cgpt show <device> output to find kernel devices.
69
70        Args:
71          internal_disk - decide whether to use internal kernel disk.
72        """
73        if internal_disk:
74            target_device = self.os_if.get_internal_disk(
75                    self.os_if.get_root_part())
76        else:
77            target_device = self.root_dev
78
79        kernel_partitions = re.compile('%s-([AB])' % self.kernel_type)
80        disk_map = self.os_if.run_shell_command_get_output(
81                'cgpt show %s' % target_device)
82
83        for line in disk_map:
84            matched_line = kernel_partitions.search(line)
85            if not matched_line:
86                continue
87            label = matched_line.group(1)
88            part_info = {}
89            device = self.os_if.join_part(target_device, line.split()[2])
90            part_info['device'] = device
91            part_info['version'] = self._get_version(device)
92            part_info['datakey_version'] = self._get_datakey_version(device)
93            self.partition_map[label] = part_info
94
95    def dump_kernel(self, section, kernel_path):
96        """Dump the specified kernel to a file.
97
98        @param section: The kernel to dump. May be A or B.
99        @param kernel_path: The path to the kernel image.
100        """
101        dev = self.partition_map[section.upper()]['device']
102        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
103                                                 self.KERNEL_SIZE_MB)
104        self.os_if.run_shell_command(cmd)
105
106    def write_kernel(self, section, kernel_path):
107        """Write a kernel image to the specified section.
108
109        @param section: The kernel to write. May be A or B.
110        @param kernel_path: The path to the kernel image to write.
111        """
112        dev = self.partition_map[section.upper()]['device']
113        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
114                                                    self.KERNEL_SIZE_MB)
115        self.os_if.run_shell_command(dd_cmd, modifies_device=True)
116
117    def _modify_kernel(self,
118                       section,
119                       delta,
120                       modification_type=KERNEL_BODY_MOD,
121                       key_path=None):
122        """Modify kernel image on a disk partition.
123
124        This method supports three types of kernel modification. KERNEL_BODY_MOD
125        just adds the value of delta to the first byte of the kernel blob.
126        This might leave the kernel corrupted (as required by the test).
127
128        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
129        version number, it will put it in the kernel header, and then will
130        resign the kernel blob.
131
132        The third type. KERNEL_RESIGN_MOD - will resign the kernel with keys in
133        argument key_path. If key_path is None, choose dev_key_path as resign
134        key directory.
135        """
136        self.dump_kernel(section, self.dump_file_name)
137        data = list(self.os_if.read_file(self.dump_file_name))
138        if modification_type == KERNEL_BODY_MOD:
139            data[0] = (data[0] + delta) % 0x100
140            self.os_if.write_file(self.dump_file_name, bytes(data))
141            kernel_to_write = self.dump_file_name
142        elif modification_type == KERNEL_VERSION_MOD:
143            new_version = delta
144            kernel_to_write = self.dump_file_name + '.new'
145            self.os_if.run_shell_command(
146                    'vbutil_kernel --repack %s --version %d '
147                    '--signprivate %s --oldblob %s' %
148                    (kernel_to_write, new_version,
149                     os.path.join(self.dev_key_path,
150                                  self.data_key), self.dump_file_name))
151        elif modification_type == KERNEL_RESIGN_MOD:
152            if key_path and self.os_if.is_dir(key_path):
153                resign_key_path = key_path
154            else:
155                resign_key_path = self.dev_key_path
156
157            kernel_to_write = self.dump_file_name + '.new'
158            self.os_if.run_shell_command(
159                    'vbutil_kernel --repack %s '
160                    '--signprivate %s --oldblob %s --keyblock %s' %
161                    (kernel_to_write,
162                     os.path.join(resign_key_path,
163                                  self.data_key), self.dump_file_name,
164                     os.path.join(resign_key_path, self.keyblock)))
165        else:
166            return  # Unsupported mode, ignore.
167        self.write_kernel(section, kernel_to_write)
168
169    def corrupt_kernel(self, section):
170        """Corrupt a kernel section (add DELTA to the first byte)."""
171        self._modify_kernel(section.upper(), self.DELTA)
172
173    def restore_kernel(self, section):
174        """Restore the previously corrupted kernel."""
175        self._modify_kernel(section.upper(), -self.DELTA)
176
177    def get_version(self, section):
178        """Return version read from this section blob's header."""
179        return self.partition_map[section.upper()]['version']
180
181    def get_datakey_version(self, section):
182        """Return datakey version read from this section blob's header."""
183        return self.partition_map[section.upper()]['datakey_version']
184
185    def get_sha(self, section):
186        """Return the SHA1 hash of the section blob."""
187        s = hashlib.sha1()
188        dev = self.partition_map[section.upper()]['device']
189        s.update(self.os_if.read_file(dev))
190        return s.hexdigest()
191
192    def set_version(self, section, version):
193        """Set version of this kernel blob and re-sign it."""
194        if version < 0:
195            raise KernelHandlerError('Bad version value %d' % version)
196        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)
197
198    def resign_kernel(self, section, key_path=None):
199        """Resign kernel with original kernel version and keys in key_path."""
200        self._modify_kernel(section.upper(), self.get_version(section),
201                            KERNEL_RESIGN_MOD, key_path)
202
203    def init(self, dev_key_path='.', internal_disk=True):
204        """Initialize the kernel handler object.
205
206        Input argument is an OS interface object reference.
207        """
208        self.dev_key_path = dev_key_path
209        self.root_dev = self.os_if.get_root_dev()
210        self.dump_file_name = self.os_if.state_dir_file(self.tmp_file_name)
211        self._get_partition_map(internal_disk)
212        self.initialized = True
213