1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Facilities to verify an update bundle.""" 15 16import argparse 17import inspect 18import logging 19from pathlib import Path 20import sys 21from typing import Iterable 22 23from pw_software_update import keys, metadata 24from pw_software_update.tuf_pb2 import ( 25 RootMetadata, 26 SignedRootMetadata, 27 SignedTargetsMetadata, 28 TargetsMetadata, 29) 30from pw_software_update.update_bundle_pb2 import UpdateBundle 31 32_LOG = logging.getLogger(__package__) 33 34 35def log_progress(message: str, indent_offset: int = -5, indent_str: str = ' '): 36 """Logs verification progress. 37 38 The default indent offset is chosen per actual output of 'python -m verify'. 39 """ 40 indentation = 2 * (len(inspect.stack(0)) + indent_offset) 41 _LOG.info('%s%s', indent_str * indentation, message) 42 43 44class VerificationError(Exception): 45 """Raised upon any verification error.""" 46 47 48def lint_root_metadata(root: RootMetadata) -> Iterable[str]: 49 """Checks a RootMetadata for content or format errors. 50 51 Returns: 52 A list of all errors found. 53 """ 54 errors = [] 55 56 # Check role type first-thing to deter chosen-ciphertext attacks. 57 log_progress('Checking role type') 58 if root.common_metadata.role != metadata.RoleType.ROOT.value: 59 errors.append('Role type is not "root"') 60 61 # Check keys database. 62 log_progress('Checking keys database') 63 for entry in root.keys: 64 if not entry.key_id: 65 errors.append('Missing key_id in keys list') 66 elif not entry.key.keyval: 67 errors.append(f'Key {entry.key_id.hex()} does not have a value') 68 elif not entry.key_id == keys.gen_key_id(entry.key): 69 errors.append( 70 f'Key id "{entry.key_id.hex()}" cannot be derived' 71 f'from key content' 72 ) 73 74 # Check root signature requirement. 75 log_progress('Checking root signature requirement') 76 root_sig_req = root.root_signature_requirement 77 if not root_sig_req.threshold: 78 errors.append('Root signature threshold not set') 79 80 if len(root_sig_req.key_ids) < root_sig_req.threshold: 81 errors.append( 82 f'Insufficient root keys: ' 83 f'{len(root_sig_req.key_ids)} < {root_sig_req.threshold}' 84 ) 85 86 for key_id in root_sig_req.key_ids: 87 if key_id not in [km.key_id for km in root.keys]: 88 errors.append(f'Unregistered root key: {key_id.hex()}') 89 90 # Check targets signature requirement. 91 log_progress('Checking targets signature requirement') 92 targets_sig_req = root.targets_signature_requirement 93 if not targets_sig_req.threshold: 94 errors.append('Targets signature threshold not set') 95 96 if len(targets_sig_req.key_ids) < targets_sig_req.threshold: 97 errors.append( 98 f'Insufficient Targets keys: ' 99 f'{len(targets_sig_req.key_ids)} < {targets_sig_req.threshold}' 100 ) 101 102 for key_id in targets_sig_req.key_ids: 103 if key_id not in [km.key_id for km in root.keys]: 104 errors.append(f'Unregistered targets key: {key_id.hex()}') 105 106 # Make sure no two roles share the same key. 107 log_progress('Checking for key sharing') 108 for key_id in targets_sig_req.key_ids: 109 if key_id in root_sig_req.key_ids: 110 errors.append(f'Targets shares the same key: "{key_id.hex()}"') 111 112 return errors 113 114 115def verify_root_metadata_signatures( 116 incoming: SignedRootMetadata, trusted: RootMetadata 117) -> None: 118 """Verifies the signatures of an incoming root metadata. 119 120 Verifies the signatures of an incoming root metadata against signature 121 requirements from the trusted root metadata. 122 123 Raises: 124 VerificationError if `incoming` is incorrectly or insufficiently signed. 125 """ 126 sig_requirement = trusted.root_signature_requirement 127 128 log_progress( 129 f'Total={len(incoming.signatures)}, ' 130 f'threshold={sig_requirement.threshold}' 131 ) 132 good_signature_count = 0 133 for sig in incoming.signatures: 134 if sig.key_id not in sig_requirement.key_ids: 135 continue 136 137 key = None 138 for key_mapping in trusted.keys: 139 if key_mapping.key_id == sig.key_id: 140 key = key_mapping.key 141 break 142 if not key: 143 raise VerificationError(f'Invalid key_id: {sig.key_id.hex()}.') 144 145 if not keys.verify_ecdsa_signature( 146 sig.sig, incoming.serialized_root_metadata, key 147 ): 148 raise VerificationError('Invalid signature, key_id={sig.key_id}.') 149 150 good_signature_count += 1 151 152 log_progress(f'Verified: {good_signature_count}') 153 if good_signature_count < sig_requirement.threshold: 154 raise VerificationError('Not enough good signatures.') 155 156 157def verify_root_metadata( 158 incoming: SignedRootMetadata, trusted: RootMetadata 159) -> bool: 160 """Verifies an incoming root metadata against a trusted root metadata. 161 162 Returns: 163 A boolean flag indicating if the targets metadata has been rotated. 164 165 Raises: 166 VerificationError if the incoming root is incorrectly formatted, 167 insufficiently signed, or rolling back to an older version. 168 """ 169 # Verify the incoming is signed with a threshold of keys specified in the 170 # trusted root metadata. 171 log_progress('Checking signatures against current root') 172 verify_root_metadata_signatures(incoming, trusted) 173 174 # Now that we've verified the signer of the incoming root, check its content 175 # before parsing it to guard against chosen-ciphertext attacks. 176 log_progress('Checking content') 177 lint_errors = lint_root_metadata( 178 RootMetadata.FromString(incoming.serialized_root_metadata) 179 ) 180 if lint_errors: 181 log_progress(f'Lint errors: {lint_errors}') 182 raise VerificationError('Malformed root metadata.') 183 184 # Verify the target is signed with a threshold of keys specified in the 185 # target root metadata. 186 log_progress('Checking signatures against current root') 187 verify_root_metadata_signatures( 188 incoming, RootMetadata.FromString(incoming.serialized_root_metadata) 189 ) 190 191 # Check rollback attack. 192 log_progress('Checking for version rollback') 193 incoming_meta = RootMetadata.FromString(incoming.serialized_root_metadata) 194 new_ver = incoming_meta.common_metadata.version 195 cur_ver = trusted.common_metadata.version 196 if new_ver < cur_ver: 197 raise VerificationError( 198 f'Root metadata version rollback ({cur_ver}->{new_ver}) detected!' 199 ) 200 201 # Any signature requirement change indicates a targets key rotation. 202 new_sig_req = incoming_meta.targets_signature_requirement 203 cur_sig_req = trusted.targets_signature_requirement 204 targets_key_rotated = not ( 205 set(new_sig_req.key_ids) == set(cur_sig_req.key_ids) 206 and new_sig_req.threshold == cur_sig_req.threshold 207 ) 208 log_progress(f'Targets key rotation: {targets_key_rotated}') 209 return targets_key_rotated 210 211 212def lint_targets_metadata(meta: TargetsMetadata) -> Iterable[str]: 213 """Checks a targets metadata for format errors. 214 215 Returns: 216 A list of all errors found. 217 """ 218 errors = [] 219 220 # Always check the role type first to guard against chosen-ciphertext 221 # attacks. 222 log_progress("Checking role type") 223 if meta.common_metadata.role != metadata.RoleType.TARGETS.value: 224 errors.append( 225 f'Role type is not "targets" but "{meta.common_metadata.role}"' 226 ) 227 228 for file in meta.target_files: 229 if not file.file_name: 230 errors.append('Target file missing a name') 231 if not file.hashes: 232 errors.append('Target file missing hashes') 233 234 return errors 235 236 237def verify_targets_metadata( 238 signed: SignedTargetsMetadata, root: RootMetadata 239) -> None: 240 """Verifies a targets metadata is sufficiently signed and well-formed. 241 242 Raises: 243 VerificationError if the targets metadata is insufficiently signed or 244 malformed. 245 """ 246 sig_requirement = root.targets_signature_requirement 247 log_progress( 248 f'Checking signatures: total={len(signed.signatures)}, ' 249 f'threshold={sig_requirement.threshold}' 250 ) 251 good_signatures_count = 0 252 for sig in signed.signatures: 253 # Ignore extraneous signatures. 254 if sig.key_id not in sig_requirement.key_ids: 255 continue 256 257 # Extract the public key associated with sig.key_id. There is one and 258 # only one way to derive a key_id from a key object, which has been 259 # previously verified as part of root metadata verification. 260 key = None 261 for key_mapping in root.keys: 262 if key_mapping.key_id == sig.key_id: 263 key = key_mapping.key 264 break 265 if not key: 266 raise VerificationError( 267 f'No such key_id in root: {sig.key_id.hex()}.' 268 ) 269 270 if not keys.verify_ecdsa_signature( 271 sig=sig.sig, data=signed.serialized_targets_metadata, key=key 272 ): 273 raise VerificationError( 274 f'Invalid signature, key_id={sig.key_id.hex()}.' 275 ) 276 277 good_signatures_count += 1 278 279 log_progress(f'Verified signatures: {good_signatures_count}') 280 if good_signatures_count < sig_requirement.threshold: 281 raise VerificationError( 282 f'Not enough good signatures: {good_signatures_count} < ' 283 f'{sig_requirement.threshold}.' 284 ) 285 286 log_progress('Checking content') 287 lint_errors = lint_targets_metadata( 288 TargetsMetadata.FromString(signed.serialized_targets_metadata) 289 ) 290 if lint_errors: 291 log_progress(f'Lint errors: {lint_errors}') 292 raise VerificationError('Malformed targets metadata.') 293 294 295def verify_bundle(incoming: UpdateBundle, trusted: UpdateBundle) -> None: 296 """Verifies an incoming TUF bundle against metadata in `trusted`. 297 298 Raises VerificationError upon the first verification failure. 299 """ 300 301 # Root metadata in `trusted` is our trust anchor. 302 if not trusted.HasField('root_metadata'): 303 raise VerificationError('Trusted bundle missing root metadata') 304 trusted_root = RootMetadata.FromString( 305 trusted.root_metadata.serialized_root_metadata 306 ) 307 308 # Check the contents of the trusted root metadata. This is optional 309 # in practice as we generally trust what is provisioned in the factory. 310 log_progress('Checking content of the trusted root metadata') 311 lint_errors = lint_root_metadata(trusted_root) 312 if lint_errors: 313 log_progress(f'Lint errors: {lint_errors}') 314 raise VerificationError('Malformed root metadata.') 315 316 # If the incoming bundle includes a root metadata, verify it using the 317 # current trusted root metadata and set the current trusted root to the 318 # new root upon successful verification. 319 320 # Record whether the new root metadata rotates the targets key. This 321 # information is used later to perform or skip targets metadata version 322 # rollback check. 323 targets_key_rotated = False 324 incoming_root = incoming.root_metadata 325 if incoming_root: 326 log_progress('Verifying incoming root metadata') 327 targets_key_rotated = verify_root_metadata( 328 incoming=incoming_root, trusted=trusted_root 329 ) 330 log_progress('Upgrading trust to the incoming root metadata') 331 trusted_root = RootMetadata.FromString( 332 incoming_root.serialized_root_metadata 333 ) 334 335 log_progress('Verifying targets metadata') 336 signed_targets_metadata = incoming.targets_metadata['targets'] 337 verify_targets_metadata(signed_targets_metadata, trusted_root) 338 339 # Unless the targets signing key has been rotated, check for version 340 # rollback attack. 341 targets_metadata = TargetsMetadata.FromString( 342 signed_targets_metadata.serialized_targets_metadata 343 ) 344 if not targets_key_rotated: 345 log_progress('Checking targets metadata for version rollback') 346 new_ver = targets_metadata.common_metadata.version 347 cur_ver = TargetsMetadata.FromString( 348 trusted.targets_metadata['targets'].serialized_targets_metadata 349 ).common_metadata.version 350 if new_ver < cur_ver: 351 raise VerificationError( 352 f'Targets metadata rolling back: {cur_ver} ' f'-> {new_ver}.' 353 ) 354 355 # Verify all files listed in the targets metadata exist along with the 356 # correct sizes and hashes. 357 for file in targets_metadata.target_files: 358 log_progress(f'Verifying target file: "{file.file_name}"') 359 360 payload = incoming.target_payloads[file.file_name] 361 if file.length != len(payload): 362 raise VerificationError( 363 f'Wrong file size for {file.file_name}: ' 364 f'expected: {file.length}, ' 365 f'got: {len(payload)}.' 366 ) 367 368 if not file.hashes: 369 raise VerificationError(f'Missing hashes for: {file.file_name}.') 370 calculated_hashes = metadata.gen_hashes( 371 payload, [h.function for h in file.hashes] 372 ) 373 if list(calculated_hashes) != list(file.hashes): 374 raise VerificationError(f'Mismatched hashes for: {file.file_name}.') 375 376 377def parse_args(): 378 """Parse CLI arguments.""" 379 parser = argparse.ArgumentParser(description=__doc__) 380 381 parser.add_argument( 382 '--incoming', 383 type=Path, 384 required=True, 385 help='Path to the TUF bundle to be verified', 386 ) 387 388 parser.add_argument( 389 '--trusted', 390 type=Path, 391 help=( 392 'Path to the TUF bundle to be trusted; ' 393 'defaults to the value of `--incoming` ' 394 'if unspecified.' 395 ), 396 ) 397 398 return parser.parse_args() 399 400 401def main(incoming: Path, trusted: Path) -> int: 402 """Verifies an incoming TUF bundle against metadata in `trusted`. 403 404 Verifies an incoming TUF bundle against metadata from a given trusted 405 bundle. If `trusted` is not specified, the target bundle itself will 406 be used as the trusted bundle. 407 408 Returns: 409 0 on success, non-zero otherwise. 410 """ 411 log_progress(f'Verifying: {incoming}') 412 incoming_bundle = UpdateBundle.FromString(incoming.read_bytes()) 413 414 is_self_verification = not trusted 415 if is_self_verification: 416 trusted_bundle = incoming_bundle 417 log_progress('(self-verification)') 418 else: 419 trusted_bundle = UpdateBundle.FromString(trusted.read_bytes()) 420 421 try: 422 verify_bundle(incoming_bundle, trusted_bundle) 423 except VerificationError as error: 424 log_progress(f'Verification failed: {error}') 425 return 1 426 427 log_progress('Verification passed.') 428 return 0 429 430 431if __name__ == '__main__': 432 logging.basicConfig(format='%(message)s', level=logging.INFO) 433 exit_code = main(**vars(parse_args())) 434 sys.exit(exit_code) 435