xref: /aosp_15_r20/build/make/tools/releasetools/add_img_to_target_files.py (revision 9e94795a3d4ef5c1d47486f9a02bb378756cea8a)
1#!/usr/bin/env python
2#
3# Copyright (C) 2014 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Given a target-files zipfile that does not contain images (ie, does
19not have an IMAGES/ top-level subdirectory), produce the images and
20add them to the zipfile.
21
22Usage:  add_img_to_target_files [flag] target_files
23
24  -a  (--add_missing)
25      Build and add missing images to "IMAGES/". If this option is
26      not specified, this script will simply exit when "IMAGES/"
27      directory exists in the target file.
28
29  -r  (--rebuild_recovery)
30      Rebuild the recovery patch and write it to the system image. Only
31      meaningful when system image needs to be rebuilt and there're separate
32      boot / recovery images.
33
34  --replace_verity_private_key
35      Replace the private key used for verity signing. (same as the option
36      in sign_target_files_apks)
37
38  --replace_verity_public_key
39       Replace the certificate (public key) used for verity verification. (same
40       as the option in sign_target_files_apks)
41
42  --is_signing
43      Skip building & adding the images for "userdata" and "cache" if we
44      are signing the target files.
45
46  --avb-resolve-rollback-index-location-conflict
47      If provided, resolve the conflict AVB rollback index location when
48      necessary.
49"""
50
51from __future__ import print_function
52
53import avbtool
54import datetime
55import logging
56import os
57import shlex
58import shutil
59import stat
60import sys
61import uuid
62import tempfile
63import zipfile
64
65import build_image
66import build_super_image
67import common
68import verity_utils
69import ota_metadata_pb2
70import rangelib
71import sparse_img
72from concurrent.futures import ThreadPoolExecutor
73from apex_utils import GetApexInfoFromTargetFiles
74from common import ZipDelete, PARTITIONS_WITH_CARE_MAP, ExternalError, RunAndCheckOutput, IsSparseImage, MakeTempFile, ZipWrite
75from build_image import FIXED_FILE_TIMESTAMP
76
# Refuse to run on interpreters older than Python 2.7.
if sys.hexversion < 0x02070000:
  print("Python 2.7 or newer is required.", file=sys.stderr)
  sys.exit(1)

# Module-level logger used by every helper in this file.
logger = logging.getLogger(__name__)

# Reuse the option holder exposed by common and install this script's defaults
# on it.
OPTIONS = common.OPTIONS
# -a / --add_missing: build and add missing images to IMAGES/.
OPTIONS.add_missing = False
# -r / --rebuild_recovery: rebuild the recovery patch.
OPTIONS.rebuild_recovery = False
# Archive entries that already exist in the output zip and must be replaced
# rather than appended; helpers in this file append to the list.
OPTIONS.replace_updated_files_list = []
# --is_signing: skip building "userdata" and "cache" when signing.
OPTIONS.is_signing = False
# --avb-resolve-rollback-index-location-conflict.
OPTIONS.avb_resolve_rollback_index_location_conflict = False
89
90
def ParseAvbFooter(img_path) -> avbtool.AvbFooter:
  """Parses the AVB footer stored in the trailing bytes of an image file.

  Args:
    img_path: Path of the image file to read.

  Returns:
    The avbtool.AvbFooter built from the last avbtool.AvbFooter.SIZE bytes of
    the file.
  """
  footer_size = avbtool.AvbFooter.SIZE
  with open(img_path, 'rb') as image_file:
    # The footer occupies the final footer_size bytes of the file.
    image_file.seek(-footer_size, os.SEEK_END)
    raw_footer = image_file.read(footer_size)
  return avbtool.AvbFooter(raw_footer)
96
97
def GetCareMap(which, imgname):
  """Returns the care_map entry for the given partition.

  Args:
    which: The partition name, must be listed in PARTITIONS_WITH_CARE_MAP.
    imgname: The filename of the image.

  Returns:
    A two-element list [which, care_map_ranges], where care_map_ranges is the
    raw string of the care_map RangeSet; or None if the AVB footer of the image
    could not be parsed.

  Raises:
    AssertionError: If which has no care map support, or if the original image
        size recorded in the AVB footer is not positive or not smaller than
        the (unsparsed) image size.
  """
  assert which in PARTITIONS_WITH_CARE_MAP

  is_sparse_img = IsSparseImage(imgname)
  unsparsed_image_size = os.path.getsize(imgname)
  # Parsed at most once and reused below for the care map intersection.
  simg = None

  # A verified image contains original image + hash tree data + FEC data
  # + AVB footer, all concatenated together. The caremap specifies a range
  # of blocks that update_verifier should read on top of dm-verity device
  # to verify correctness of OTA updates. When reading off of dm-verity device,
  # the hashtree and FEC part of image isn't available. So caremap should
  # only contain the original image blocks.
  try:
    if is_sparse_img:
      with tempfile.NamedTemporaryFile() as tmpfile:
        simg = sparse_img.SparseImage(imgname)
        unsparsed_image_size = simg.total_blocks * simg.blocksize
        # The footer sits at the end of the image, so only the last block has
        # to be unsparsed into the temp file.
        for data in simg.ReadBlocks(simg.total_blocks - 1, 1):
          tmpfile.write(data)
        tmpfile.flush()
        avbfooter = ParseAvbFooter(tmpfile.name)
    else:
      avbfooter = ParseAvbFooter(imgname)
  except LookupError as e:
    logger.warning(
        "Failed to parse avbfooter for partition %s image %s, %s", which, imgname, e)
    return None

  image_size = avbfooter.original_image_size
  assert image_size < unsparsed_image_size, (
      f"AVB footer's original image size {image_size} is larger than or equal "
      f"to image size on disk {unsparsed_image_size}, this can't happen "
      "because a verified image = original image + hash tree data + FEC data "
      "+ avbfooter.")
  assert image_size > 0

  # Index of the last 4096-byte block of the original image.
  image_blocks = int(image_size) // 4096 - 1
  # It's OK for image_blocks to be 0, because care map ranges are inclusive.
  # So 0-0 means "just block 0", which is valid.
  assert image_blocks >= 0, "blocks for {} must be non-negative, image size: {}".format(
      which, image_size)

  care_map_ranges = rangelib.RangeSet("0-{}".format(image_blocks))
  # For sparse images, we will only check the blocks that are listed in the care
  # map, i.e. the ones with meaningful data. For non-sparse images, we read all
  # the blocks in the filesystem image.
  if simg is not None:
    care_map_ranges = simg.care_map.intersect(care_map_ranges)

  return [which, care_map_ranges.to_string_raw()]
160
161
def AddCareMapForAbOta(output_file, ab_partitions, image_paths):
  """Builds care_map.pb for the A/B partitions that carry a care map.

  For every listed partition that supports a care map and has verity or an AVB
  hashtree enabled in OPTIONS.info_dict, four text lines are collected
  (partition name, care map ranges, fingerprint property name, fingerprint
  value). The text is converted with the care_map_generator tool and the result
  is written to output_file.

  Args:
    output_file: The output zip file (needs to be already open),
        or file path to write care_map.pb.
    ab_partitions: The list of A/B partitions.
    image_paths: A map from the partition name to the image path.

  Raises:
    ExternalError: If output_file is empty, or an expected image is missing on
        disk.
  """
  if not output_file:
    raise ExternalError('Expected output_file for AddCareMapForAbOta')

  care_map_list = []
  for raw_name in ab_partitions:
    partition = raw_name.strip()
    if partition not in PARTITIONS_WITH_CARE_MAP:
      continue

    verity_key = "{}_verity_block_device".format(partition)
    hashtree_key = "avb_{}_hashtree_enable".format(partition)
    if not (verity_key in OPTIONS.info_dict or
            OPTIONS.info_dict.get(hashtree_key) == "true"):
      continue

    if partition not in image_paths:
      logger.warning('Potential partition with care_map missing from images: %s',
                     partition)
      continue
    image_path = image_paths[partition]
    if not os.path.exists(image_path):
      raise ExternalError('Expected image at path {}'.format(image_path))

    care_map = GetCareMap(partition, image_path)
    if not care_map:
      continue
    care_map_list.extend(care_map)

    # adds fingerprint field to the care_map
    # TODO(xunchang) revisit the fingerprint calculation for care_map.
    partition_props = OPTIONS.info_dict.get(partition + ".build.prop")
    candidate_props = ("ro.{}.build.fingerprint".format(partition),
                       "ro.{}.build.thumbprint".format(partition))
    present_props = []
    for prop_name in candidate_props:
      if partition_props and partition_props.GetProp(prop_name):
        present_props.append(prop_name)

    if present_props:
      # The fingerprint takes precedence over the thumbprint.
      property_id = present_props[0]
      fingerprint = partition_props.GetProp(property_id)
    else:
      logger.warning(
          "fingerprint is not present for partition %s", partition)
      property_id, fingerprint = "unknown", "unknown"
    care_map_list.extend((property_id, fingerprint))

  if not care_map_list:
    return

  # Converts the list into proto buf message by calling care_map_generator; and
  # writes the result to a temp file.
  temp_care_map_text = MakeTempFile(prefix="caremap_text-", suffix=".txt")
  with open(temp_care_map_text, 'w') as text_file:
    text_file.write('\n'.join(care_map_list))

  temp_care_map = MakeTempFile(prefix="caremap-", suffix=".pb")
  RunAndCheckOutput(["care_map_generator", temp_care_map_text, temp_care_map])

  if isinstance(output_file, zipfile.ZipFile):
    care_map_path = "META/care_map.pb"
    if care_map_path not in output_file.namelist():
      ZipWrite(output_file, temp_care_map, arcname=care_map_path)
      return
    # The entry already exists in the zip: copy the temp file into the
    # OPTIONS.input_tmp dir and update the replace_updated_files_list used by
    # add_img_to_target_files.
    if not OPTIONS.replace_updated_files_list:
      OPTIONS.replace_updated_files_list = []
    shutil.copy(temp_care_map, os.path.join(OPTIONS.input_tmp, care_map_path))
    OPTIONS.replace_updated_files_list.append(care_map_path)
  else:
    # output_file is a plain file path.
    shutil.copy(temp_care_map, output_file)
242
243
class OutputFile(object):
  """Stages a generated file on disk and optionally adds it to a zip.

  The file is always produced under the given input dir; when an output zip is
  supplied, Write() additionally stores the staged file in that archive.

  Attributes:
    name: The on-disk path of the output file, regardless of the final
        destination.
  """

  def __init__(self, output_zip, input_dir, *args):
    # The intermediate file lives under input_dir even when the final
    # destination is a zip archive.
    self.name = os.path.join(input_dir, *args)
    self._output_zip = output_zip
    if output_zip:
      # Entry name inside the archive, i.e. the path without input_dir.
      self._zip_name = os.path.join(*args)

  def Write(self, compress_type=None):
    """Adds the staged file to the output zip; no-op without a zip."""
    if not self._output_zip:
      return
    common.ZipWrite(
        self._output_zip, self.name, self._zip_name,
        compress_type=compress_type)
266
267
def AddSystem(output_zip, recovery_img=None, boot_img=None):
  """Builds system.img from SYSTEM/ and stores it in output_zip.

  When OPTIONS.rebuild_recovery is set, the board does not use a vendor image
  and both recovery_img and boot_img are given, the recovery patch is rebuilt
  into SYSTEM/ before the image is created.

  Args:
    output_zip: The output zip file, or a false value to only write under
        OPTIONS.input_tmp.
    recovery_img: Recovery image passed to common.MakeRecoveryPatch.
    boot_img: Boot image passed to common.MakeRecoveryPatch.

  Returns:
    The name of the system image file.
  """
  system_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system.img")
  if os.path.exists(system_img.name):
    logger.info("system.img already exists; no need to rebuild...")
    return system_img.name

  def output_sink(fn, data):
    # Stage the generated file under SYSTEM/ first.
    staged_path = os.path.join(OPTIONS.input_tmp, "SYSTEM", fn)
    with open(staged_path, "wb") as staged:
      staged.write(data)

    if not output_zip:
      return
    arc_name = "SYSTEM/" + fn
    if arc_name not in output_zip.namelist():
      common.ZipWrite(output_zip, staged_path, arc_name)
    else:
      # Existing entries are queued for replacement instead of appended.
      OPTIONS.replace_updated_files_list.append(arc_name)

  uses_vendor_image = (
      OPTIONS.info_dict.get("board_uses_vendorimage") == "true")
  have_both_images = recovery_img is not None and boot_img is not None

  if OPTIONS.rebuild_recovery and not uses_vendor_image and have_both_images:
    logger.info("Building new recovery patch on system at system/vendor")
    common.MakeRecoveryPatch(
        OPTIONS.input_tmp, output_sink, recovery_img, boot_img,
        info_dict=OPTIONS.info_dict)

  system_map = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "system", system_img,
      block_list=system_map)
  return system_img.name
303
304
def AddSystemOther(output_zip):
  """Builds system_other.img from SYSTEM_OTHER/ and stores it in output_zip.

  Nothing is rebuilt when IMAGES/system_other.img already exists under
  OPTIONS.input_tmp.
  """
  other_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system_other.img")
  if not os.path.exists(other_img.name):
    CreateImage(
        OPTIONS.input_tmp, OPTIONS.info_dict, "system_other", other_img)
  else:
    logger.info("system_other.img already exists; no need to rebuild...")
315
316
def AddVendor(output_zip, recovery_img=None, boot_img=None):
  """Builds vendor.img from VENDOR/ and stores it in output_zip.

  When OPTIONS.rebuild_recovery is set, the board uses a vendor image and both
  recovery_img and boot_img are given, the recovery patch is rebuilt into
  VENDOR/ before the image is created.

  Args:
    output_zip: The output zip file, or a false value to only write under
        OPTIONS.input_tmp.
    recovery_img: Recovery image passed to common.MakeRecoveryPatch.
    boot_img: Boot image passed to common.MakeRecoveryPatch.

  Returns:
    The name of the vendor image file.
  """
  vendor_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "vendor.img")
  if os.path.exists(vendor_img.name):
    logger.info("vendor.img already exists; no need to rebuild...")
    return vendor_img.name

  def output_sink(fn, data):
    # Stage the generated file under VENDOR/ first.
    staged_path = os.path.join(OPTIONS.input_tmp, "VENDOR", fn)
    with open(staged_path, "wb") as staged:
      staged.write(data)

    if not output_zip:
      return
    arc_name = "VENDOR/" + fn
    if arc_name not in output_zip.namelist():
      common.ZipWrite(output_zip, staged_path, arc_name)
    else:
      # Existing entries are queued for replacement instead of appended.
      OPTIONS.replace_updated_files_list.append(arc_name)

  uses_vendor_image = (
      OPTIONS.info_dict.get("board_uses_vendorimage") == "true")
  have_both_images = recovery_img is not None and boot_img is not None

  if OPTIONS.rebuild_recovery and uses_vendor_image and have_both_images:
    logger.info("Building new recovery patch on vendor")
    common.MakeRecoveryPatch(
        OPTIONS.input_tmp, output_sink, recovery_img, boot_img,
        info_dict=OPTIONS.info_dict)

  vendor_map = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "vendor.map")
  CreateImage(
      OPTIONS.input_tmp, OPTIONS.info_dict, "vendor", vendor_img,
      block_list=vendor_map)
  return vendor_img.name
352
353
def AddProduct(output_zip):
  """Builds product.img from PRODUCT/ and stores it in output_zip.

  Returns:
    The name of the product image file; an existing IMAGES/product.img is
    reused without rebuilding.
  """
  product_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "product.img")
  if not os.path.exists(product_img.name):
    product_map = OutputFile(
        output_zip, OPTIONS.input_tmp, "IMAGES", "product.map")
    CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "product", product_img,
                block_list=product_map)
  else:
    logger.info("product.img already exists; no need to rebuild...")
  return product_img.name
369
370
def AddSystemExt(output_zip):
  """Builds system_ext.img from SYSTEM_EXT/ and stores it in output_zip.

  Returns:
    The name of the system_ext image file; an existing IMAGES/system_ext.img
    is reused without rebuilding.
  """
  system_ext_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system_ext.img")
  if not os.path.exists(system_ext_img.name):
    system_ext_map = OutputFile(
        output_zip, OPTIONS.input_tmp, "IMAGES", "system_ext.map")
    CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "system_ext",
                system_ext_img, block_list=system_ext_map)
  else:
    logger.info("system_ext.img already exists; no need to rebuild...")
  return system_ext_img.name
387
388
def AddOdm(output_zip):
  """Builds odm.img from ODM/ and stores it in output_zip.

  Returns:
    The name of the odm image file; an existing IMAGES/odm.img is reused
    without rebuilding.
  """
  odm_img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "odm.img")
  if not os.path.exists(odm_img.name):
    odm_map = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "odm.map")
    CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "odm", odm_img,
                block_list=odm_map)
  else:
    logger.info("odm.img already exists; no need to rebuild...")
  return odm_img.name
403
404
def AddVendorDlkm(output_zip):
  """Builds vendor_dlkm.img from VENDOR_DLKM/ and stores it in output_zip.

  Returns:
    The name of the vendor_dlkm image file; an existing
    IMAGES/vendor_dlkm.img is reused without rebuilding.
  """
  dlkm_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "vendor_dlkm.img")
  if not os.path.exists(dlkm_img.name):
    dlkm_map = OutputFile(
        output_zip, OPTIONS.input_tmp, "IMAGES", "vendor_dlkm.map")
    CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "vendor_dlkm", dlkm_img,
                block_list=dlkm_map)
  else:
    logger.info("vendor_dlkm.img already exists; no need to rebuild...")
  return dlkm_img.name
419
420
def AddOdmDlkm(output_zip):
  """Builds odm_dlkm.img from ODM_DLKM/ and stores it in output_zip.

  Returns:
    The name of the odm_dlkm image file; an existing IMAGES/odm_dlkm.img is
    reused without rebuilding.
  """
  dlkm_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "odm_dlkm.img")
  if not os.path.exists(dlkm_img.name):
    dlkm_map = OutputFile(
        output_zip, OPTIONS.input_tmp, "IMAGES", "odm_dlkm.map")
    CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "odm_dlkm", dlkm_img,
                block_list=dlkm_map)
  else:
    logger.info("odm_dlkm.img already exists; no need to rebuild...")
  return dlkm_img.name
435
436
def AddSystemDlkm(output_zip):
  """Builds system_dlkm.img from SYSTEM_DLKM/ and stores it in output_zip.

  Returns:
    The name of the system_dlkm image file; an existing
    IMAGES/system_dlkm.img is reused without rebuilding.
  """
  dlkm_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "system_dlkm.img")
  if not os.path.exists(dlkm_img.name):
    dlkm_map = OutputFile(
        output_zip, OPTIONS.input_tmp, "IMAGES", "system_dlkm.map")
    CreateImage(OPTIONS.input_tmp, OPTIONS.info_dict, "system_dlkm", dlkm_img,
                block_list=dlkm_map)
  else:
    logger.info("system_dlkm.img already exists; no need to rebuild...")
  return dlkm_img.name
451
452
def AddDtbo(output_zip):
  """Adds the DTBO image.

  An existing IMAGES/dtbo.img is used as is. Otherwise the prebuilt from
  PREBUILT_IMAGES/ is copied into IMAGES/, AVB-signed when "avb_enable" is
  "true" in OPTIONS.info_dict, and written to output_zip.

  Returns:
    The name of the dtbo image file.

  Raises:
    AssertionError: If the prebuilt dtbo.img can't be found.
  """
  dtbo_img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "dtbo.img")
  if os.path.exists(dtbo_img.name):
    logger.info("dtbo.img already exists; no need to rebuild...")
    return dtbo_img.name

  prebuilt = os.path.join(OPTIONS.input_tmp, "PREBUILT_IMAGES", "dtbo.img")
  assert os.path.exists(prebuilt)
  os.makedirs(os.path.dirname(dtbo_img.name), exist_ok=True)
  shutil.copy(prebuilt, dtbo_img.name)

  # AVB-sign the image as needed.
  if OPTIONS.info_dict.get("avb_enable") == "true":
    # Signing requires +w
    current_mode = os.stat(dtbo_img.name).st_mode
    os.chmod(dtbo_img.name, current_mode | stat.S_IWUSR)

    avbtool_path = OPTIONS.info_dict["avb_avbtool"]
    partition_size = OPTIONS.info_dict["dtbo_size"]
    # The AVB hash footer will be replaced if already present.
    sign_cmd = [
        avbtool_path, "add_hash_footer",
        "--image", dtbo_img.name,
        "--partition_size", str(partition_size),
        "--partition_name", "dtbo",
    ]
    common.AppendAVBSigningArgs(sign_cmd, "dtbo")
    extra_args = OPTIONS.info_dict.get("avb_dtbo_add_hash_footer_args")
    if extra_args and extra_args.strip():
      sign_cmd += shlex.split(extra_args)
    common.RunAndCheckOutput(sign_cmd)

  dtbo_img.Write()
  return dtbo_img.name
488
489
def AddPvmfw(output_zip):
  """Adds the pvmfw image.

  Uses the image under IMAGES/ if it already exists. Otherwise looks for the
  image under PREBUILT_IMAGES/, copies it into IMAGES/ (creating the directory
  when needed), signs it as needed, and returns the image name.

  Args:
    output_zip: The output zip file, or a false value to only write under
        OPTIONS.input_tmp.

  Returns:
    The name of the pvmfw image file.

  Raises:
    AssertionError: If the prebuilt pvmfw.img can't be found.
  """
  img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "pvmfw.img")
  if os.path.exists(img.name):
    logger.info("pvmfw.img already exists; no need to rebuild...")
    return img.name

  pvmfw_prebuilt_path = os.path.join(
      OPTIONS.input_tmp, "PREBUILT_IMAGES", "pvmfw.img")
  assert os.path.exists(pvmfw_prebuilt_path)
  # IMAGES/ may not exist yet; create it like AddDtbo does so the copy below
  # cannot fail on a missing directory.
  os.makedirs(os.path.dirname(img.name), exist_ok=True)
  shutil.copy(pvmfw_prebuilt_path, img.name)

  # AVB-sign the image as needed.
  if OPTIONS.info_dict.get("avb_enable") == "true":
    # Signing requires +w
    os.chmod(img.name, os.stat(img.name).st_mode | stat.S_IWUSR)

    # Named avbtool_path so the imported avbtool module is not shadowed.
    avbtool_path = OPTIONS.info_dict["avb_avbtool"]
    part_size = OPTIONS.info_dict["pvmfw_size"]
    # The AVB hash footer will be replaced if already present.
    cmd = [avbtool_path, "add_hash_footer", "--image", img.name,
           "--partition_size", str(part_size), "--partition_name", "pvmfw"]
    common.AppendAVBSigningArgs(cmd, "pvmfw")
    args = OPTIONS.info_dict.get("avb_pvmfw_add_hash_footer_args")
    if args and args.strip():
      cmd.extend(shlex.split(args))
    common.RunAndCheckOutput(cmd)

  img.Write()
  return img.name
524
525
def AddCustomImages(output_zip, partition_name, image_list):
  """Adds and signs avb custom images as needed in IMAGES/.

  Each listed image that is not yet present under IMAGES/ is copied from
  PREBUILT_IMAGES/, processed by the custom image builder when an AVB key path
  is configured for the partition, and written to output_zip.

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        write images to OPTIONS.input_tmp/.
    partition_name: The custom image partition name.
    image_list: The image list of the custom image partition.

  Returns:
    The path of IMAGES/<partition_name>.img under OPTIONS.input_tmp.

  Raises:
    AssertionError: If image can't be found.
  """
  info_dict = OPTIONS.info_dict
  key_path = info_dict.get("avb_{}_key_path".format(partition_name))

  builder = None
  if key_path is not None:
    # A builder is only created when a key path is configured.
    algorithm = info_dict.get("avb_{}_algorithm".format(partition_name))
    extra_args = info_dict.get(
        "avb_{}_add_hashtree_footer_args".format(partition_name))
    partition_size = info_dict.get(
        "avb_{}_partition_size".format(partition_name))
    builder = verity_utils.CreateCustomImageBuilder(
        info_dict, partition_name, partition_size, key_path, algorithm,
        extra_args)

  for img_name in image_list:
    target = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", img_name)
    if os.path.exists(target.name):
      continue

    prebuilt = os.path.join(OPTIONS.input_tmp, "PREBUILT_IMAGES", img_name)
    assert os.path.exists(prebuilt), \
        "Failed to find %s at %s" % (img_name, prebuilt)
    shutil.copy(prebuilt, target.name)

    if builder is not None:
      builder.Build(target.name)

    target.Write()

  default = os.path.join(
      OPTIONS.input_tmp, "IMAGES", partition_name + ".img")
  assert os.path.exists(default), \
      "There should be one %s.img" % (partition_name)
  return default
577
578
def CreateImage(input_dir, info_dict, what, output_file, block_list=None):
  """Builds <what>.img from input_dir/<WHAT> and writes it out.

  Args:
    input_dir: The unpacked target-files dir; the image source is the
        upper-cased partition dir below it.
    info_dict: The global info dict. It is updated in place with
        "<what>_image_size" and, for dynamic partitions, with the props
        returned by build_image.GlobalDictFromImageProp.
    what: The partition name, e.g. "system".
    output_file: The OutputFile receiving the image.
    block_list: Optional OutputFile receiving the block list.
  """
  logger.info("creating %s.img...", what)

  image_props = build_image.ImagePropFromGlobalDict(info_dict, what)
  image_props["timestamp"] = FIXED_FILE_TIMESTAMP

  # system uses the unprefixed filesystem_config.txt.
  fs_config_prefix = "" if what == "system" else what + "_"
  fs_config = os.path.join(
      input_dir, "META/" + fs_config_prefix + "filesystem_config.txt")

  # Override values loaded from info_dict.
  if os.path.exists(fs_config):
    image_props["fs_config"] = fs_config
  if block_list:
    image_props["block_list"] = block_list.name

  source_dir = os.path.join(input_dir, what.upper())
  build_image.BuildImage(source_dir, image_props, output_file.name)

  output_file.Write()
  if block_list:
    block_list.Write()

  # Set the '_image_size' for given image size.
  avb_enabled = image_props.get("avb_enable") == "true"
  has_verity_device = "verity_block_device" in image_props
  has_avb_hashtree = image_props.get("avb_hashtree_enable") == "true"
  if avb_enabled and (has_verity_device or has_avb_hashtree):
    image_size = image_props.get("image_size")
    if image_size:
      info_dict[what + "_image_size"] = int(image_size)

  if info_dict.get("use_dynamic_partition_size") == "true":
    dynamic_partitions = shlex.split(
        info_dict.get("dynamic_partition_list", "").strip())
    if what in dynamic_partitions:
      info_dict.update(build_image.GlobalDictFromImageProp(image_props, what))
623
624
def AddUserdata(output_zip):
  """Creates a userdata image and stores it in output_zip.

  The image is normally built from an empty temp dir. Setting
  "userdata_img_with_data=true" in OPTIONS.info_dict builds it from the DATA/
  dir of the target files instead. Nothing is built when userdata.img already
  exists or when no partition size is configured.
  """
  userdata_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "userdata.img")
  if os.path.exists(userdata_img.name):
    logger.info("userdata.img already exists; no need to rebuild...")
    return

  image_props = build_image.ImagePropFromGlobalDict(OPTIONS.info_dict, "data")
  # Skip userdata.img if no size.
  if not image_props.get("partition_size"):
    return

  logger.info("creating userdata.img...")
  image_props["timestamp"] = FIXED_FILE_TIMESTAMP

  with_data = OPTIONS.info_dict.get("userdata_img_with_data") == "true"
  user_dir = (os.path.join(OPTIONS.input_tmp, "DATA") if with_data
              else common.MakeTempDir())

  build_image.BuildImage(user_dir, image_props, userdata_img.name)
  common.CheckSize(userdata_img.name, "userdata.img", OPTIONS.info_dict)

  # Always use compression for useradata image.
  # As it's likely huge and consist of lots of 0s.
  userdata_img.Write(zipfile.ZIP_DEFLATED)
659
660
def AddVBMeta(output_zip, partitions, name, needed_partitions):
  """Creates a VBMeta image and stores it in output_zip.

  The requested image may be the top-level or a chained VBMeta image; which
  one is determined by name. An existing IMAGES/<name>.img is reused.

  Args:
    output_zip: The output zip file, which needs to be already open.
    partitions: A dict that's keyed by partition names with image paths as
        values. Only valid partition names are accepted, as partitions listed
        in common.AVB_PARTITIONS and custom partitions listed in
        OPTIONS.info_dict.get("avb_custom_images_partition_list")
    name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'.
    needed_partitions: Partitions whose descriptors should be included into the
        generated VBMeta image.

  Returns:
    Path to the created image.

  Raises:
    AssertionError: On invalid input args.
  """
  assert needed_partitions, "Needed partitions must be specified"

  vbmeta_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "{}.img".format(name))
  if not os.path.exists(vbmeta_img.name):
    common.BuildVBMeta(
        vbmeta_img.name, partitions, name, needed_partitions,
        OPTIONS.avb_resolve_rollback_index_location_conflict)
    vbmeta_img.Write()
  else:
    logger.info("%s.img already exists; not rebuilding...", name)
  return vbmeta_img.name
695
696
def AddCache(output_zip):
  """Creates an empty cache image and stores it in output_zip.

  Nothing is built when cache.img already exists, or when the image props for
  "cache" carry no "fs_type".
  """
  cache_img = OutputFile(output_zip, OPTIONS.input_tmp, "IMAGES", "cache.img")
  if os.path.exists(cache_img.name):
    logger.info("cache.img already exists; no need to rebuild...")
    return

  image_props = build_image.ImagePropFromGlobalDict(OPTIONS.info_dict, "cache")
  # The build system has to explicitly request for cache.img.
  if "fs_type" in image_props:
    logger.info("creating cache.img...")
    image_props["timestamp"] = FIXED_FILE_TIMESTAMP

    # The image is built from a fresh temp dir.
    empty_dir = common.MakeTempDir()
    build_image.BuildImage(empty_dir, image_props, cache_img.name)

    common.CheckSize(cache_img.name, "cache.img", OPTIONS.info_dict)
    cache_img.Write()
719
720
def CheckAbOtaImages(output_zip, ab_partitions):
  """Checks that all the listed A/B partitions have their images available.

  The images need to be available under IMAGES/ or RADIO/, with the former takes
  a priority.

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        find images in OPTIONS.input_tmp/.
    ab_partitions: The list of A/B partitions.

  Raises:
    AssertionError: If it can't find an image.
  """
  # List the archive once up front instead of twice per partition; a set also
  # makes each membership test constant time.
  zip_entries = set(output_zip.namelist()) if output_zip else None

  for partition in ab_partitions:
    img_name = partition + ".img"

    # Assert that the image is present under IMAGES/ now.
    if zip_entries is not None:
      # Zip spec says: All slashes MUST be forward slashes.
      available = ("IMAGES/" + img_name in zip_entries or
                   "RADIO/" + img_name in zip_entries)
    else:
      images_path = os.path.join(OPTIONS.input_tmp, "IMAGES", img_name)
      radio_path = os.path.join(OPTIONS.input_tmp, "RADIO", img_name)
      available = os.path.exists(images_path) or os.path.exists(radio_path)

    assert available, "Failed to find " + img_name
751
752
def AddPackRadioImages(output_zip, images):
  """Copies images listed in META/pack_radioimages.txt from RADIO/ to IMAGES/.

  Names without a file extension get ".img" appended. Images that already
  exist under IMAGES/ are left untouched.

  Args:
    output_zip: The output zip file (needs to be already open), or None to
        write images to OPTIONS.input_tmp/.
    images: A list of image names.

  Raises:
    AssertionError: If a listed image can't be found.
  """
  for entry in images:
    img_name = entry.strip()
    if not os.path.splitext(img_name)[1]:
      img_name += ".img"

    images_copy = os.path.join(OPTIONS.input_tmp, "IMAGES", img_name)
    if os.path.exists(images_copy):
      logger.info("%s already exists, no need to overwrite...", img_name)
      continue

    radio_copy = os.path.join(OPTIONS.input_tmp, "RADIO", img_name)
    assert os.path.exists(radio_copy), \
        "Failed to find %s at %s" % (img_name, radio_copy)

    if not output_zip:
      shutil.copy(radio_copy, images_copy)
    else:
      common.ZipWrite(output_zip, radio_copy, "IMAGES/" + img_name)
783
784
def AddSuperEmpty(output_zip):
  """Creates super_empty.img and stores it in output_zip.

  An existing IMAGES/super_empty.img is kept as is.
  """
  super_empty_img = OutputFile(
      output_zip, OPTIONS.input_tmp, "IMAGES", "super_empty.img")
  if not os.path.exists(super_empty_img.name):
    build_super_image.BuildSuperImage(OPTIONS.info_dict, super_empty_img.name)
    super_empty_img.Write()
  else:
    logger.info("super_empty.img already exists; no need to rebuild...")
794
795
def AddSuperSplit(output_zip):
  """Creates the split super_*.img files and stores them in output_zip.

  The images are built into OTA/ under OPTIONS.input_tmp. When the build
  reports success, one super_<device>.img per entry of "super_block_devices"
  in OPTIONS.info_dict is written to output_zip.
  """
  ota_dir = os.path.join(OPTIONS.input_tmp, "OTA")
  if not build_super_image.BuildSuperImage(OPTIONS.input_tmp, ota_dir):
    return

  block_devices = OPTIONS.info_dict['super_block_devices'].strip().split()
  for dev in block_devices:
    split_img = OutputFile(
        output_zip, OPTIONS.input_tmp, "OTA", "super_" + dev + ".img")
    split_img.Write()
807
808
def ReplaceUpdatedFiles(zip_filename, files_list):
  """Updates all the ZIP entries listed in files_list.

  The entries are first deleted from the archive and then re-added from their
  copies under OPTIONS.input_tmp.

  For now the list includes META/care_map.pb, and the related files under
  SYSTEM/ after rebuilding recovery.

  Args:
    zip_filename: Path of the zip archive to update.
    files_list: Archive entry names to replace.

  Raises:
    AssertionError: If a listed file is missing under OPTIONS.input_tmp.
  """
  common.ZipDelete(zip_filename, files_list)

  archive = zipfile.ZipFile(
      zip_filename, "a", compression=zipfile.ZIP_DEFLATED, allowZip64=True)
  for entry_name in files_list:
    staged_path = os.path.join(OPTIONS.input_tmp, entry_name)
    assert os.path.exists(staged_path)
    common.ZipWrite(archive, staged_path, arcname=entry_name)
  common.ZipClose(archive)
824
825
def HasPartition(partition_name):
  """Determines if the target files archive should build a given partition.

  Returns:
    True if the upper-cased partition dir exists under OPTIONS.input_tmp and
    "building_<partition_name>_image" is "true" in OPTIONS.info_dict, or if
    IMAGES/<partition_name>.img already exists; False otherwise.
  """
  source_dir = os.path.join(OPTIONS.input_tmp, partition_name.upper())
  if os.path.isdir(source_dir):
    building_key = "building_{}_image".format(partition_name)
    if OPTIONS.info_dict.get(building_key) == "true":
      return True

  # Fall back to an image already present under IMAGES/.
  existing_img = os.path.join(
      OPTIONS.input_tmp, "IMAGES", "{}.img".format(partition_name))
  return os.path.exists(existing_img)
836
837
def AddApexInfo(output_zip):
  """Writes the serialized APEX metadata to META/apex_info.pb.

  The file is always written under OPTIONS.input_tmp. In zip mode it is also
  added to output_zip, or queued in OPTIONS.replace_updated_files_list when
  the archive already contains an entry of that name.

  Args:
    output_zip: The open output ZipFile, or a falsy value to only write the
        file under OPTIONS.input_tmp.
  """
  infos = GetApexInfoFromTargetFiles(OPTIONS.input_tmp)
  metadata = ota_metadata_pb2.ApexMetadata()
  metadata.apex_info.extend(infos)
  serialized = metadata.SerializeToString()

  pb_path = os.path.join(OPTIONS.input_tmp, "META", "apex_info.pb")
  with open(pb_path, "wb") as pb_file:
    pb_file.write(serialized)

  if not output_zip:
    return

  arc_name = "META/apex_info.pb"
  if arc_name not in output_zip.namelist():
    common.ZipWrite(output_zip, pb_path, arc_name)
  else:
    # An entry of this name is already in the archive; queue it for
    # replacement instead of writing it a second time.
    OPTIONS.replace_updated_files_list.append(arc_name)
853
854
def AddVbmetaDigest(output_zip):
  """Computes the vbmeta digest and stores it as META/vbmeta_digest.txt.

  The digest is only computed when AVB is enabled, a vbmeta image is being
  built, and the target files are not a boot image container.

  Args:
    output_zip: The open output ZipFile, or a falsy value to only write the
        file under OPTIONS.input_tmp.
  """
  # A boot container might contain boot-5.4.img, boot-5.10.img etc., instead
  # of just a boot.img, which would fail the vbmeta digest calculation. Treat
  # anything other than a lone 'boot.img' as a container.
  boot_images = OPTIONS.info_dict.get("boot_images")
  if boot_images:
    image_names = boot_images.split()
    is_boot_container = len(image_names) >= 2 or image_names[0] != 'boot.img'
  else:
    is_boot_container = False

  if OPTIONS.info_dict.get("avb_enable") != "true":
    return
  if is_boot_container:
    return
  if OPTIONS.info_dict.get("avb_building_vbmeta_image") != "true":
    return

  avbtool = OPTIONS.info_dict["avb_avbtool"]
  digest = verity_utils.CalculateVbmetaDigest(OPTIONS.input_tmp, avbtool)

  # Always keep a copy under META/ of the working directory.
  digest_path = os.path.join(OPTIONS.input_tmp, "META", "vbmeta_digest.txt")
  with open(digest_path, 'w') as digest_file:
    digest_file.write(digest)

  if not output_zip:
    return

  arc_name = "META/vbmeta_digest.txt"
  if arc_name not in output_zip.namelist():
    common.ZipWriteStr(output_zip, arc_name, digest)
  else:
    # Already present in the archive; queue it for replacement instead.
    OPTIONS.replace_updated_files_list.append(arc_name)
880
881
def AddImagesToTargetFiles(filename):
  """Creates and adds images (boot/recovery/system/...) to a target_files.zip.

  It works with either a zip file (zip mode), or a directory that contains the
  files to be packed into a target_files.zip (dir mode). The latter is used when
  being called from build/make/core/Makefile.

  The images will be created under IMAGES/ in the input target_files.zip.

  The work proceeds in this order: bootable images (boot, init_boot,
  vendor_boot, vendor_kernel_boot, recovery), filesystem partitions, APEX
  info, userdata/cache, dtbo/pvmfw, custom images, vbmeta images, super
  images, A/B checks and care map, radio images, and finally the vbmeta
  digest. The path returned for each partition is collected in a local
  `partitions` dict that is handed to the vbmeta and care map steps.

  Args:
    filename: the target_files.zip, or the zip root directory.

  Raises:
    SystemExit: with status 1 if OPTIONS.add_missing is not set and the input
        already has an IMAGES/ directory.
    AssertionError: if recovery.img or recovery-two-step.img cannot be
        obtained, or a custom vbmeta partition lacks its avb_vbmeta_* entry.
  """
  if os.path.isdir(filename):
    OPTIONS.input_tmp = os.path.abspath(filename)
  else:
    OPTIONS.input_tmp = common.UnzipTemp(filename)

  # Without --add_missing, refuse to touch an input that already has IMAGES/.
  if not OPTIONS.add_missing:
    if os.path.isdir(os.path.join(OPTIONS.input_tmp, "IMAGES")):
      logger.warning("target_files appears to already contain images.")
      sys.exit(1)

  OPTIONS.info_dict = common.LoadInfoDict(OPTIONS.input_tmp, repacking=True)

  # Recovery and boot are on unless explicitly disabled; the other bootable
  # images are off unless explicitly enabled.
  has_recovery = OPTIONS.info_dict.get("no_recovery") != "true"
  has_boot = OPTIONS.info_dict.get("no_boot") != "true"
  has_init_boot = OPTIONS.info_dict.get("init_boot") == "true"
  has_vendor_boot = OPTIONS.info_dict.get("vendor_boot") == "true"
  has_vendor_kernel_boot = OPTIONS.info_dict.get(
      "vendor_kernel_boot") == "true"

  # {vendor,odm,product,system_ext,vendor_dlkm,odm_dlkm,system_dlkm,system,
  # system_other}.img can be built from source, or dropped into
  # target_files.zip as a prebuilt blob.
  has_vendor = HasPartition("vendor")
  has_odm = HasPartition("odm")
  has_vendor_dlkm = HasPartition("vendor_dlkm")
  has_odm_dlkm = HasPartition("odm_dlkm")
  has_system_dlkm = HasPartition("system_dlkm")
  has_product = HasPartition("product")
  has_system_ext = HasPartition("system_ext")
  has_system = HasPartition("system")
  has_system_other = HasPartition("system_other")
  # NOTE(review): has_userdata and has_cache are not referenced again in this
  # function; AddUserdata / AddCache below are gated only on is_signing.
  has_userdata = OPTIONS.info_dict.get("building_userdata_image") == "true"
  has_cache = OPTIONS.info_dict.get("building_cache_image") == "true"

  # Set up the output destination. It writes to the given directory for dir
  # mode; otherwise appends to the given ZIP.
  if os.path.isdir(filename):
    output_zip = None
  else:
    output_zip = zipfile.ZipFile(filename, "a",
                                 compression=zipfile.ZIP_DEFLATED,
                                 allowZip64=True)

  # Always make input_tmp/IMAGES available, since we may stage boot / recovery
  # images there even under zip mode. The directory will be cleaned up as part
  # of OPTIONS.input_tmp.
  images_dir = os.path.join(OPTIONS.input_tmp, "IMAGES")
  if not os.path.isdir(images_dir):
    os.makedirs(images_dir)

  # A map between partition names and their paths, which could be used when
  # generating AVB vbmeta image.
  partitions = {}

  def banner(s):
    # Logs a visually distinct section header for the step named by s.
    logger.info("\n\n++++ %s  ++++\n\n", s)

  # boot_image is rebound on every loop iteration below, so afterwards it holds
  # the result for the last image listed in boot_images (or None).
  boot_image = None
  if has_boot:
    banner("boot")
    boot_images = OPTIONS.info_dict.get("boot_images")
    if boot_images is None:
      boot_images = "boot.img"
    for index, b in enumerate(boot_images.split()):
      # common.GetBootableImage() returns the image directly if present.
      boot_image = common.GetBootableImage(
          "IMAGES/" + b, b, OPTIONS.input_tmp, "BOOT")
      # boot.img may be unavailable in some targets (e.g. aosp_arm64).
      if boot_image:
        boot_image_path = os.path.join(OPTIONS.input_tmp, "IMAGES", b)
        # Although multiple boot images can be generated, include the image
        # descriptor of only the first boot image in vbmeta
        if index == 0:
          partitions['boot'] = boot_image_path
        # Only stage / add the image when it is not already under IMAGES/.
        if not os.path.exists(boot_image_path):
          boot_image.WriteToDir(OPTIONS.input_tmp)
          if output_zip:
            boot_image.AddToZip(output_zip)

  # init_boot, vendor_boot and vendor_kernel_boot all follow the same pattern:
  # obtain the image, record its IMAGES/ path, and write it out only if that
  # path does not exist yet.
  if has_init_boot:
    banner("init_boot")
    init_boot_image = common.GetBootableImage(
        "IMAGES/init_boot.img", "init_boot.img", OPTIONS.input_tmp, "INIT_BOOT",
        dev_nodes=True)
    if init_boot_image:
      partitions['init_boot'] = os.path.join(
          OPTIONS.input_tmp, "IMAGES", "init_boot.img")
      if not os.path.exists(partitions['init_boot']):
        init_boot_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          init_boot_image.AddToZip(output_zip)

  if has_vendor_boot:
    banner("vendor_boot")
    vendor_boot_image = common.GetVendorBootImage(
        "IMAGES/vendor_boot.img", "vendor_boot.img", OPTIONS.input_tmp,
        "VENDOR_BOOT")
    if vendor_boot_image:
      partitions['vendor_boot'] = os.path.join(OPTIONS.input_tmp, "IMAGES",
                                               "vendor_boot.img")
      if not os.path.exists(partitions['vendor_boot']):
        vendor_boot_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          vendor_boot_image.AddToZip(output_zip)

  if has_vendor_kernel_boot:
    banner("vendor_kernel_boot")
    vendor_kernel_boot_image = common.GetVendorKernelBootImage(
        "IMAGES/vendor_kernel_boot.img", "vendor_kernel_boot.img", OPTIONS.input_tmp,
        "VENDOR_KERNEL_BOOT")
    if vendor_kernel_boot_image:
      partitions['vendor_kernel_boot'] = os.path.join(OPTIONS.input_tmp, "IMAGES",
                                                      "vendor_kernel_boot.img")
      if not os.path.exists(partitions['vendor_kernel_boot']):
        vendor_kernel_boot_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          vendor_kernel_boot_image.AddToZip(output_zip)

  # Unlike the images above, a missing recovery image is a hard failure.
  recovery_image = None
  if has_recovery:
    banner("recovery")
    recovery_image = common.GetBootableImage(
        "IMAGES/recovery.img", "recovery.img", OPTIONS.input_tmp, "RECOVERY")
    assert recovery_image, "Failed to create recovery.img."
    partitions['recovery'] = os.path.join(
        OPTIONS.input_tmp, "IMAGES", "recovery.img")
    if not os.path.exists(partitions['recovery']):
      recovery_image.WriteToDir(OPTIONS.input_tmp)
      if output_zip:
        recovery_image.AddToZip(output_zip)

      # Still inside the "recovery.img was not already under IMAGES/" branch:
      # the two-step image is only handled when recovery.img was written here.
      banner("recovery (two-step image)")
      # The special recovery.img for two-step package use.
      recovery_two_step_image = common.GetBootableImage(
          "OTA/recovery-two-step.img", "recovery-two-step.img",
          OPTIONS.input_tmp, "RECOVERY", two_step_image=True)
      assert recovery_two_step_image, "Failed to create recovery-two-step.img."
      recovery_two_step_image_path = os.path.join(
          OPTIONS.input_tmp, "OTA", "recovery-two-step.img")
      if not os.path.exists(recovery_two_step_image_path):
        recovery_two_step_image.WriteToDir(OPTIONS.input_tmp)
        if output_zip:
          recovery_two_step_image.AddToZip(output_zip)

  def add_partition(partition, has_partition, add_func, add_args):
    # Runs add_func(output_zip, *add_args) when has_partition is truthy and
    # records its return value in `partitions` under the partition name.
    if has_partition:
      banner(partition)
      partitions[partition] = add_func(output_zip, *add_args)

  # Each entry is (partition name, enabled flag, builder, extra builder args).
  add_partition_calls = (
      ("system", has_system, AddSystem, [recovery_image, boot_image]),
      ("vendor", has_vendor, AddVendor, [recovery_image, boot_image]),
      ("product", has_product, AddProduct, []),
      ("system_ext", has_system_ext, AddSystemExt, []),
      ("odm", has_odm, AddOdm, []),
      ("vendor_dlkm", has_vendor_dlkm, AddVendorDlkm, []),
      ("odm_dlkm", has_odm_dlkm, AddOdmDlkm, []),
      ("system_dlkm", has_system_dlkm, AddSystemDlkm, []),
      ("system_other", has_system_other, AddSystemOther, []),
  )
  # If output_zip exists, each add_partition_calls writes bytes to the same output_zip,
  # which is not thread-safe. So, run them in serial if output_zip exists.
  if output_zip:
    for call in add_partition_calls:
      add_partition(*call)
  else:
    with ThreadPoolExecutor(max_workers=len(add_partition_calls)) as executor:
      # All calls are submitted first; result() then waits for each one and
      # re-raises any exception raised by its worker.
      for future in [executor.submit(add_partition, *call) for call in add_partition_calls]:
        future.result()

  AddApexInfo(output_zip)

  # userdata and cache are skipped when signing the target files.
  if not OPTIONS.is_signing:
    banner("userdata")
    AddUserdata(output_zip)
    banner("cache")
    AddCache(output_zip)

  add_partition("dtbo",
                OPTIONS.info_dict.get("has_dtbo") == "true", AddDtbo, [])
  add_partition("pvmfw",
                OPTIONS.info_dict.get("has_pvmfw") == "true", AddPvmfw, [])

  # Custom images.
  # NOTE(review): the "<name>_image_list" lookups below have no default, so a
  # partition listed without a matching image list entry fails on .split().
  custom_partitions = OPTIONS.info_dict.get(
      "custom_images_partition_list", "").strip().split()
  for partition_name in custom_partitions:
    partition_name = partition_name.strip()
    banner("custom images for " + partition_name)
    image_list = OPTIONS.info_dict.get(
          "{}_image_list".format(partition_name)).split()
    partitions[partition_name] = AddCustomImages(output_zip, partition_name, image_list)

  avb_custom_partitions = OPTIONS.info_dict.get(
      "avb_custom_images_partition_list", "").strip().split()
  for partition_name in avb_custom_partitions:
    partition_name = partition_name.strip()
    banner("avb custom images for " + partition_name)
    image_list = OPTIONS.info_dict.get(
          "avb_{}_image_list".format(partition_name)).split()
    partitions[partition_name] = AddCustomImages(output_zip, partition_name, image_list)

  if OPTIONS.info_dict.get("avb_enable") == "true":
    # vbmeta_partitions includes the partitions that should be included into
    # top-level vbmeta.img, which are the ones that are not included in any
    # chained VBMeta image plus the chained VBMeta images themselves.
    # Currently avb_custom_partitions are all chained to VBMeta image.
    vbmeta_partitions = common.AVB_PARTITIONS[:] + tuple(avb_custom_partitions)

    # For each chained vbmeta image built below, the partitions it covers are
    # filtered out of vbmeta_partitions (rebinding it to a list) and the
    # chained image's own name is appended in their place.
    vbmeta_system = OPTIONS.info_dict.get("avb_vbmeta_system", "").strip()
    if vbmeta_system and set(vbmeta_system.split()).intersection(partitions):
      banner("vbmeta_system")
      partitions["vbmeta_system"] = AddVBMeta(
          output_zip, partitions, "vbmeta_system", vbmeta_system.split())
      vbmeta_partitions = [
          item for item in vbmeta_partitions
          if item not in vbmeta_system.split()]
      vbmeta_partitions.append("vbmeta_system")

    vbmeta_vendor = OPTIONS.info_dict.get("avb_vbmeta_vendor", "").strip()
    if vbmeta_vendor and set(vbmeta_vendor.split()).intersection(partitions):
      banner("vbmeta_vendor")
      partitions["vbmeta_vendor"] = AddVBMeta(
          output_zip, partitions, "vbmeta_vendor", vbmeta_vendor.split())
      vbmeta_partitions = [
          item for item in vbmeta_partitions
          if item not in vbmeta_vendor.split()]
      vbmeta_partitions.append("vbmeta_vendor")
    # Custom chained vbmeta images: each "vbmeta_<name>" must declare the
    # partitions it covers via a non-empty "avb_vbmeta_<name>" entry.
    custom_avb_partitions = OPTIONS.info_dict.get(
        "avb_custom_vbmeta_images_partition_list", "").strip().split()
    if custom_avb_partitions:
      for avb_part in custom_avb_partitions:
        partition_name = "vbmeta_" + avb_part
        included_partitions = OPTIONS.info_dict.get(
            "avb_vbmeta_{}".format(avb_part), "").strip().split()
        assert included_partitions, "Custom vbmeta partition {0} missing avb_vbmeta_{0} prop".format(
            avb_part)
        banner(partition_name)
        logger.info("VBMeta partition {} needs {}".format(
            partition_name, included_partitions))
        partitions[partition_name] = AddVBMeta(
            output_zip, partitions, partition_name, included_partitions)
        vbmeta_partitions = [
            item for item in vbmeta_partitions
            if item not in included_partitions]
        vbmeta_partitions.append(partition_name)

    # The top-level vbmeta is only built when at least one of its partitions
    # was actually recorded in `partitions`.
    if OPTIONS.info_dict.get("avb_building_vbmeta_image") == "true" and set(vbmeta_partitions).intersection(partitions):
      banner("vbmeta")
      AddVBMeta(output_zip, partitions, "vbmeta", vbmeta_partitions)

  if OPTIONS.info_dict.get("use_dynamic_partitions") == "true":
    if OPTIONS.info_dict.get("build_super_empty_partition") == "true":
      banner("super_empty")
      AddSuperEmpty(output_zip)

  if OPTIONS.info_dict.get("build_super_partition") == "true":
    if OPTIONS.info_dict.get(
            "build_retrofit_dynamic_partitions_ota_package") == "true":
      banner("super split images")
      AddSuperSplit(output_zip)

  banner("radio")
  ab_partitions_txt = os.path.join(OPTIONS.input_tmp, "META",
                                   "ab_partitions.txt")
  if os.path.exists(ab_partitions_txt):
    with open(ab_partitions_txt) as f:
      ab_partitions = f.read().splitlines()

    # For devices using A/B update, make sure we have all the needed images
    # ready under IMAGES/ or RADIO/.
    CheckAbOtaImages(output_zip, ab_partitions)

    # Generate care_map.pb for ab_partitions, then write this file to
    # target_files package.
    # The first argument is the open zip in zip mode, or the care_map.pb path
    # in dir mode.
    output_care_map = os.path.join(OPTIONS.input_tmp, "META", "care_map.pb")
    AddCareMapForAbOta(output_zip if output_zip else output_care_map,
                       ab_partitions, partitions)

  # Radio images that need to be packed into IMAGES/, and product-img.zip.
  pack_radioimages_txt = os.path.join(
      OPTIONS.input_tmp, "META", "pack_radioimages.txt")
  if os.path.exists(pack_radioimages_txt):
    with open(pack_radioimages_txt) as f:
      AddPackRadioImages(output_zip, f.readlines())

  AddVbmetaDigest(output_zip)

  if output_zip:
    # Close the archive first; ReplaceUpdatedFiles is given only its filename
    # and swaps in the entries queued during the steps above.
    common.ZipClose(output_zip)
    if OPTIONS.replace_updated_files_list:
      ReplaceUpdatedFiles(output_zip.filename,
                          OPTIONS.replace_updated_files_list)
1186
1187
def OptimizeCompressedEntries(zipfile_path):
  """Re-stores poorly compressed IMAGES/ and META entries without compression.

  EROFS images tend to be compressed already, so compressing them again
  yields little space savings. Leaving them uncompressed will make
  downstream tooling's job easier, and save compute time.

  An entry is converted when its compressed size exceeds 80% of its original
  size and it is not already stored. Entries whose name ends with
  userdata.img are never converted. Paths that are not ZIP files are ignored.

  Args:
    zipfile_path: Path to the archive to rewrite in place.
  """
  if not zipfile.is_zipfile(zipfile_path):
    return

  with tempfile.TemporaryDirectory() as scratch_dir:
    poorly_compressed = []
    with zipfile.ZipFile(zipfile_path, "r", allowZip64=True) as archive:
      for info in archive.filelist:
        name = info.filename
        if not name.startswith(("IMAGES/", "META")):
          continue
        # Don't try to store userdata.img uncompressed, it's usually huge.
        if name.endswith("userdata.img"):
          continue
        if info.compress_type == zipfile.ZIP_STORED:
          continue
        if info.compress_size > info.file_size * 0.80:
          poorly_compressed.append(info)
          # Keep a copy on disk so the entry can be re-added after deletion.
          archive.extract(info, scratch_dir)

    if not poorly_compressed:
      return

    # Remove these entries, then re-add them as ZIP_STORED
    ZipDelete(zipfile_path, [info.filename for info in poorly_compressed])
    with zipfile.ZipFile(zipfile_path, "a", allowZip64=True) as archive:
      for info in poorly_compressed:
        extracted_path = os.path.join(scratch_dir, info.filename)
        archive.write(extracted_path, info.filename,
                      compress_type=zipfile.ZIP_STORED)
1217
1218
def main(argv):
  """Parses the command line, then adds images to the given target files.

  Args:
    argv: Command-line arguments, without the program name. Exactly one
        positional argument (the target files) must remain after option
        parsing; otherwise the usage text is printed and the process exits
        with status 1.
  """
  # Flags that simply switch on the OPTIONS attribute of the given name.
  # NOTE: the usage text at the top of this file spells the last flag with
  # dashes; only the underscore spelling is registered here.
  boolean_flags = {
      "-a": "add_missing",
      "--add_missing": "add_missing",
      "-r": "rebuild_recovery",
      "--rebuild_recovery": "rebuild_recovery",
      "--is_signing": "is_signing",
      "--avb_resolve_rollback_index_location_conflict":
          "avb_resolve_rollback_index_location_conflict",
  }

  def option_handler(opt, _value):
    # Returns True when opt was consumed here, False to let the caller
    # treat it as unhandled.
    if opt == "--replace_verity_private_key":
      raise ValueError("--replace_verity_private_key is no longer supported,"
                       " please switch to AVB")
    if opt == "--replace_verity_public_key":
      raise ValueError("--replace_verity_public_key is no longer supported,"
                       " please switch to AVB")
    if opt not in boolean_flags:
      return False
    setattr(OPTIONS, boolean_flags[opt], True)
    return True

  long_opts = [
      "add_missing",
      "rebuild_recovery",
      "replace_verity_public_key=",
      "replace_verity_private_key=",
      "is_signing",
      "avb_resolve_rollback_index_location_conflict",
  ]
  args = common.ParseOptions(
      argv, __doc__, extra_opts="ar",
      extra_long_opts=long_opts,
      extra_option_handler=option_handler)

  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)

  common.InitLogging()

  target_files = args[0]
  AddImagesToTargetFiles(target_files)
  OptimizeCompressedEntries(target_files)
  logger.info("done.")
1257
1258
# Script entry point: only runs when the file is executed directly.
if __name__ == '__main__':
  try:
    common.CloseInheritedPipes()
    main(sys.argv[1:])
  finally:
    # Runs on both success and failure, including the sys.exit() paths.
    common.Cleanup()
1265