xref: /aosp_15_r20/external/pigweed/pw_software_update/py/pw_software_update/update_bundle.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Generate and serialize update bundles."""
15
16import argparse
17import logging
18import os
19from pathlib import Path
20import shutil
21from typing import Iterable
22
23from pw_software_update import metadata
24from pw_software_update.tuf_pb2 import SignedRootMetadata, SignedTargetsMetadata
25from pw_software_update.update_bundle_pb2 import UpdateBundle
26
# Module-wide logger, named after the containing package (__package__)
# rather than this individual module.
_LOG = logging.getLogger(__package__)
28
29
30def targets_from_directory(
31    root_dir: Path,
32    exclude: Iterable[Path] = tuple(),
33    remap_paths: dict[Path, str] | None = None,
34) -> dict[str, Path]:
35    """Given a directory on dist, generate a dict of target names to files.
36
37    Args:
38      root_dir: Directory to crawl for targets.
39      exclude: Paths relative to root_dir to exclude as targets.
40      remap_paths: Custom target names to use for targets.
41
42    Each file in the input directory will be read in as a target file, unless
43    its path (relative to the TUF repo root) is among the excludes.
44
45    Default behavior is to treat root_dir-relative paths as the strings to use
46    as targets file names, but remapping can be used to change a target file
47    name to any string. If some remappings are provided but a file is found that
48    does not have a remapping, a warning will be logged. If a remap is declared
49    for a file that does not exist, FileNotFoundError will be raised.
50    """
51    if not root_dir.is_dir():
52        raise ValueError(
53            f'Cannot generate TUF targets from {root_dir}; not a directory.'
54        )
55    targets = {}
56    for path in root_dir.glob('**/*'):
57        if path.is_dir():
58            continue
59        rel_path = path.relative_to(root_dir)
60        if rel_path in exclude:
61            continue
62        target_name = str(rel_path.as_posix())
63        if remap_paths:
64            if rel_path in remap_paths:
65                target_name = remap_paths[rel_path]
66            else:
67                _LOG.warning('Some remaps defined, but not "%s"', target_name)
68        targets[target_name] = path
69
70    if remap_paths is not None:
71        for original_path, new_target_file_name in remap_paths.items():
72            if new_target_file_name not in targets:
73                raise FileNotFoundError(
74                    f'Unable to remap "{original_path}" to'
75                    f' "{new_target_file_name}"; file not found in root dir.'
76                )
77
78    return targets
79
80
def gen_empty_update_bundle(
    targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
) -> UpdateBundle:
    """Build an UpdateBundle that carries no target payloads.

    Targets metadata is generated for an empty payload set and wrapped in a
    SignedTargetsMetadata that only has its serialized metadata populated.

    Args:
      targets_metadata_version: Version number for the targets metadata;
        defaults to metadata.DEFAULT_METADATA_VERSION.

    Returns:
      UpdateBundle: A bundle with targets metadata only; root metadata and
      target payloads are left unset.
    """
    serialized_metadata = metadata.gen_targets_metadata(
        target_payloads={}, version=targets_metadata_version
    ).SerializeToString()

    return UpdateBundle(
        root_metadata=None,
        targets_metadata={
            'targets': SignedTargetsMetadata(
                serialized_targets_metadata=serialized_metadata
            )
        },
        target_payloads=None,
    )
107
108
def gen_unsigned_update_bundle(
    targets: dict[Path, str],
    persist: Path | None = None,
    targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
    root_metadata: SignedRootMetadata | None = None,
) -> UpdateBundle:
    """Build an unsigned UpdateBundle from a set of target files.

    Args:
      targets: A dict mapping payload Paths to their target names.
      persist: If not None, persist the raw TUF repository to this directory.
        Any directory already present at this path is deleted first.
      targets_metadata_version: version number for the targets metadata.
      root_metadata: Optional signed Root metadata.

    Returns:
      An UpdateBundle holding the given root metadata, the generated targets
      metadata, and the payload bytes read from each target file.

    Raises:
      ValueError: `persist` exists but is not a directory.

    The targets are handled as an ephemeral, in-memory TUF repository rather
    than the usual on-disk directory. To make the raw repository contents
    easy to inspect while debugging, pass `persist` and each payload will also
    be copied to that location under its target name.

    NOTE: If path separator characters (like '/') are used in target names, then
    persisting the repository to disk via the 'persist' argument will create the
    corresponding directory structure.

    NOTE: If a root metadata is included, the client is expected to first
    upgrade its on-device trusted root metadata before verifying the rest of
    the bundle.
    """
    if persist:
        if persist.exists():
            if not persist.is_dir():
                raise ValueError(
                    f'TUF repo cannot be persisted to "{persist}";'
                    ' file exists and is not a directory.'
                )
            # Start from a clean slate so stale targets never linger.
            shutil.rmtree(persist)
        os.makedirs(persist)

    payloads_by_name = {}
    for source_path, name in targets.items():
        payloads_by_name[name] = source_path.read_bytes()
        if not persist:
            continue
        # Target names may contain separators; create parents as needed.
        destination = persist / name
        os.makedirs(destination.parent, exist_ok=True)
        shutil.copy(source_path, destination)

    serialized_metadata = metadata.gen_targets_metadata(
        payloads_by_name, version=targets_metadata_version
    ).SerializeToString()

    return UpdateBundle(
        root_metadata=root_metadata,
        targets_metadata={
            'targets': SignedTargetsMetadata(
                serialized_targets_metadata=serialized_metadata
            )
        },
        target_payloads=payloads_by_name,
    )
169
170
def parse_target_arg(target_arg: str) -> tuple[Path, str]:
    """Parse an individual target string passed in to the --targets argument.

    Target strings take the following form:
      "FILE_PATH > TARGET_NAME"

    For example:
      "fw_images/main_image.bin > main"

    Args:
      target_arg: A single target string in the form shown above.

    Returns:
      A (file path, target name) tuple with surrounding whitespace stripped
      from both parts.

    Raises:
      ValueError: The string does not contain exactly one '>' separator, or
        the file path or target name is empty.
    """
    parts = [part.strip() for part in target_arg.split('>')]
    # Exactly two non-empty parts are required. An empty file path would
    # otherwise become Path('.') and only fail later when it is read.
    if len(parts) != 2 or not all(parts):
        raise ValueError(
            'Targets must be strings of the form:\n'
            '  "FILE_PATH > TARGET_NAME"\n'
            f'Got: "{target_arg}"'
        )
    file_path_str, target_name = parts
    return Path(file_path_str), target_name
188
189
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
      The parsed arguments; attribute names match the keyword parameters of
      main().
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Required inputs and outputs.
    cli.add_argument(
        '-t', '--targets',
        type=str, nargs='+', required=True,
        help='Strings defining targets to bundle',
    )
    cli.add_argument(
        '-o', '--out',
        type=Path, required=True,
        help='Output path for serialized UpdateBundle',
    )

    # Optional debugging aid.
    persist_help = (
        'If provided, TUF repo will be persisted to disk'
        ' at this path for debugging'
    )
    cli.add_argument('--persist', type=Path, default=None, help=persist_help)

    # Targets metadata version, either given inline or read from a file.
    cli.add_argument(
        '--targets-metadata-version',
        type=int, default=metadata.DEFAULT_METADATA_VERSION,
        help='Version number for the targets metadata',
    )
    version_file_help = (
        'Read version number string from this file. When '
        'provided, content of this file supersede '
        '--targets-metadata-version'
    )
    cli.add_argument(
        '--targets-metadata-version-file',
        type=Path, default=None,
        help=version_file_help,
    )

    # Optional root metadata to embed in the bundle.
    cli.add_argument(
        '--signed-root-metadata',
        type=Path, default=None,
        help='Path to the signed Root metadata',
    )

    return cli.parse_args()
238
239
def main(
    targets: Iterable[str],
    out: Path,
    persist: Path | None = None,
    targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
    targets_metadata_version_file: Path | None = None,
    signed_root_metadata: Path | None = None,
) -> None:
    """Generate an unsigned UpdateBundle and write it, serialized, to `out`.

    Args:
      targets: Target strings of the form "FILE_PATH > TARGET_NAME".
      out: File that receives the serialized UpdateBundle.
      persist: Optional directory to persist the raw TUF repository to.
      targets_metadata_version: Version number for the targets metadata.
      targets_metadata_version_file: Optional file holding the version number;
        when given, its content replaces `targets_metadata_version`.
      signed_root_metadata: Optional file holding a serialized
        SignedRootMetadata to include in the bundle.
    """
    # Later duplicates of the same file path override earlier ones.
    names_by_path = dict(parse_target_arg(arg) for arg in targets)

    signed_root = (
        SignedRootMetadata.FromString(signed_root_metadata.read_bytes())
        if signed_root_metadata
        else None
    )

    if targets_metadata_version_file:
        with targets_metadata_version_file.open() as version_source:
            version_text = version_source.read()
        targets_metadata_version = int(version_text.strip())

    update_bundle = gen_unsigned_update_bundle(
        names_by_path, persist, targets_metadata_version, signed_root
    )
    out.write_bytes(update_bundle.SerializeToString())
269
270
if __name__ == '__main__':
    # Script entry point: configure default logging, then forward every
    # parsed CLI option to main() as a keyword argument.
    logging.basicConfig()
    cli_options = vars(parse_args())
    main(**cli_options)
274