xref: /aosp_15_r20/external/pigweed/pw_software_update/py/pw_software_update/verify.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Facilities to verify an update bundle."""
15
16import argparse
17import inspect
18import logging
19from pathlib import Path
20import sys
21from typing import Iterable
22
23from pw_software_update import keys, metadata
24from pw_software_update.tuf_pb2 import (
25    RootMetadata,
26    SignedRootMetadata,
27    SignedTargetsMetadata,
28    TargetsMetadata,
29)
30from pw_software_update.update_bundle_pb2 import UpdateBundle
31
32_LOG = logging.getLogger(__package__)
33
34
35def log_progress(message: str, indent_offset: int = -5, indent_str: str = '  '):
36    """Logs verification progress.
37
38    The default indent offset is chosen per actual output of 'python -m verify'.
39    """
40    indentation = 2 * (len(inspect.stack(0)) + indent_offset)
41    _LOG.info('%s%s', indent_str * indentation, message)
42
43
44class VerificationError(Exception):
45    """Raised upon any verification error."""
46
47
48def lint_root_metadata(root: RootMetadata) -> Iterable[str]:
49    """Checks a RootMetadata for content or format errors.
50
51    Returns:
52      A list of all errors found.
53    """
54    errors = []
55
56    # Check role type first-thing to deter chosen-ciphertext attacks.
57    log_progress('Checking role type')
58    if root.common_metadata.role != metadata.RoleType.ROOT.value:
59        errors.append('Role type is not "root"')
60
61    # Check keys database.
62    log_progress('Checking keys database')
63    for entry in root.keys:
64        if not entry.key_id:
65            errors.append('Missing key_id in keys list')
66        elif not entry.key.keyval:
67            errors.append(f'Key {entry.key_id.hex()} does not have a value')
68        elif not entry.key_id == keys.gen_key_id(entry.key):
69            errors.append(
70                f'Key id "{entry.key_id.hex()}" cannot be derived'
71                f'from key content'
72            )
73
74    # Check root signature requirement.
75    log_progress('Checking root signature requirement')
76    root_sig_req = root.root_signature_requirement
77    if not root_sig_req.threshold:
78        errors.append('Root signature threshold not set')
79
80    if len(root_sig_req.key_ids) < root_sig_req.threshold:
81        errors.append(
82            f'Insufficient root keys: '
83            f'{len(root_sig_req.key_ids)} < {root_sig_req.threshold}'
84        )
85
86    for key_id in root_sig_req.key_ids:
87        if key_id not in [km.key_id for km in root.keys]:
88            errors.append(f'Unregistered root key: {key_id.hex()}')
89
90    # Check targets signature requirement.
91    log_progress('Checking targets signature requirement')
92    targets_sig_req = root.targets_signature_requirement
93    if not targets_sig_req.threshold:
94        errors.append('Targets signature threshold not set')
95
96    if len(targets_sig_req.key_ids) < targets_sig_req.threshold:
97        errors.append(
98            f'Insufficient Targets keys: '
99            f'{len(targets_sig_req.key_ids)} < {targets_sig_req.threshold}'
100        )
101
102    for key_id in targets_sig_req.key_ids:
103        if key_id not in [km.key_id for km in root.keys]:
104            errors.append(f'Unregistered targets key: {key_id.hex()}')
105
106    # Make sure no two roles share the same key.
107    log_progress('Checking for key sharing')
108    for key_id in targets_sig_req.key_ids:
109        if key_id in root_sig_req.key_ids:
110            errors.append(f'Targets shares the same key: "{key_id.hex()}"')
111
112    return errors
113
114
115def verify_root_metadata_signatures(
116    incoming: SignedRootMetadata, trusted: RootMetadata
117) -> None:
118    """Verifies the signatures of an incoming root metadata.
119
120    Verifies the signatures of an incoming root metadata against signature
121    requirements from the trusted root metadata.
122
123    Raises:
124      VerificationError if `incoming` is incorrectly or insufficiently signed.
125    """
126    sig_requirement = trusted.root_signature_requirement
127
128    log_progress(
129        f'Total={len(incoming.signatures)}, '
130        f'threshold={sig_requirement.threshold}'
131    )
132    good_signature_count = 0
133    for sig in incoming.signatures:
134        if sig.key_id not in sig_requirement.key_ids:
135            continue
136
137        key = None
138        for key_mapping in trusted.keys:
139            if key_mapping.key_id == sig.key_id:
140                key = key_mapping.key
141                break
142        if not key:
143            raise VerificationError(f'Invalid key_id: {sig.key_id.hex()}.')
144
145        if not keys.verify_ecdsa_signature(
146            sig.sig, incoming.serialized_root_metadata, key
147        ):
148            raise VerificationError('Invalid signature, key_id={sig.key_id}.')
149
150        good_signature_count += 1
151
152    log_progress(f'Verified: {good_signature_count}')
153    if good_signature_count < sig_requirement.threshold:
154        raise VerificationError('Not enough good signatures.')
155
156
157def verify_root_metadata(
158    incoming: SignedRootMetadata, trusted: RootMetadata
159) -> bool:
160    """Verifies an incoming root metadata against a trusted root metadata.
161
162    Returns:
163      A boolean flag indicating if the targets metadata has been rotated.
164
165    Raises:
166      VerificationError if the incoming root is incorrectly formatted,
167      insufficiently signed, or rolling back to an older version.
168    """
169    # Verify the incoming is signed with a threshold of keys specified in the
170    # trusted root metadata.
171    log_progress('Checking signatures against current root')
172    verify_root_metadata_signatures(incoming, trusted)
173
174    # Now that we've verified the signer of the incoming root, check its content
175    # before parsing it to guard against chosen-ciphertext attacks.
176    log_progress('Checking content')
177    lint_errors = lint_root_metadata(
178        RootMetadata.FromString(incoming.serialized_root_metadata)
179    )
180    if lint_errors:
181        log_progress(f'Lint errors: {lint_errors}')
182        raise VerificationError('Malformed root metadata.')
183
184    # Verify the target is signed with a threshold of keys specified in the
185    # target root metadata.
186    log_progress('Checking signatures against current root')
187    verify_root_metadata_signatures(
188        incoming, RootMetadata.FromString(incoming.serialized_root_metadata)
189    )
190
191    # Check rollback attack.
192    log_progress('Checking for version rollback')
193    incoming_meta = RootMetadata.FromString(incoming.serialized_root_metadata)
194    new_ver = incoming_meta.common_metadata.version
195    cur_ver = trusted.common_metadata.version
196    if new_ver < cur_ver:
197        raise VerificationError(
198            f'Root metadata version rollback ({cur_ver}->{new_ver}) detected!'
199        )
200
201    # Any signature requirement change indicates a targets key rotation.
202    new_sig_req = incoming_meta.targets_signature_requirement
203    cur_sig_req = trusted.targets_signature_requirement
204    targets_key_rotated = not (
205        set(new_sig_req.key_ids) == set(cur_sig_req.key_ids)
206        and new_sig_req.threshold == cur_sig_req.threshold
207    )
208    log_progress(f'Targets key rotation: {targets_key_rotated}')
209    return targets_key_rotated
210
211
212def lint_targets_metadata(meta: TargetsMetadata) -> Iterable[str]:
213    """Checks a targets metadata for format errors.
214
215    Returns:
216      A list of all errors found.
217    """
218    errors = []
219
220    # Always check the role type first to guard against chosen-ciphertext
221    # attacks.
222    log_progress("Checking role type")
223    if meta.common_metadata.role != metadata.RoleType.TARGETS.value:
224        errors.append(
225            f'Role type is not "targets" but "{meta.common_metadata.role}"'
226        )
227
228    for file in meta.target_files:
229        if not file.file_name:
230            errors.append('Target file missing a name')
231        if not file.hashes:
232            errors.append('Target file missing hashes')
233
234    return errors
235
236
237def verify_targets_metadata(
238    signed: SignedTargetsMetadata, root: RootMetadata
239) -> None:
240    """Verifies a targets metadata is sufficiently signed and well-formed.
241
242    Raises:
243      VerificationError if the targets metadata is insufficiently signed or
244        malformed.
245    """
246    sig_requirement = root.targets_signature_requirement
247    log_progress(
248        f'Checking signatures: total={len(signed.signatures)}, '
249        f'threshold={sig_requirement.threshold}'
250    )
251    good_signatures_count = 0
252    for sig in signed.signatures:
253        # Ignore extraneous signatures.
254        if sig.key_id not in sig_requirement.key_ids:
255            continue
256
257        # Extract the public key associated with sig.key_id. There is one and
258        # only one way to derive a key_id from a key object, which has been
259        # previously verified as part of root metadata verification.
260        key = None
261        for key_mapping in root.keys:
262            if key_mapping.key_id == sig.key_id:
263                key = key_mapping.key
264                break
265        if not key:
266            raise VerificationError(
267                f'No such key_id in root: {sig.key_id.hex()}.'
268            )
269
270        if not keys.verify_ecdsa_signature(
271            sig=sig.sig, data=signed.serialized_targets_metadata, key=key
272        ):
273            raise VerificationError(
274                f'Invalid signature, key_id={sig.key_id.hex()}.'
275            )
276
277        good_signatures_count += 1
278
279    log_progress(f'Verified signatures: {good_signatures_count}')
280    if good_signatures_count < sig_requirement.threshold:
281        raise VerificationError(
282            f'Not enough good signatures: {good_signatures_count} < '
283            f'{sig_requirement.threshold}.'
284        )
285
286    log_progress('Checking content')
287    lint_errors = lint_targets_metadata(
288        TargetsMetadata.FromString(signed.serialized_targets_metadata)
289    )
290    if lint_errors:
291        log_progress(f'Lint errors: {lint_errors}')
292        raise VerificationError('Malformed targets metadata.')
293
294
295def verify_bundle(incoming: UpdateBundle, trusted: UpdateBundle) -> None:
296    """Verifies an incoming TUF bundle against metadata in `trusted`.
297
298    Raises VerificationError upon the first verification failure.
299    """
300
301    # Root metadata in `trusted` is our trust anchor.
302    if not trusted.HasField('root_metadata'):
303        raise VerificationError('Trusted bundle missing root metadata')
304    trusted_root = RootMetadata.FromString(
305        trusted.root_metadata.serialized_root_metadata
306    )
307
308    # Check the contents of the trusted root metadata. This is optional
309    # in practice as we generally trust what is provisioned in the factory.
310    log_progress('Checking content of the trusted root metadata')
311    lint_errors = lint_root_metadata(trusted_root)
312    if lint_errors:
313        log_progress(f'Lint errors: {lint_errors}')
314        raise VerificationError('Malformed root metadata.')
315
316    # If the incoming bundle includes a root metadata, verify it using the
317    # current trusted root metadata and set the current trusted root to the
318    # new root upon successful verification.
319
320    # Record whether the new root metadata rotates the targets key. This
321    # information is used later to perform or skip targets metadata version
322    # rollback check.
323    targets_key_rotated = False
324    incoming_root = incoming.root_metadata
325    if incoming_root:
326        log_progress('Verifying incoming root metadata')
327        targets_key_rotated = verify_root_metadata(
328            incoming=incoming_root, trusted=trusted_root
329        )
330        log_progress('Upgrading trust to the incoming root metadata')
331        trusted_root = RootMetadata.FromString(
332            incoming_root.serialized_root_metadata
333        )
334
335    log_progress('Verifying targets metadata')
336    signed_targets_metadata = incoming.targets_metadata['targets']
337    verify_targets_metadata(signed_targets_metadata, trusted_root)
338
339    # Unless the targets signing key has been rotated, check for version
340    # rollback attack.
341    targets_metadata = TargetsMetadata.FromString(
342        signed_targets_metadata.serialized_targets_metadata
343    )
344    if not targets_key_rotated:
345        log_progress('Checking targets metadata for version rollback')
346        new_ver = targets_metadata.common_metadata.version
347        cur_ver = TargetsMetadata.FromString(
348            trusted.targets_metadata['targets'].serialized_targets_metadata
349        ).common_metadata.version
350        if new_ver < cur_ver:
351            raise VerificationError(
352                f'Targets metadata rolling back: {cur_ver} ' f'-> {new_ver}.'
353            )
354
355    # Verify all files listed in the targets metadata exist along with the
356    # correct sizes and hashes.
357    for file in targets_metadata.target_files:
358        log_progress(f'Verifying target file: "{file.file_name}"')
359
360        payload = incoming.target_payloads[file.file_name]
361        if file.length != len(payload):
362            raise VerificationError(
363                f'Wrong file size for {file.file_name}: '
364                f'expected: {file.length}, '
365                f'got: {len(payload)}.'
366            )
367
368        if not file.hashes:
369            raise VerificationError(f'Missing hashes for: {file.file_name}.')
370        calculated_hashes = metadata.gen_hashes(
371            payload, [h.function for h in file.hashes]
372        )
373        if list(calculated_hashes) != list(file.hashes):
374            raise VerificationError(f'Mismatched hashes for: {file.file_name}.')
375
376
377def parse_args():
378    """Parse CLI arguments."""
379    parser = argparse.ArgumentParser(description=__doc__)
380
381    parser.add_argument(
382        '--incoming',
383        type=Path,
384        required=True,
385        help='Path to the TUF bundle to be verified',
386    )
387
388    parser.add_argument(
389        '--trusted',
390        type=Path,
391        help=(
392            'Path to the TUF bundle to be trusted; '
393            'defaults to the value of `--incoming` '
394            'if unspecified.'
395        ),
396    )
397
398    return parser.parse_args()
399
400
401def main(incoming: Path, trusted: Path) -> int:
402    """Verifies an incoming TUF bundle against metadata in `trusted`.
403
404    Verifies an incoming TUF bundle against metadata from a given trusted
405    bundle. If `trusted` is not specified, the target bundle itself will
406    be used as the trusted bundle.
407
408    Returns:
409      0 on success, non-zero otherwise.
410    """
411    log_progress(f'Verifying: {incoming}')
412    incoming_bundle = UpdateBundle.FromString(incoming.read_bytes())
413
414    is_self_verification = not trusted
415    if is_self_verification:
416        trusted_bundle = incoming_bundle
417        log_progress('(self-verification)')
418    else:
419        trusted_bundle = UpdateBundle.FromString(trusted.read_bytes())
420
421    try:
422        verify_bundle(incoming_bundle, trusted_bundle)
423    except VerificationError as error:
424        log_progress(f'Verification failed: {error}')
425        return 1
426
427    log_progress('Verification passed.')
428    return 0
429
430
431if __name__ == '__main__':
432    logging.basicConfig(format='%(message)s', level=logging.INFO)
433    exit_code = main(**vars(parse_args()))
434    sys.exit(exit_code)
435