1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""A module containing kernel handler class used by SAFT.""" 5 6import hashlib 7import os 8import re 9 10# Types of kernel modifications. 11KERNEL_BODY_MOD = 1 12KERNEL_VERSION_MOD = 2 13KERNEL_RESIGN_MOD = 3 14 15 16class KernelHandlerError(Exception): 17 """KernelHandler-specific exception.""" 18 pass 19 20 21class KernelHandler(object): 22 """An object to provide ChromeOS kernel related actions. 23 24 Mostly it allows to corrupt and restore a particular kernel partition 25 (designated by the partition name, A or B. 26 27 @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface 28 @param is_minios: True if it is a MiniOS kernel; otherwise, False. 29 """ 30 31 # This value is used to alter contents of a byte in the appropriate kernel 32 # image. First added to corrupt the image, then subtracted to restore the 33 # image. 34 DELTA = 1 35 36 # The maximum kernel size in MB. 37 KERNEL_SIZE_MB = 16 38 39 def __init__(self, os_if, is_minios=False): 40 self.os_if = os_if 41 self.dump_file_name = None 42 self.partition_map = {} 43 self.root_dev = None 44 self.initialized = False 45 if is_minios: 46 self.kernel_type = 'MINIOS' 47 self.data_key = 'minios_kernel_data_key.vbprivk' 48 self.keyblock = 'minios_kernel.keyblock' 49 else: 50 self.kernel_type = 'KERN' 51 self.data_key = 'kernel_data_key.vbprivk' 52 self.keyblock = 'kernel.keyblock' 53 self.tmp_file_name = 'kernel_header_dump_%s' % self.kernel_type 54 55 def _get_version(self, device): 56 """Get version of the kernel hosted on the passed in partition.""" 57 # 16 K should be enough to include headers and keys 58 data = self.os_if.read_partition(device, 0x4000) 59 return self.os_if.retrieve_body_version(data) 60 61 def _get_datakey_version(self, device): 62 """Get datakey version of kernel hosted on the passed in partition.""" 63 # 16 K should be enought to include headers and keys 64 data = self.os_if.read_partition(device, 0x4000) 65 return self.os_if.retrieve_datakey_version(data) 66 67 def _get_partition_map(self, internal_disk=True): 68 """Scan `cgpt show <device> output to find kernel devices. 69 70 Args: 71 internal_disk - decide whether to use internal kernel disk. 72 """ 73 if internal_disk: 74 target_device = self.os_if.get_internal_disk( 75 self.os_if.get_root_part()) 76 else: 77 target_device = self.root_dev 78 79 kernel_partitions = re.compile('%s-([AB])' % self.kernel_type) 80 disk_map = self.os_if.run_shell_command_get_output( 81 'cgpt show %s' % target_device) 82 83 for line in disk_map: 84 matched_line = kernel_partitions.search(line) 85 if not matched_line: 86 continue 87 label = matched_line.group(1) 88 part_info = {} 89 device = self.os_if.join_part(target_device, line.split()[2]) 90 part_info['device'] = device 91 part_info['version'] = self._get_version(device) 92 part_info['datakey_version'] = self._get_datakey_version(device) 93 self.partition_map[label] = part_info 94 95 def dump_kernel(self, section, kernel_path): 96 """Dump the specified kernel to a file. 97 98 @param section: The kernel to dump. May be A or B. 99 @param kernel_path: The path to the kernel image. 100 """ 101 dev = self.partition_map[section.upper()]['device'] 102 cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path, 103 self.KERNEL_SIZE_MB) 104 self.os_if.run_shell_command(cmd) 105 106 def write_kernel(self, section, kernel_path): 107 """Write a kernel image to the specified section. 108 109 @param section: The kernel to write. May be A or B. 110 @param kernel_path: The path to the kernel image to write. 111 """ 112 dev = self.partition_map[section.upper()]['device'] 113 dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev, 114 self.KERNEL_SIZE_MB) 115 self.os_if.run_shell_command(dd_cmd, modifies_device=True) 116 117 def _modify_kernel(self, 118 section, 119 delta, 120 modification_type=KERNEL_BODY_MOD, 121 key_path=None): 122 """Modify kernel image on a disk partition. 123 124 This method supports three types of kernel modification. KERNEL_BODY_MOD 125 just adds the value of delta to the first byte of the kernel blob. 126 This might leave the kernel corrupted (as required by the test). 127 128 The second type, KERNEL_VERSION_MOD - will use 'delta' as the new 129 version number, it will put it in the kernel header, and then will 130 resign the kernel blob. 131 132 The third type. KERNEL_RESIGN_MOD - will resign the kernel with keys in 133 argument key_path. If key_path is None, choose dev_key_path as resign 134 key directory. 135 """ 136 self.dump_kernel(section, self.dump_file_name) 137 data = list(self.os_if.read_file(self.dump_file_name)) 138 if modification_type == KERNEL_BODY_MOD: 139 data[0] = (data[0] + delta) % 0x100 140 self.os_if.write_file(self.dump_file_name, bytes(data)) 141 kernel_to_write = self.dump_file_name 142 elif modification_type == KERNEL_VERSION_MOD: 143 new_version = delta 144 kernel_to_write = self.dump_file_name + '.new' 145 self.os_if.run_shell_command( 146 'vbutil_kernel --repack %s --version %d ' 147 '--signprivate %s --oldblob %s' % 148 (kernel_to_write, new_version, 149 os.path.join(self.dev_key_path, 150 self.data_key), self.dump_file_name)) 151 elif modification_type == KERNEL_RESIGN_MOD: 152 if key_path and self.os_if.is_dir(key_path): 153 resign_key_path = key_path 154 else: 155 resign_key_path = self.dev_key_path 156 157 kernel_to_write = self.dump_file_name + '.new' 158 self.os_if.run_shell_command( 159 'vbutil_kernel --repack %s ' 160 '--signprivate %s --oldblob %s --keyblock %s' % 161 (kernel_to_write, 162 os.path.join(resign_key_path, 163 self.data_key), self.dump_file_name, 164 os.path.join(resign_key_path, self.keyblock))) 165 else: 166 return # Unsupported mode, ignore. 167 self.write_kernel(section, kernel_to_write) 168 169 def corrupt_kernel(self, section): 170 """Corrupt a kernel section (add DELTA to the first byte).""" 171 self._modify_kernel(section.upper(), self.DELTA) 172 173 def restore_kernel(self, section): 174 """Restore the previously corrupted kernel.""" 175 self._modify_kernel(section.upper(), -self.DELTA) 176 177 def get_version(self, section): 178 """Return version read from this section blob's header.""" 179 return self.partition_map[section.upper()]['version'] 180 181 def get_datakey_version(self, section): 182 """Return datakey version read from this section blob's header.""" 183 return self.partition_map[section.upper()]['datakey_version'] 184 185 def get_sha(self, section): 186 """Return the SHA1 hash of the section blob.""" 187 s = hashlib.sha1() 188 dev = self.partition_map[section.upper()]['device'] 189 s.update(self.os_if.read_file(dev)) 190 return s.hexdigest() 191 192 def set_version(self, section, version): 193 """Set version of this kernel blob and re-sign it.""" 194 if version < 0: 195 raise KernelHandlerError('Bad version value %d' % version) 196 self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD) 197 198 def resign_kernel(self, section, key_path=None): 199 """Resign kernel with original kernel version and keys in key_path.""" 200 self._modify_kernel(section.upper(), self.get_version(section), 201 KERNEL_RESIGN_MOD, key_path) 202 203 def init(self, dev_key_path='.', internal_disk=True): 204 """Initialize the kernel handler object. 205 206 Input argument is an OS interface object reference. 207 """ 208 self.dev_key_path = dev_key_path 209 self.root_dev = self.os_if.get_root_dev() 210 self.dump_file_name = self.os_if.state_dir_file(self.tmp_file_name) 211 self._get_partition_map(internal_disk) 212 self.initialized = True 213