# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Generate and serialize update bundles."""

import argparse
import logging
import os
from pathlib import Path
import shutil
from typing import Iterable

from pw_software_update import metadata
from pw_software_update.tuf_pb2 import SignedRootMetadata, SignedTargetsMetadata
from pw_software_update.update_bundle_pb2 import UpdateBundle

_LOG = logging.getLogger(__package__)


def targets_from_directory(
    root_dir: Path,
    exclude: Iterable[Path] = tuple(),
    remap_paths: dict[Path, str] | None = None,
) -> dict[str, Path]:
    """Given a directory on disk, generate a dict of target names to files.

    Each file found under root_dir is treated as a target file, unless its
    path relative to root_dir is among the excludes.

    By default the root_dir-relative POSIX path of each file is used as its
    target name, but remapping can be used to change a target name to any
    string. If some remappings are provided but a file is found that does not
    have a remapping, a warning is logged.

    Args:
      root_dir: Directory to crawl for targets.
      exclude: Paths relative to root_dir to exclude as targets.
      remap_paths: Custom target names to use for targets, keyed by the
        root_dir-relative path of the file.

    Returns:
      A dict mapping each target name to the path of its file.

    Raises:
      ValueError: root_dir is not a directory.
      FileNotFoundError: A remap was declared for a path that was not picked
        up as a target (the file does not exist, or it was excluded).
    """
    if not root_dir.is_dir():
        raise ValueError(
            f'Cannot generate TUF targets from {root_dir}; not a directory.'
        )

    # Materialize once: `exclude` may be a one-shot iterator, which repeated
    # `in` checks would exhaust after the first file.
    excluded = frozenset(exclude)

    targets: dict[str, Path] = {}
    # Remap keys that matched a real file. Checked below so that a remap for a
    # missing file is detected even when some other file happens to carry the
    # remapped name.
    applied_remaps: set[Path] = set()

    for path in root_dir.glob('**/*'):
        if path.is_dir():
            continue
        rel_path = path.relative_to(root_dir)
        if rel_path in excluded:
            continue

        target_name = rel_path.as_posix()
        if remap_paths:
            if rel_path in remap_paths:
                target_name = remap_paths[rel_path]
                applied_remaps.add(rel_path)
            else:
                _LOG.warning('Some remaps defined, but not "%s"', target_name)
        targets[target_name] = path

    if remap_paths:
        for original_path, new_target_file_name in remap_paths.items():
            if original_path not in applied_remaps:
                raise FileNotFoundError(
                    f'Unable to remap "{original_path}" to'
                    f' "{new_target_file_name}"; file not found in root dir.'
                )

    return targets


def gen_empty_update_bundle(
    targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
) -> UpdateBundle:
    """Generates an empty bundle.

    The bundle carries unsigned targets metadata that lists no payloads, and
    has neither root metadata nor target payloads set.

    Args:
      targets_metadata_version: Version number for the targets metadata;
        defaults to metadata.DEFAULT_METADATA_VERSION.

    Returns:
      UpdateBundle: empty bundle
    """
    targets_metadata = metadata.gen_targets_metadata(
        target_payloads={}, version=targets_metadata_version
    )
    unsigned_targets_metadata = SignedTargetsMetadata(
        serialized_targets_metadata=targets_metadata.SerializeToString()
    )

    return UpdateBundle(
        root_metadata=None,
        targets_metadata={'targets': unsigned_targets_metadata},
        target_payloads=None,
    )


def gen_unsigned_update_bundle(
    targets: dict[Path, str],
    persist: Path | None = None,
    targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
    root_metadata: SignedRootMetadata | None = None,
) -> UpdateBundle:
    """Given a set of targets, generates an unsigned UpdateBundle.

    The input targets will be treated as an ephemeral TUF repository for the
    purposes of building an UpdateBundle instance. This approach differs
    slightly from the normal concept of a TUF repository, which is typically a
    directory on disk. For ease in debugging raw repository contents, the
    `persist` argument can be supplied. If a persist Path is supplied, the TUF
    repository will be persisted to disk at that location.

    NOTE: Any existing directory at `persist` is deleted and recreated.

    NOTE: If path separator characters (like '/') are used in target names,
    then persisting the repository to disk via the 'persist' argument will
    create the corresponding directory structure.

    NOTE: If a root metadata is included, the client is expected to first
    upgrade its on-device trusted root metadata before verifying the rest of
    the bundle.

    Args:
      targets: A dict mapping payload Paths to their target names.
      persist: If not None, persist the raw TUF repository to this directory.
      targets_metadata_version: version number for the targets metadata.
      root_metadata: Optional signed Root metadata.

    Returns:
      An UpdateBundle holding the payloads, unsigned targets metadata, and the
      given root metadata (if any).

    Raises:
      ValueError: `persist` exists but is not a directory.
    """
    if persist:
        if persist.exists():
            if not persist.is_dir():
                raise ValueError(
                    f'TUF repo cannot be persisted to "{persist}";'
                    ' file exists and is not a directory.'
                )
            # Start from a clean directory so stale files from a previous run
            # do not linger in the persisted repo.
            shutil.rmtree(persist)

        os.makedirs(persist)

    target_payloads = {}
    for path, target_name in targets.items():
        target_payloads[target_name] = path.read_bytes()
        if persist:
            target_persist_path = persist / target_name
            # Target names may contain separators; create the parent dirs.
            os.makedirs(target_persist_path.parent, exist_ok=True)
            shutil.copy(path, target_persist_path)

    targets_metadata = metadata.gen_targets_metadata(
        target_payloads, version=targets_metadata_version
    )
    unsigned_targets_metadata = SignedTargetsMetadata(
        serialized_targets_metadata=targets_metadata.SerializeToString()
    )

    return UpdateBundle(
        root_metadata=root_metadata,
        targets_metadata={'targets': unsigned_targets_metadata},
        target_payloads=target_payloads,
    )


def parse_target_arg(target_arg: str) -> tuple[Path, str]:
    """Parse an individual target string passed in to the --targets argument.

    Target strings take the following form:
      "FILE_PATH > TARGET_NAME"

    For example:
      "fw_images/main_image.bin > main"

    Args:
      target_arg: The raw target string.

    Returns:
      A (file path, target name) tuple with surrounding whitespace stripped.

    Raises:
      ValueError: target_arg does not contain exactly one '>' separator.
    """
    try:
        # Unpacking fails unless the string splits into exactly two parts.
        file_path_str, target_name = target_arg.split('>')
    except ValueError as err:
        raise ValueError(
            'Targets must be strings of the form:\n'
            '  "FILE_PATH > TARGET_NAME"'
        ) from err
    return Path(file_path_str.strip()), target_name.strip()


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        '-t',
        '--targets',
        type=str,
        nargs='+',
        required=True,
        help='Strings defining targets to bundle',
    )
    parser.add_argument(
        '-o',
        '--out',
        type=Path,
        required=True,
        help='Output path for serialized UpdateBundle',
    )
    parser.add_argument(
        '--persist',
        type=Path,
        default=None,
        help=(
            'If provided, TUF repo will be persisted to disk'
            ' at this path for debugging'
        ),
    )
    parser.add_argument(
        '--targets-metadata-version',
        type=int,
        default=metadata.DEFAULT_METADATA_VERSION,
        help='Version number for the targets metadata',
    )
    parser.add_argument(
        '--targets-metadata-version-file',
        type=Path,
        default=None,
        help='Read version number string from this file. When '
        'provided, content of this file supersede '
        '--targets-metadata-version',
    )
    parser.add_argument(
        '--signed-root-metadata',
        type=Path,
        default=None,
        help='Path to the signed Root metadata',
    )
    return parser.parse_args()


def main(
    targets: Iterable[str],
    out: Path,
    persist: Path | None = None,
    targets_metadata_version: int = metadata.DEFAULT_METADATA_VERSION,
    targets_metadata_version_file: Path | None = None,
    signed_root_metadata: Path | None = None,
) -> None:
    """Generates an UpdateBundle and serializes it to disk.

    Args:
      targets: Target strings of the form "FILE_PATH > TARGET_NAME".
      out: Output path for the serialized UpdateBundle.
      persist: If not None, persist the raw TUF repository to this directory.
      targets_metadata_version: Version number for the targets metadata.
      targets_metadata_version_file: If provided, the version number is read
        from this file and supersedes targets_metadata_version.
      signed_root_metadata: Optional path to a serialized SignedRootMetadata
        to include in the bundle.

    Raises:
      ValueError: A target string is malformed, or the version file does not
        contain an integer.
    """
    # Later duplicates of the same file path override earlier ones.
    target_dict = dict(parse_target_arg(target_arg) for target_arg in targets)

    root_metadata = None
    if signed_root_metadata:
        root_metadata = SignedRootMetadata.FromString(
            signed_root_metadata.read_bytes()
        )

    if targets_metadata_version_file:
        version_text = targets_metadata_version_file.read_text().strip()
        try:
            targets_metadata_version = int(version_text)
        except ValueError as err:
            raise ValueError(
                f'"{targets_metadata_version_file}" does not contain an'
                f' integer version number: {version_text!r}'
            ) from err

    bundle = gen_unsigned_update_bundle(
        target_dict, persist, targets_metadata_version, root_metadata
    )

    out.write_bytes(bundle.SerializeToString())


if __name__ == '__main__':
    logging.basicConfig()
    main(**vars(parse_args()))