xref: /aosp_15_r20/build/make/tools/releasetools/test_common.py (revision 9e94795a3d4ef5c1d47486f9a02bb378756cea8a)
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import subprocess
20import tempfile
21import unittest
22import zipfile
23from hashlib import sha1
24from typing import BinaryIO
25
26import common
27import test_utils
28import validate_target_files
29from images import EmptyImage, DataImage
30from rangelib import RangeSet
31
32
33KiB = 1024
34MiB = 1024 * KiB
35GiB = 1024 * MiB
36
37
38def get_2gb_file():
39  size = int(2 * GiB + 1)
40  block_size = 4 * KiB
41  step_size = 4 * MiB
42  tmpfile = tempfile.NamedTemporaryFile()
43  tmpfile.truncate(size)
44  for _ in range(0, size, step_size):
45    tmpfile.write(os.urandom(block_size))
46    tmpfile.seek(step_size - block_size, os.SEEK_CUR)
47  return tmpfile
48
49
50def hash_file(filename):
51  sha1_hash = sha1()
52  with open(filename, "rb") as fp:
53    for data in iter(lambda: fp.read(4*MiB), b''):
54      sha1_hash.update(data)
55  return sha1_hash
56
57
58class BuildInfoTest(test_utils.ReleaseToolsTestCase):
59
60  TEST_INFO_FINGERPRINT_DICT = {
61      'build.prop': common.PartitionBuildProps.FromDictionary(
62          'system', {
63              'ro.product.brand': 'product-brand',
64              'ro.product.name': 'product-name',
65              'ro.product.device': 'product-device',
66              'ro.build.version.release': 'version-release',
67              'ro.build.id': 'build-id',
68              'ro.build.version.incremental': 'version-incremental',
69              'ro.build.type': 'build-type',
70              'ro.build.tags': 'build-tags',
71              'ro.build.version.sdk': 30,
72          }
73      ),
74  }
75
76  TEST_INFO_DICT = {
77      'build.prop': common.PartitionBuildProps.FromDictionary(
78          'system', {
79              'ro.product.device': 'product-device',
80              'ro.product.name': 'product-name',
81              'ro.build.fingerprint': 'build-fingerprint',
82              'ro.build.foo': 'build-foo'}
83      ),
84      'system.build.prop': common.PartitionBuildProps.FromDictionary(
85          'system', {
86              'ro.product.system.brand': 'product-brand',
87              'ro.product.system.name': 'product-name',
88              'ro.product.system.device': 'product-device',
89              'ro.system.build.version.release': 'version-release',
90              'ro.system.build.id': 'build-id',
91              'ro.system.build.version.incremental': 'version-incremental',
92              'ro.system.build.type': 'build-type',
93              'ro.system.build.tags': 'build-tags',
94              'ro.system.build.foo': 'build-foo'}
95      ),
96      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
97          'vendor', {
98              'ro.product.vendor.brand': 'vendor-product-brand',
99              'ro.product.vendor.name': 'vendor-product-name',
100              'ro.product.vendor.device': 'vendor-product-device',
101              'ro.vendor.build.version.release': 'vendor-version-release',
102              'ro.vendor.build.id': 'vendor-build-id',
103              'ro.vendor.build.version.incremental':
104              'vendor-version-incremental',
105              'ro.vendor.build.type': 'vendor-build-type',
106              'ro.vendor.build.tags': 'vendor-build-tags'}
107      ),
108      'property1': 'value1',
109      'property2': 4096,
110  }
111
112  TEST_INFO_DICT_USES_OEM_PROPS = {
113      'build.prop': common.PartitionBuildProps.FromDictionary(
114          'system', {
115              'ro.product.name': 'product-name',
116              'ro.build.thumbprint': 'build-thumbprint',
117              'ro.build.bar': 'build-bar'}
118      ),
119      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
120          'vendor', {
121              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
122      ),
123      'property1': 'value1',
124      'property2': 4096,
125      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
126  }
127
128  TEST_OEM_DICTS = [
129      {
130          'ro.product.brand': 'brand1',
131          'ro.product.device': 'device1',
132      },
133      {
134          'ro.product.brand': 'brand2',
135          'ro.product.device': 'device2',
136      },
137      {
138          'ro.product.brand': 'brand3',
139          'ro.product.device': 'device3',
140      },
141  ]
142
143  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = {
144      'build.prop': common.PartitionBuildProps.FromDictionary(
145          'system', {
146              'ro.build.fingerprint': 'build-fingerprint',
147              'ro.product.property_source_order':
148                  'product,odm,vendor,system_ext,system'}
149      ),
150      'system.build.prop': common.PartitionBuildProps.FromDictionary(
151          'system', {
152              'ro.product.system.device': 'system-product-device'}
153      ),
154      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
155          'vendor', {
156              'ro.product.vendor.device': 'vendor-product-device'}
157      ),
158  }
159
160  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = {
161      'build.prop': common.PartitionBuildProps.FromDictionary(
162          'system', {
163              'ro.build.fingerprint': 'build-fingerprint',
164              'ro.product.property_source_order':
165                  'product,product_services,odm,vendor,system',
166              'ro.build.version.release': '10',
167              'ro.build.version.codename': 'REL'}
168      ),
169      'system.build.prop': common.PartitionBuildProps.FromDictionary(
170          'system', {
171              'ro.product.system.device': 'system-product-device'}
172      ),
173      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
174          'vendor', {
175              'ro.product.vendor.device': 'vendor-product-device'}
176      ),
177  }
178
179  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = {
180      'build.prop': common.PartitionBuildProps.FromDictionary(
181          'system', {
182              'ro.product.device': 'product-device',
183              'ro.build.fingerprint': 'build-fingerprint',
184              'ro.build.version.release': '9',
185              'ro.build.version.codename': 'REL'}
186      ),
187      'system.build.prop': common.PartitionBuildProps.FromDictionary(
188          'system', {
189              'ro.product.system.device': 'system-product-device'}
190      ),
191      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
192          'vendor', {
193              'ro.product.vendor.device': 'vendor-product-device'}
194      ),
195  }
196
197  def test_init(self):
198    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
199    self.assertEqual('product-device', target_info.device)
200    self.assertEqual('build-fingerprint', target_info.fingerprint)
201    self.assertFalse(target_info.is_ab)
202    self.assertIsNone(target_info.oem_props)
203
204  def test_init_with_oem_props(self):
205    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
206                                   self.TEST_OEM_DICTS)
207    self.assertEqual('device1', target_info.device)
208    self.assertEqual('brand1/product-name/device1:build-thumbprint',
209                     target_info.fingerprint)
210
211    # Swap the order in oem_dicts, which would lead to different BuildInfo.
212    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
213    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
214    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
215                                   oem_dicts)
216    self.assertEqual('device3', target_info.device)
217    self.assertEqual('brand3/product-name/device3:build-thumbprint',
218                     target_info.fingerprint)
219
220  def test_init_badFingerprint(self):
221    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
222    info_dict['build.prop'].build_props[
223        'ro.build.fingerprint'] = 'bad fingerprint'
224    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
225
226    info_dict['build.prop'].build_props[
227        'ro.build.fingerprint'] = 'bad\x80fingerprint'
228    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
229
230  def test_init_goodFingerprint(self):
231    info_dict = copy.deepcopy(self.TEST_INFO_FINGERPRINT_DICT)
232    build_info = common.BuildInfo(info_dict)
233    self.assertEqual(
234        'product-brand/product-name/product-device:version-release/build-id/'
235        'version-incremental:build-type/build-tags', build_info.fingerprint)
236
237    build_props = info_dict['build.prop'].build_props
238    del build_props['ro.build.id']
239    build_props['ro.build.legacy.id'] = 'legacy-build-id'
240    build_info = common.BuildInfo(info_dict, use_legacy_id=True)
241    self.assertEqual(
242        'product-brand/product-name/product-device:version-release/'
243        'legacy-build-id/version-incremental:build-type/build-tags',
244        build_info.fingerprint)
245
246    self.assertRaises(common.ExternalError, common.BuildInfo, info_dict, None,
247                      False)
248
249    info_dict['avb_enable'] = 'true'
250    info_dict['vbmeta_digest'] = 'abcde12345'
251    build_info = common.BuildInfo(info_dict, use_legacy_id=False)
252    self.assertEqual(
253        'product-brand/product-name/product-device:version-release/'
254        'legacy-build-id.abcde123/version-incremental:build-type/build-tags',
255        build_info.fingerprint)
256
257  def test___getitem__(self):
258    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
259    self.assertEqual('value1', target_info['property1'])
260    self.assertEqual(4096, target_info['property2'])
261    self.assertEqual('build-foo',
262                     target_info['build.prop'].GetProp('ro.build.foo'))
263
264  def test___getitem__with_oem_props(self):
265    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
266                                   self.TEST_OEM_DICTS)
267    self.assertEqual('value1', target_info['property1'])
268    self.assertEqual(4096, target_info['property2'])
269    self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo'))
270
271  def test___setitem__(self):
272    target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
273    self.assertEqual('value1', target_info['property1'])
274    target_info['property1'] = 'value2'
275    self.assertEqual('value2', target_info['property1'])
276
277    self.assertEqual('build-foo',
278                     target_info['build.prop'].GetProp('ro.build.foo'))
279    target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar'
280    self.assertEqual('build-bar',
281                     target_info['build.prop'].GetProp('ro.build.foo'))
282
283  def test_get(self):
284    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
285    self.assertEqual('value1', target_info.get('property1'))
286    self.assertEqual(4096, target_info.get('property2'))
287    self.assertEqual(4096, target_info.get('property2', 1024))
288    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
289    self.assertEqual('build-foo',
290                     target_info.get('build.prop').GetProp('ro.build.foo'))
291
292  def test_get_with_oem_props(self):
293    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
294                                   self.TEST_OEM_DICTS)
295    self.assertEqual('value1', target_info.get('property1'))
296    self.assertEqual(4096, target_info.get('property2'))
297    self.assertEqual(4096, target_info.get('property2', 1024))
298    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
299    self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo'))
300
301  def test_items(self):
302    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
303    items = target_info.items()
304    self.assertIn(('property1', 'value1'), items)
305    self.assertIn(('property2', 4096), items)
306
307  def test_GetBuildProp(self):
308    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
309    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
310    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
311                      'ro.build.nonexistent')
312
313  def test_GetBuildProp_with_oem_props(self):
314    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
315                                   self.TEST_OEM_DICTS)
316    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
317    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
318                      'ro.build.nonexistent')
319
320  def test_GetPartitionFingerprint(self):
321    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
322    self.assertEqual(
323        target_info.GetPartitionFingerprint('vendor'),
324        'vendor-product-brand/vendor-product-name/vendor-product-device'
325        ':vendor-version-release/vendor-build-id/vendor-version-incremental'
326        ':vendor-build-type/vendor-build-tags')
327
328  def test_GetPartitionFingerprint_system_other_uses_system(self):
329    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
330    self.assertEqual(
331        target_info.GetPartitionFingerprint('system_other'),
332        target_info.GetPartitionFingerprint('system'))
333
334  def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self):
335    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
336    info_dict['vendor.build.prop'].build_props[
337        'ro.vendor.build.fingerprint'] = 'vendor:fingerprint'
338    target_info = common.BuildInfo(info_dict, None)
339    self.assertEqual(
340        target_info.GetPartitionFingerprint('vendor'),
341        'vendor:fingerprint')
342
343  def test_WriteMountOemScript(self):
344    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
345                                   self.TEST_OEM_DICTS)
346    script_writer = test_utils.MockScriptWriter()
347    target_info.WriteMountOemScript(script_writer)
348    self.assertEqual([('Mount', '/oem', None)], script_writer.lines)
349
350  def test_WriteDeviceAssertions(self):
351    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
352    script_writer = test_utils.MockScriptWriter()
353    target_info.WriteDeviceAssertions(script_writer, False)
354    self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines)
355
356  def test_WriteDeviceAssertions_with_oem_props(self):
357    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
358                                   self.TEST_OEM_DICTS)
359    script_writer = test_utils.MockScriptWriter()
360    target_info.WriteDeviceAssertions(script_writer, False)
361    self.assertEqual(
362        [
363            ('AssertOemProperty', 'ro.product.device',
364             ['device1', 'device2', 'device3'], False),
365            ('AssertOemProperty', 'ro.product.brand',
366             ['brand1', 'brand2', 'brand3'], False),
367        ],
368        script_writer.lines)
369
370  def test_ResolveRoProductProperty_FromVendor(self):
371    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
372    info = common.BuildInfo(info_dict, None)
373    self.assertEqual('vendor-product-device',
374                     info.GetBuildProp('ro.product.device'))
375
376  def test_ResolveRoProductProperty_FromSystem(self):
377    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
378    del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device']
379    info = common.BuildInfo(info_dict, None)
380    self.assertEqual('system-product-device',
381                     info.GetBuildProp('ro.product.device'))
382
383  def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self):
384    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
385    info_dict['build.prop'].build_props[
386        'ro.product.property_source_order'] = 'bad-source'
387    with self.assertRaisesRegexp(common.ExternalError,
388                                 'Invalid ro.product.property_source_order'):
389      info = common.BuildInfo(info_dict, None)
390      info.GetBuildProp('ro.product.device')
391
392  def test_ResolveRoProductProperty_Android10PropertySearchOrder(self):
393    info_dict = copy.deepcopy(
394        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10)
395    info = common.BuildInfo(info_dict, None)
396    self.assertEqual('vendor-product-device',
397                     info.GetBuildProp('ro.product.device'))
398
399  def test_ResolveRoProductProperty_Android9PropertySearchOrder(self):
400    info_dict = copy.deepcopy(
401        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9)
402    info = common.BuildInfo(info_dict, None)
403    self.assertEqual('product-device',
404                     info.GetBuildProp('ro.product.device'))
405
406
407class CommonZipTest(test_utils.ReleaseToolsTestCase):
408
409  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
410              test_file_name=None, expected_stat=None, expected_mode=0o644,
411              expected_compress_type=zipfile.ZIP_STORED):
412    # Verify the stat if present.
413    if test_file_name is not None:
414      new_stat = os.stat(test_file_name)
415      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
416      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
417
418    # Reopen the zip file to verify.
419    zip_file = zipfile.ZipFile(zip_file_name, "r", allowZip64=True)
420
421    # Verify the timestamp.
422    info = zip_file.getinfo(arcname)
423    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
424
425    # Verify the file mode.
426    mode = (info.external_attr >> 16) & 0o777
427    self.assertEqual(mode, expected_mode)
428
429    # Verify the compress type.
430    self.assertEqual(info.compress_type, expected_compress_type)
431
432    # Verify the zip contents.
433    entry = zip_file.open(arcname)
434    sha1_hash = sha1()
435    for chunk in iter(lambda: entry.read(4 * MiB), b''):
436      sha1_hash.update(chunk)
437    self.assertEqual(expected_hash, sha1_hash.hexdigest())
438    self.assertIsNone(zip_file.testzip())
439
440  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
441    with tempfile.NamedTemporaryFile() as test_file:
442      test_file_name = test_file.name
443      for data in contents:
444        test_file.write(bytes(data))
445      return self._test_ZipWriteFile(test_file_name, extra_zipwrite_args)
446
447  def _test_ZipWriteFile(self, test_file_name, extra_zipwrite_args=None):
448    extra_zipwrite_args = dict(extra_zipwrite_args or {})
449
450    test_file = tempfile.NamedTemporaryFile(delete=False)
451    test_file_name = test_file.name
452
453    zip_file = tempfile.NamedTemporaryFile(delete=False)
454    zip_file_name = zip_file.name
455
456    # File names within an archive strip the leading slash.
457    arcname = extra_zipwrite_args.get("arcname", test_file_name)
458    if arcname[0] == "/":
459      arcname = arcname[1:]
460    sha1_hash = hash_file(test_file_name)
461
462    zip_file.close()
463    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
464
465    try:
466      expected_mode = extra_zipwrite_args.get("perms", 0o644)
467      expected_compress_type = extra_zipwrite_args.get("compress_type",
468                                                       zipfile.ZIP_STORED)
469
470      # Arbitrary timestamp, just to make sure common.ZipWrite() restores
471      # the timestamp after writing.
472      os.utime(test_file_name, (1234567, 1234567))
473      expected_stat = os.stat(test_file_name)
474      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
475      common.ZipClose(zip_file)
476
477      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
478                   test_file_name, expected_stat, expected_mode,
479                   expected_compress_type)
480    finally:
481      os.remove(zip_file_name)
482
483  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
484    extra_args = dict(extra_args or {})
485
486    zip_file = tempfile.NamedTemporaryFile(delete=False)
487    zip_file_name = zip_file.name
488    zip_file.close()
489
490    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
491
492    try:
493      expected_compress_type = extra_args.get("compress_type",
494                                              zipfile.ZIP_STORED)
495      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
496        arcname = zinfo_or_arcname
497        expected_mode = extra_args.get("perms", 0o644)
498      else:
499        arcname = zinfo_or_arcname.filename
500        if zinfo_or_arcname.external_attr:
501          zinfo_perms = zinfo_or_arcname.external_attr >> 16
502        else:
503          zinfo_perms = 0o600
504        expected_mode = extra_args.get("perms", zinfo_perms)
505
506      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
507      common.ZipClose(zip_file)
508
509      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
510                   expected_mode=expected_mode,
511                   expected_compress_type=expected_compress_type)
512    finally:
513      os.remove(zip_file_name)
514
515  def _test_ZipWriteStr_large_file(self, large_file: BinaryIO, small, extra_args=None):
516    extra_args = dict(extra_args or {})
517
518    zip_file = tempfile.NamedTemporaryFile(delete=False)
519    zip_file_name = zip_file.name
520
521    test_file_name = large_file.name
522
523    arcname_large = test_file_name
524    arcname_small = "bar"
525
526    # File names within an archive strip the leading slash.
527    if arcname_large[0] == "/":
528      arcname_large = arcname_large[1:]
529
530    zip_file.close()
531    zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
532
533    try:
534      sha1_hash = hash_file(test_file_name)
535
536      # Arbitrary timestamp, just to make sure common.ZipWrite() restores
537      # the timestamp after writing.
538      os.utime(test_file_name, (1234567, 1234567))
539      expected_stat = os.stat(test_file_name)
540      expected_mode = 0o644
541      expected_compress_type = extra_args.get("compress_type",
542                                              zipfile.ZIP_STORED)
543
544      common.ZipWrite(zip_file, test_file_name, **extra_args)
545      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
546      common.ZipClose(zip_file)
547
548      # Verify the contents written by ZipWrite().
549      self._verify(zip_file, zip_file_name, arcname_large,
550                   sha1_hash.hexdigest(), test_file_name, expected_stat,
551                   expected_mode, expected_compress_type)
552
553      # Verify the contents written by ZipWriteStr().
554      self._verify(zip_file, zip_file_name, arcname_small,
555                   sha1(small).hexdigest(),
556                   expected_compress_type=expected_compress_type)
557    finally:
558      os.remove(zip_file_name)
559
560  def _test_reset_ZIP64_LIMIT(self, func, *args):
561    default_limit = (1 << 31) - 1
562    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
563    func(*args)
564    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
565
566  def test_ZipWrite(self):
567    file_contents = os.urandom(1024)
568    self._test_ZipWrite(file_contents)
569
570  def test_ZipWrite_with_opts(self):
571    file_contents = os.urandom(1024)
572    self._test_ZipWrite(file_contents, {
573        "arcname": "foobar",
574        "perms": 0o777,
575        "compress_type": zipfile.ZIP_DEFLATED,
576    })
577    self._test_ZipWrite(file_contents, {
578        "arcname": "foobar",
579        "perms": 0o700,
580        "compress_type": zipfile.ZIP_STORED,
581    })
582
583  def test_ZipWrite_large_file(self):
584    with get_2gb_file() as tmpfile:
585      self._test_ZipWriteFile(tmpfile.name, {
586          "compress_type": zipfile.ZIP_DEFLATED,
587      })
588
589  def test_ZipWrite_resets_ZIP64_LIMIT(self):
590    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
591
592  def test_ZipWriteStr(self):
593    random_string = os.urandom(1024)
594    # Passing arcname
595    self._test_ZipWriteStr("foo", random_string)
596
597    # Passing zinfo
598    zinfo = zipfile.ZipInfo(filename="foo")
599    self._test_ZipWriteStr(zinfo, random_string)
600
601    # Timestamp in the zinfo should be overwritten.
602    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
603    self._test_ZipWriteStr(zinfo, random_string)
604
605  def test_ZipWriteStr_with_opts(self):
606    random_string = os.urandom(1024)
607    # Passing arcname
608    self._test_ZipWriteStr("foo", random_string, {
609        "perms": 0o700,
610        "compress_type": zipfile.ZIP_DEFLATED,
611    })
612    self._test_ZipWriteStr("bar", random_string, {
613        "compress_type": zipfile.ZIP_STORED,
614    })
615
616    # Passing zinfo
617    zinfo = zipfile.ZipInfo(filename="foo")
618    self._test_ZipWriteStr(zinfo, random_string, {
619        "compress_type": zipfile.ZIP_DEFLATED,
620    })
621    self._test_ZipWriteStr(zinfo, random_string, {
622        "perms": 0o600,
623        "compress_type": zipfile.ZIP_STORED,
624    })
625    self._test_ZipWriteStr(zinfo, random_string, {
626        "perms": 0o000,
627        "compress_type": zipfile.ZIP_STORED,
628    })
629
630  def test_ZipWriteStr_large_file(self):
631    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
632    # the workaround. We will only test the case of writing a string into a
633    # large archive.
634    short_string = os.urandom(1024)
635    with get_2gb_file() as large_file:
636      self._test_ZipWriteStr_large_file(large_file, short_string, {
637          "compress_type": zipfile.ZIP_DEFLATED,
638      })
639
640  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
641    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'')
642    zinfo = zipfile.ZipInfo(filename="foo")
643    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'')
644
645  def test_bug21309935(self):
646    zip_file = tempfile.NamedTemporaryFile(delete=False)
647    zip_file_name = zip_file.name
648    zip_file.close()
649
650    try:
651      random_string = os.urandom(1024)
652      zip_file = zipfile.ZipFile(zip_file_name, "w", allowZip64=True)
653      # Default perms should be 0o644 when passing the filename.
654      common.ZipWriteStr(zip_file, "foo", random_string)
655      # Honor the specified perms.
656      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
657      # The perms in zinfo should be untouched.
658      zinfo = zipfile.ZipInfo(filename="baz")
659      zinfo.external_attr = 0o740 << 16
660      common.ZipWriteStr(zip_file, zinfo, random_string)
661      # Explicitly specified perms has the priority.
662      zinfo = zipfile.ZipInfo(filename="qux")
663      zinfo.external_attr = 0o700 << 16
664      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
665      common.ZipClose(zip_file)
666
667      self._verify(zip_file, zip_file_name, "foo",
668                   sha1(random_string).hexdigest(),
669                   expected_mode=0o644)
670      self._verify(zip_file, zip_file_name, "bar",
671                   sha1(random_string).hexdigest(),
672                   expected_mode=0o755)
673      self._verify(zip_file, zip_file_name, "baz",
674                   sha1(random_string).hexdigest(),
675                   expected_mode=0o740)
676      self._verify(zip_file, zip_file_name, "qux",
677                   sha1(random_string).hexdigest(),
678                   expected_mode=0o400)
679    finally:
680      os.remove(zip_file_name)
681
682  @test_utils.SkipIfExternalToolsUnavailable()
683  def test_ZipDelete(self):
684    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
685    output_zip = zipfile.ZipFile(zip_file.name, 'w',
686                                 compression=zipfile.ZIP_DEFLATED)
687    with tempfile.NamedTemporaryFile() as entry_file:
688      entry_file.write(os.urandom(1024))
689      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
690      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
691      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
692      common.ZipClose(output_zip)
693    zip_file.close()
694
695    try:
696      common.ZipDelete(zip_file.name, 'Test2')
697      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
698        entries = check_zip.namelist()
699        self.assertTrue('Test1' in entries)
700        self.assertFalse('Test2' in entries)
701        self.assertTrue('Test3' in entries)
702
703      self.assertRaises(
704          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
705      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
706        entries = check_zip.namelist()
707        self.assertTrue('Test1' in entries)
708        self.assertFalse('Test2' in entries)
709        self.assertTrue('Test3' in entries)
710
711      common.ZipDelete(zip_file.name, ['Test3'])
712      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
713        entries = check_zip.namelist()
714        self.assertTrue('Test1' in entries)
715        self.assertFalse('Test2' in entries)
716        self.assertFalse('Test3' in entries)
717
718      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
719      with zipfile.ZipFile(zip_file.name, 'r', allowZip64=True) as check_zip:
720        entries = check_zip.namelist()
721        self.assertFalse('Test1' in entries)
722        self.assertFalse('Test2' in entries)
723        self.assertFalse('Test3' in entries)
724    finally:
725      os.remove(zip_file.name)
726
727  @staticmethod
728  def _test_UnzipTemp_createZipFile():
729    zip_file = common.MakeTempFile(suffix='.zip')
730    output_zip = zipfile.ZipFile(
731        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
732    contents = os.urandom(1024)
733    with tempfile.NamedTemporaryFile() as entry_file:
734      entry_file.write(contents)
735      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
736      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
737      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
738      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
739      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
740      common.ZipClose(output_zip)
741    common.ZipClose(output_zip)
742    return zip_file
743
744  @test_utils.SkipIfExternalToolsUnavailable()
745  def test_UnzipTemp(self):
746    zip_file = self._test_UnzipTemp_createZipFile()
747    unzipped_dir = common.UnzipTemp(zip_file)
748    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
749    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
750    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
751    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
752    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
753
754  @test_utils.SkipIfExternalToolsUnavailable()
755  def test_UnzipTemp_withPatterns(self):
756    zip_file = self._test_UnzipTemp_createZipFile()
757
758    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
759    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
760    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
761    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
762    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
763    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
764
765    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
766    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
767    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
768    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
769    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
770    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
771
772    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
773    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
774    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
775    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
776    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
777    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
778
779    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
780    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
781    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
782    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
783    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
784    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
785
786  def test_UnzipTemp_withEmptyPatterns(self):
787    zip_file = self._test_UnzipTemp_createZipFile()
788    unzipped_dir = common.UnzipTemp(zip_file, [])
789    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
790    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
791    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
792    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
793    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
794
795  @test_utils.SkipIfExternalToolsUnavailable()
796  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
797    zip_file = self._test_UnzipTemp_createZipFile()
798    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
799    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
800    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
801    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
802    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
803    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
804
805  def test_UnzipTemp_withNoMatchingPatterns(self):
806    zip_file = self._test_UnzipTemp_createZipFile()
807    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
808    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
809    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
810    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
811    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
812    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
813
814
815class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
816  """Tests the APK utils related functions."""
817
818  APKCERTS_TXT1 = (
819      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
820      ' private_key="certs/devkey.pk8"\n'
821      'name="Settings.apk"'
822      ' certificate="build/make/target/product/security/platform.x509.pem"'
823      ' private_key="build/make/target/product/security/platform.pk8"\n'
824      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
825  )
826
827  APKCERTS_CERTMAP1 = {
828      'RecoveryLocalizer.apk': 'certs/devkey',
829      'Settings.apk': 'build/make/target/product/security/platform',
830      'TV.apk': 'PRESIGNED',
831  }
832
833  APKCERTS_TXT2 = (
834      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
835      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
836      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
837      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
838      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
839      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
840      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
841      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
842  )
843
844  APKCERTS_CERTMAP2 = {
845      'Compressed1.apk': 'certs/compressed1',
846      'Compressed2a.apk': 'certs/compressed2',
847      'Compressed2b.apk': 'certs/compressed2',
848      'Compressed3.apk': 'certs/compressed3',
849  }
850
851  APKCERTS_TXT3 = (
852      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
853      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
854  )
855
856  APKCERTS_CERTMAP3 = {
857      'Compressed4.apk': 'certs/compressed4',
858  }
859
860  # Test parsing with no optional fields, both optional fields, and only the
861  # partition optional field.
862  APKCERTS_TXT4 = (
863      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
864      ' private_key="certs/devkey.pk8"\n'
865      'name="Settings.apk"'
866      ' certificate="build/make/target/product/security/platform.x509.pem"'
867      ' private_key="build/make/target/product/security/platform.pk8"'
868      ' compressed="gz" partition="system"\n'
869      'name="TV.apk" certificate="PRESIGNED" private_key=""'
870      ' partition="product"\n'
871  )
872
873  APKCERTS_CERTMAP4 = {
874      'RecoveryLocalizer.apk': 'certs/devkey',
875      'Settings.apk': 'build/make/target/product/security/platform',
876      'TV.apk': 'PRESIGNED',
877  }
878
879  def setUp(self):
880    self.testdata_dir = test_utils.get_testdata_dir()
881
882  @staticmethod
883  def _write_apkcerts_txt(apkcerts_txt, additional=None):
884    if additional is None:
885      additional = []
886    target_files = common.MakeTempFile(suffix='.zip')
887    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
888      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
889      for entry in additional:
890        target_files_zip.writestr(entry, '')
891    return target_files
892
893  def test_ReadApkCerts_NoncompressedApks(self):
894    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
895    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
896      certmap, ext = common.ReadApkCerts(input_zip)
897
898    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
899    self.assertIsNone(ext)
900
901  def test_ReadApkCerts_CompressedApks(self):
902    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
903    # not stored in '.gz' format, so it shouldn't be considered as installed.
904    target_files = self._write_apkcerts_txt(
905        self.APKCERTS_TXT2,
906        ['Compressed1.apk.gz', 'Compressed3.apk'])
907
908    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
909      certmap, ext = common.ReadApkCerts(input_zip)
910
911    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
912    self.assertEqual('.gz', ext)
913
914    # Alternative case with '.xz'.
915    target_files = self._write_apkcerts_txt(
916        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
917
918    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
919      certmap, ext = common.ReadApkCerts(input_zip)
920
921    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
922    self.assertEqual('.xz', ext)
923
924  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
925    target_files = self._write_apkcerts_txt(
926        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
927        ['Compressed1.apk.gz', 'Compressed3.apk'])
928
929    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
930      certmap, ext = common.ReadApkCerts(input_zip)
931
932    certmap_merged = self.APKCERTS_CERTMAP1.copy()
933    certmap_merged.update(self.APKCERTS_CERTMAP2)
934    self.assertDictEqual(certmap_merged, certmap)
935    self.assertEqual('.gz', ext)
936
937  def test_ReadApkCerts_MultipleCompressionMethods(self):
938    target_files = self._write_apkcerts_txt(
939        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
940        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
941
942    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
943      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
944
945  def test_ReadApkCerts_MismatchingKeys(self):
946    malformed_apkcerts_txt = (
947        'name="App1.apk" certificate="certs/cert1.x509.pem"'
948        ' private_key="certs/cert2.pk8"\n'
949    )
950    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
951
952    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
953      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
954
955  def test_ReadApkCerts_WithWithoutOptionalFields(self):
956    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4)
957    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
958      certmap, ext = common.ReadApkCerts(input_zip)
959
960    self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap)
961    self.assertIsNone(ext)
962
963  def test_ExtractPublicKey(self):
964    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
965    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
966    with open(pubkey) as pubkey_fp:
967      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
968
969  def test_ExtractPublicKey_invalidInput(self):
970    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
971    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
972
973  @test_utils.SkipIfExternalToolsUnavailable()
974  def test_ExtractAvbPublicKey(self):
975    privkey = os.path.join(self.testdata_dir, 'testkey.key')
976    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
977    extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey)
978    extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey)
979    with open(extracted_from_privkey, 'rb') as privkey_fp, \
980            open(extracted_from_pubkey, 'rb') as pubkey_fp:
981      self.assertEqual(privkey_fp.read(), pubkey_fp.read())
982
983  def test_ParseCertificate(self):
984    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
985
986    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
987    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
988                      universal_newlines=False)
989    expected, _ = proc.communicate()
990    self.assertEqual(0, proc.returncode)
991
992    with open(cert) as cert_fp:
993      actual = common.ParseCertificate(cert_fp.read())
994    self.assertEqual(expected, actual)
995
996  @test_utils.SkipIfExternalToolsUnavailable()
997  def test_GetMinSdkVersion(self):
998    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
999    self.assertEqual('24', common.GetMinSdkVersion(test_app))
1000
1001  @test_utils.SkipIfExternalToolsUnavailable()
1002  def test_GetMinSdkVersion_invalidInput(self):
1003    self.assertRaises(
1004        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
1005
1006  @test_utils.SkipIfExternalToolsUnavailable()
1007  def test_GetMinSdkVersionInt(self):
1008    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
1009    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
1010
1011  @test_utils.SkipIfExternalToolsUnavailable()
1012  def test_GetMinSdkVersionInt_invalidInput(self):
1013    self.assertRaises(
1014        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
1015        {})
1016
1017
1018class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
1019
1020  def setUp(self):
1021    self.testdata_dir = test_utils.get_testdata_dir()
1022
1023  @test_utils.SkipIfExternalToolsUnavailable()
1024  def test_GetSparseImage_emptyBlockMapFile(self):
1025    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1026    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1027      target_files_zip.write(
1028          test_utils.construct_sparse_image([
1029              (0xCAC1, 6),
1030              (0xCAC3, 3),
1031              (0xCAC1, 4)]),
1032          arcname='IMAGES/system.img')
1033      target_files_zip.writestr('IMAGES/system.map', '')
1034      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1035      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1036
1037    tempdir = common.UnzipTemp(target_files)
1038    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1039      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1040
1041    self.assertDictEqual(
1042        {
1043            '__COPY': RangeSet("0"),
1044            '__NONZERO-0': RangeSet("1-5 9-12"),
1045        },
1046        sparse_image.file_map)
1047
1048  def test_PartitionMapFromTargetFiles(self):
1049    target_files_dir = common.MakeTempDir()
1050    os.makedirs(os.path.join(target_files_dir, 'SYSTEM'))
1051    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor'))
1052    os.makedirs(os.path.join(target_files_dir, 'PRODUCT'))
1053    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'product'))
1054    os.makedirs(os.path.join(target_files_dir, 'SYSTEM', 'vendor', 'odm'))
1055    os.makedirs(os.path.join(target_files_dir, 'VENDOR_DLKM'))
1056    partition_map = common.PartitionMapFromTargetFiles(target_files_dir)
1057    self.assertDictEqual(
1058        partition_map,
1059        {
1060            'system': 'SYSTEM',
1061            'vendor': 'SYSTEM/vendor',
1062            # Prefer PRODUCT over SYSTEM/product
1063            'product': 'PRODUCT',
1064            'odm': 'SYSTEM/vendor/odm',
1065            'vendor_dlkm': 'VENDOR_DLKM',
1066            # No system_ext or odm_dlkm
1067        })
1068
1069  def test_SharedUidPartitionViolations(self):
1070    uid_dict = {
1071        'android.uid.phone': {
1072            'system': ['system_phone.apk'],
1073            'system_ext': ['system_ext_phone.apk'],
1074        },
1075        'android.uid.wifi': {
1076            'vendor': ['vendor_wifi.apk'],
1077            'odm': ['odm_wifi.apk'],
1078        },
1079    }
1080    errors = common.SharedUidPartitionViolations(
1081        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1082    self.assertEqual(errors, [])
1083
1084  def test_SharedUidPartitionViolations_Violation(self):
1085    uid_dict = {
1086        'android.uid.phone': {
1087            'system': ['system_phone.apk'],
1088            'vendor': ['vendor_phone.apk'],
1089        },
1090    }
1091    errors = common.SharedUidPartitionViolations(
1092        uid_dict, [('system', 'system_ext'), ('vendor', 'odm')])
1093    self.assertIn(
1094        ('APK sharedUserId "android.uid.phone" found across partition groups '
1095         'in partitions "system,vendor"'), errors)
1096
1097  def test_GetSparseImage_missingImageFile(self):
1098    self.assertRaises(
1099        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
1100        None, False)
1101    self.assertRaises(
1102        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
1103        None, False)
1104
1105  @test_utils.SkipIfExternalToolsUnavailable()
1106  def test_GetSparseImage_missingBlockMapFile(self):
1107    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1108    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1109      target_files_zip.write(
1110          test_utils.construct_sparse_image([
1111              (0xCAC1, 6),
1112              (0xCAC3, 3),
1113              (0xCAC1, 4)]),
1114          arcname='IMAGES/system.img')
1115      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1116      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1117
1118    tempdir = common.UnzipTemp(target_files)
1119    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1120      self.assertRaises(
1121          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1122          False)
1123
1124  @test_utils.SkipIfExternalToolsUnavailable()
1125  def test_GetSparseImage_sharedBlocks_notAllowed(self):
1126    """Tests the case of having overlapping blocks but disallowed."""
1127    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1128    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1129      target_files_zip.write(
1130          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1131          arcname='IMAGES/system.img')
1132      # Block 10 is shared between two files.
1133      target_files_zip.writestr(
1134          'IMAGES/system.map',
1135          '\n'.join([
1136              '/system/file1 1-5 9-10',
1137              '/system/file2 10-12']))
1138      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1139      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1140
1141    tempdir = common.UnzipTemp(target_files)
1142    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1143      self.assertRaises(
1144          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1145          False)
1146
1147  @test_utils.SkipIfExternalToolsUnavailable()
1148  def test_GetSparseImage_sharedBlocks_allowed(self):
1149    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
1150    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1151    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1152      # Construct an image with a care_map of "0-5 9-12".
1153      target_files_zip.write(
1154          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1155          arcname='IMAGES/system.img')
1156      # Block 10 is shared between two files.
1157      target_files_zip.writestr(
1158          'IMAGES/system.map',
1159          '\n'.join([
1160              '/system/file1 1-5 9-10',
1161              '/system/file2 10-12']))
1162      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1163      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1164
1165    tempdir = common.UnzipTemp(target_files)
1166    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1167      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
1168
1169    self.assertDictEqual(
1170        {
1171            '__COPY': RangeSet("0"),
1172            '__NONZERO-0': RangeSet("6-8 13-15"),
1173            '/system/file1': RangeSet("1-5 9-10"),
1174            '/system/file2': RangeSet("11-12"),
1175        },
1176        sparse_image.file_map)
1177
1178    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
1179    # 'incomplete'.
1180    self.assertTrue(
1181        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
1182    self.assertNotIn(
1183        'incomplete', sparse_image.file_map['/system/file2'].extra)
1184
1185    # '/system/file1' will only contain one field -- a copy of the input text.
1186    self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra))
1187
1188    # Meta entries should not have any extra tag.
1189    self.assertFalse(sparse_image.file_map['__COPY'].extra)
1190    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
1191
1192  @test_utils.SkipIfExternalToolsUnavailable()
1193  def test_GetSparseImage_incompleteRanges(self):
1194    """Tests the case of ext4 images with holes."""
1195    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1196    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1197      target_files_zip.write(
1198          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1199          arcname='IMAGES/system.img')
1200      target_files_zip.writestr(
1201          'IMAGES/system.map',
1202          '\n'.join([
1203              '/system/file1 1-5 9-10',
1204              '/system/file2 11-12']))
1205      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1206      # '/system/file2' has less blocks listed (2) than actual (3).
1207      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1208
1209    tempdir = common.UnzipTemp(target_files)
1210    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1211      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1212
1213    self.assertEqual(
1214        '1-5 9-10',
1215        sparse_image.file_map['/system/file1'].extra['text_str'])
1216    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
1217
1218  @test_utils.SkipIfExternalToolsUnavailable()
1219  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
1220    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1221    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1222      target_files_zip.write(
1223          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1224          arcname='IMAGES/system.img')
1225      target_files_zip.writestr(
1226          'IMAGES/system.map',
1227          '\n'.join([
1228              '//system/file1 1-5 9-10',
1229              '//system/file2 11-12',
1230              '/system/app/file3 13-15']))
1231      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1232      # '/system/file2' has less blocks listed (2) than actual (3).
1233      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1234      # '/system/app/file3' has less blocks listed (3) than actual (4).
1235      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
1236
1237    tempdir = common.UnzipTemp(target_files)
1238    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1239      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1240
1241    self.assertEqual(
1242        '1-5 9-10',
1243        sparse_image.file_map['//system/file1'].extra['text_str'])
1244    self.assertTrue(
1245        sparse_image.file_map['//system/file2'].extra['incomplete'])
1246    self.assertTrue(
1247        sparse_image.file_map['/system/app/file3'].extra['incomplete'])
1248
1249  @test_utils.SkipIfExternalToolsUnavailable()
1250  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
1251    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1252    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1253      target_files_zip.write(
1254          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1255          arcname='IMAGES/system.img')
1256      target_files_zip.writestr(
1257          'IMAGES/system.map',
1258          '\n'.join([
1259              '//system/file1 1-5 9-10',
1260              '//init.rc 13-15']))
1261      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1262      # '/init.rc' has less blocks listed (3) than actual (4).
1263      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
1264
1265    tempdir = common.UnzipTemp(target_files)
1266    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1267      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1268
1269    self.assertEqual(
1270        '1-5 9-10',
1271        sparse_image.file_map['//system/file1'].extra['text_str'])
1272    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
1273
1274  @test_utils.SkipIfExternalToolsUnavailable()
1275  def test_GetSparseImage_fileNotFound(self):
1276    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1277    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1278      target_files_zip.write(
1279          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1280          arcname='IMAGES/system.img')
1281      target_files_zip.writestr(
1282          'IMAGES/system.map',
1283          '\n'.join([
1284              '//system/file1 1-5 9-10',
1285              '//system/file2 11-12']))
1286      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1287
1288    tempdir = common.UnzipTemp(target_files)
1289    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as input_zip:
1290      self.assertRaises(
1291          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1292          False)
1293
1294  @test_utils.SkipIfExternalToolsUnavailable()
1295  def test_GetAvbChainedPartitionArg(self):
1296    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1297    info_dict = {
1298        'avb_avbtool': 'avbtool',
1299        'avb_system_key_path': pubkey,
1300        'avb_system_rollback_index_location': 2,
1301    }
1302    chained_partition_args = common.GetAvbChainedPartitionArg(
1303        'system', info_dict)
1304    self.assertEqual('system', chained_partition_args.partition)
1305    self.assertEqual(2, chained_partition_args.rollback_index_location)
1306    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
1307
1308  @test_utils.SkipIfExternalToolsUnavailable()
1309  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
1310    key = os.path.join(self.testdata_dir, 'testkey.key')
1311    info_dict = {
1312        'avb_avbtool': 'avbtool',
1313        'avb_product_key_path': key,
1314        'avb_product_rollback_index_location': 2,
1315    }
1316    chained_partition_args = common.GetAvbChainedPartitionArg(
1317        'product', info_dict)
1318    self.assertEqual('product', chained_partition_args.partition)
1319    self.assertEqual(2, chained_partition_args.rollback_index_location)
1320    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
1321
1322  @test_utils.SkipIfExternalToolsUnavailable()
1323  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
1324    info_dict = {
1325        'avb_avbtool': 'avbtool',
1326        'avb_system_key_path': 'does-not-exist',
1327        'avb_system_rollback_index_location': 2,
1328    }
1329    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1330    chained_partition_args = common.GetAvbChainedPartitionArg(
1331        'system', info_dict, pubkey)
1332    self.assertEqual('system', chained_partition_args.partition)
1333    self.assertEqual(2, chained_partition_args.rollback_index_location)
1334    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
1335
1336  @test_utils.SkipIfExternalToolsUnavailable()
1337  def test_GetAvbChainedPartitionArg_invalidKey(self):
1338    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
1339    info_dict = {
1340        'avb_avbtool': 'avbtool',
1341        'avb_system_key_path': pubkey,
1342        'avb_system_rollback_index_location': 2,
1343    }
1344    self.assertRaises(
1345        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
1346        info_dict)
1347
1348  INFO_DICT_DEFAULT = {
1349      'recovery_api_version': 3,
1350      'fstab_version': 2,
1351      'no_recovery': 'true',
1352      'recovery_as_boot': 'true',
1353  }
1354
1355  def test_LoadListFromFile(self):
1356    file_path = os.path.join(self.testdata_dir,
1357                             'merge_config_framework_item_list')
1358    contents = common.LoadListFromFile(file_path)
1359    expected_contents = [
1360        'META/apkcerts.txt',
1361        'META/filesystem_config.txt',
1362        'META/root_filesystem_config.txt',
1363        'META/system_manifest.xml',
1364        'META/system_matrix.xml',
1365        'META/update_engine_config.txt',
1366        'PRODUCT/*',
1367        'ROOT/*',
1368        'SYSTEM/*',
1369    ]
1370    self.assertEqual(sorted(contents), sorted(expected_contents))
1371
1372  @staticmethod
1373  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
1374    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1375    with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
1376      info_values = ''.join(
1377          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
1378      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
1379      common.ZipWriteStr(target_files_zip, fstab_path,
1380                         "/dev/block/system /system ext4 ro,barrier=1 defaults")
1381      common.ZipWriteStr(
1382          target_files_zip, 'META/file_contexts', 'file-contexts')
1383    return target_files
1384
1385  def test_LoadInfoDict(self):
1386    target_files = self._test_LoadInfoDict_createTargetFiles(
1387        self.INFO_DICT_DEFAULT,
1388        'BOOT/RAMDISK/system/etc/recovery.fstab')
1389    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1390      loaded_dict = common.LoadInfoDict(target_files_zip)
1391      self.assertEqual(3, loaded_dict['recovery_api_version'])
1392      self.assertEqual(2, loaded_dict['fstab_version'])
1393      self.assertIn('/system', loaded_dict['fstab'])
1394
1395  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
1396    target_files = self._test_LoadInfoDict_createTargetFiles(
1397        self.INFO_DICT_DEFAULT,
1398        'BOOT/RAMDISK/etc/recovery.fstab')
1399    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1400      loaded_dict = common.LoadInfoDict(target_files_zip)
1401      self.assertEqual(3, loaded_dict['recovery_api_version'])
1402      self.assertEqual(2, loaded_dict['fstab_version'])
1403      self.assertIn('/system', loaded_dict['fstab'])
1404
1405  @test_utils.SkipIfExternalToolsUnavailable()
1406  def test_LoadInfoDict_dirInput(self):
1407    target_files = self._test_LoadInfoDict_createTargetFiles(
1408        self.INFO_DICT_DEFAULT,
1409        'BOOT/RAMDISK/system/etc/recovery.fstab')
1410    unzipped = common.UnzipTemp(target_files)
1411    loaded_dict = common.LoadInfoDict(unzipped)
1412    self.assertEqual(3, loaded_dict['recovery_api_version'])
1413    self.assertEqual(2, loaded_dict['fstab_version'])
1414    self.assertIn('/system', loaded_dict['fstab'])
1415
1416  @test_utils.SkipIfExternalToolsUnavailable()
1417  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
1418    target_files = self._test_LoadInfoDict_createTargetFiles(
1419        self.INFO_DICT_DEFAULT,
1420        'BOOT/RAMDISK/system/etc/recovery.fstab')
1421    unzipped = common.UnzipTemp(target_files)
1422    loaded_dict = common.LoadInfoDict(unzipped)
1423    self.assertEqual(3, loaded_dict['recovery_api_version'])
1424    self.assertEqual(2, loaded_dict['fstab_version'])
1425    self.assertIn('/system', loaded_dict['fstab'])
1426
1427  def test_LoadInfoDict_recoveryAsBootFalse(self):
1428    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1429    del info_dict['no_recovery']
1430    del info_dict['recovery_as_boot']
1431    target_files = self._test_LoadInfoDict_createTargetFiles(
1432        info_dict,
1433        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1434    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1435      loaded_dict = common.LoadInfoDict(target_files_zip)
1436      self.assertEqual(3, loaded_dict['recovery_api_version'])
1437      self.assertEqual(2, loaded_dict['fstab_version'])
1438      self.assertNotIn('/', loaded_dict['fstab'])
1439      self.assertIn('/system', loaded_dict['fstab'])
1440
1441  def test_LoadInfoDict_noRecoveryTrue(self):
1442    # Device doesn't have a recovery partition at all.
1443    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1444    del info_dict['recovery_as_boot']
1445    target_files = self._test_LoadInfoDict_createTargetFiles(
1446        info_dict,
1447        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1448    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1449      loaded_dict = common.LoadInfoDict(target_files_zip)
1450      self.assertEqual(3, loaded_dict['recovery_api_version'])
1451      self.assertEqual(2, loaded_dict['fstab_version'])
1452      self.assertIsNone(loaded_dict['fstab'])
1453
1454  @test_utils.SkipIfExternalToolsUnavailable()
1455  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
1456    target_files = self._test_LoadInfoDict_createTargetFiles(
1457        self.INFO_DICT_DEFAULT,
1458        'BOOT/RAMDISK/system/etc/recovery.fstab')
1459    common.ZipDelete(target_files, 'META/misc_info.txt')
1460    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1461      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
1462
1463  @test_utils.SkipIfExternalToolsUnavailable()
1464  def test_LoadInfoDict_repacking(self):
1465    target_files = self._test_LoadInfoDict_createTargetFiles(
1466        self.INFO_DICT_DEFAULT,
1467        'BOOT/RAMDISK/system/etc/recovery.fstab')
1468    unzipped = common.UnzipTemp(target_files)
1469    loaded_dict = common.LoadInfoDict(unzipped, True)
1470    self.assertEqual(3, loaded_dict['recovery_api_version'])
1471    self.assertEqual(2, loaded_dict['fstab_version'])
1472    self.assertIn('/system', loaded_dict['fstab'])
1473    self.assertEqual(
1474        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
1475    self.assertEqual(
1476        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
1477        loaded_dict['root_fs_config'])
1478
1479  def test_LoadInfoDict_repackingWithZipFileInput(self):
1480    target_files = self._test_LoadInfoDict_createTargetFiles(
1481        self.INFO_DICT_DEFAULT,
1482        'BOOT/RAMDISK/system/etc/recovery.fstab')
1483    with zipfile.ZipFile(target_files, 'r', allowZip64=True) as target_files_zip:
1484      self.assertRaises(
1485          AssertionError, common.LoadInfoDict, target_files_zip, True)
1486
1487  def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self):
1488    framework_dict = {
1489        'use_dynamic_partitions': 'true',
1490        'super_partition_groups': 'group_a',
1491        'dynamic_partition_list': 'system',
1492        'super_group_a_partition_list': 'system',
1493    }
1494    vendor_dict = {
1495        'use_dynamic_partitions': 'true',
1496        'super_partition_groups': 'group_a group_b',
1497        'dynamic_partition_list': 'vendor product',
1498        'super_block_devices': 'super',
1499        'super_super_device_size': '3000',
1500        'super_group_a_partition_list': 'vendor',
1501        'super_group_a_group_size': '1000',
1502        'super_group_b_partition_list': 'product',
1503        'super_group_b_group_size': '2000',
1504    }
1505    merged_dict = common.MergeDynamicPartitionInfoDicts(
1506        framework_dict=framework_dict,
1507        vendor_dict=vendor_dict)
1508    expected_merged_dict = {
1509        'use_dynamic_partitions': 'true',
1510        'super_partition_groups': 'group_a group_b',
1511        'dynamic_partition_list': 'product system vendor',
1512        'super_block_devices': 'super',
1513        'super_super_device_size': '3000',
1514        'super_group_a_partition_list': 'system vendor',
1515        'super_group_a_group_size': '1000',
1516        'super_group_b_partition_list': 'product',
1517        'super_group_b_group_size': '2000',
1518        'vabc_cow_version': '2',
1519    }
1520    self.assertEqual(merged_dict, expected_merged_dict)
1521
1522  def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self):
1523    framework_dict = {
1524        'use_dynamic_partitions': 'true',
1525        'super_partition_groups': 'group_a',
1526        'dynamic_partition_list': 'system',
1527        'super_group_a_partition_list': 'system',
1528        'super_group_a_group_size': '5000',
1529        'vabc_cow_version': '3',
1530    }
1531    vendor_dict = {
1532        'use_dynamic_partitions': 'true',
1533        'super_partition_groups': 'group_a group_b',
1534        'dynamic_partition_list': 'vendor product',
1535        'super_group_a_partition_list': 'vendor',
1536        'super_group_a_group_size': '1000',
1537        'super_group_b_partition_list': 'product',
1538        'super_group_b_group_size': '2000',
1539    }
1540    merged_dict = common.MergeDynamicPartitionInfoDicts(
1541        framework_dict=framework_dict,
1542        vendor_dict=vendor_dict)
1543    expected_merged_dict = {
1544        'use_dynamic_partitions': 'true',
1545        'super_partition_groups': 'group_a group_b',
1546        'dynamic_partition_list': 'product system vendor',
1547        'super_group_a_partition_list': 'system vendor',
1548        'super_group_a_group_size': '1000',
1549        'super_group_b_partition_list': 'product',
1550        'super_group_b_group_size': '2000',
1551        'vabc_cow_version': '2',
1552    }
1553    self.assertEqual(merged_dict, expected_merged_dict)
1554
1555  def test_GetAvbPartitionArg(self):
1556    info_dict = {}
1557    cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict)
1558    self.assertEqual(
1559        [common.AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG, '/path/to/system.img'],
1560        cmd)
1561
1562  @test_utils.SkipIfExternalToolsUnavailable()
1563  def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self):
1564    testdata_dir = test_utils.get_testdata_dir()
1565    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1566    info_dict = {
1567        'avb_avbtool': 'avbtool',
1568        'avb_vendor_key_path': pubkey,
1569        'avb_vendor_rollback_index_location': 5,
1570    }
1571    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
1572    self.assertEqual(2, len(cmd))
1573    self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
1574    chained_partition_args = cmd[1]
1575    self.assertEqual('vendor', chained_partition_args.partition)
1576    self.assertEqual(5, chained_partition_args.rollback_index_location)
1577    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
1578
1579  @test_utils.SkipIfExternalToolsUnavailable()
1580  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
1581    testdata_dir = test_utils.get_testdata_dir()
1582    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1583    info_dict = {
1584        'avb_avbtool': 'avbtool',
1585        'avb_recovery_key_path': pubkey,
1586        'avb_recovery_rollback_index_location': 3,
1587    }
1588    cmd = common.GetAvbPartitionArg(
1589        'recovery', '/path/to/recovery.img', info_dict)
1590    self.assertFalse(cmd)
1591
1592  @test_utils.SkipIfExternalToolsUnavailable()
1593  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self):
1594    testdata_dir = test_utils.get_testdata_dir()
1595    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1596    info_dict = {
1597        'ab_update': 'true',
1598        'avb_avbtool': 'avbtool',
1599        'avb_recovery_key_path': pubkey,
1600        'avb_recovery_rollback_index_location': 3,
1601    }
1602    cmd = common.GetAvbPartitionArg(
1603        'recovery', '/path/to/recovery.img', info_dict)
1604    self.assertEqual(2, len(cmd))
1605    self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
1606    chained_partition_args = cmd[1]
1607    self.assertEqual('recovery', chained_partition_args.partition)
1608    self.assertEqual(3, chained_partition_args.rollback_index_location)
1609    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
1610
1611
1612class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
1613  """Checks the format of install-recovery.sh.
1614
1615  Its format should match between common.py and validate_target_files.py.
1616  """
1617
1618  def setUp(self):
1619    self._tempdir = common.MakeTempDir()
1620    # Create a fake dict that contains the fstab info for boot&recovery.
1621    self._info = {"fstab": {}}
1622    fake_fstab = [
1623        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
1624        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
1625    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
1626    # Construct the gzipped recovery.img and boot.img
1627    self.recovery_data = bytearray([
1628        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
1629        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
1630        0x08, 0x00, 0x00, 0x00
1631    ])
1632    # echo -n "boot" | gzip -f | hd
1633    self.boot_data = bytearray([
1634        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
1635        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
1636    ])
1637
1638  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
1639    loc = os.path.join(self._tempdir, prefix, name)
1640    if not os.path.exists(os.path.dirname(loc)):
1641      os.makedirs(os.path.dirname(loc))
1642    with open(loc, "wb") as f:
1643      f.write(data)
1644
1645  def test_full_recovery(self):
1646    recovery_image = common.File("recovery.img", self.recovery_data)
1647    boot_image = common.File("boot.img", self.boot_data)
1648    self._info["full_recovery_image"] = "true"
1649
1650    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1651                             recovery_image, boot_image, self._info)
1652    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1653                                                        self._info)
1654
1655  @test_utils.SkipIfExternalToolsUnavailable()
1656  def test_recovery_from_boot(self):
1657    recovery_image = common.File("recovery.img", self.recovery_data)
1658    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
1659    boot_image = common.File("boot.img", self.boot_data)
1660    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
1661
1662    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1663                             recovery_image, boot_image, self._info)
1664    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1665                                                        self._info)
1666    # Validate 'recovery-from-boot' with bonus argument.
1667    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
1668    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1669                             recovery_image, boot_image, self._info)
1670    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1671                                                        self._info)
1672
1673
1674class MockBlockDifference(object):
1675
1676  def __init__(self, partition, tgt, src=None):
1677    self.partition = partition
1678    self.tgt = tgt
1679    self.src = src
1680
1681  def WriteScript(self, script, _, progress=None,
1682                  write_verify_script=False):
1683    if progress:
1684      script.AppendExtra("progress({})".format(progress))
1685    script.AppendExtra("patch({});".format(self.partition))
1686    if write_verify_script:
1687      self.WritePostInstallVerifyScript(script)
1688
1689  def WritePostInstallVerifyScript(self, script):
1690    script.AppendExtra("verify({});".format(self.partition))
1691
1692
1693class FakeSparseImage(object):
1694
1695  def __init__(self, size):
1696    self.blocksize = 4096
1697    self.total_blocks = size // 4096
1698    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
1699
1700
1701class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
1702
1703  @staticmethod
1704  def get_op_list(output_path):
1705    with zipfile.ZipFile(output_path, allowZip64=True) as output_zip:
1706      with output_zip.open('dynamic_partitions_op_list') as op_list:
1707        return [line.decode().strip() for line in op_list.readlines()
1708                if not line.startswith(b'#')]
1709
1710  def setUp(self):
1711    self.script = test_utils.MockScriptWriter()
1712    self.output_path = common.MakeTempFile(suffix='.zip')
1713
1714  def test_full(self):
1715    target_info = common.LoadDictionaryFromLines("""
1716dynamic_partition_list=system vendor
1717super_partition_groups=group_foo
1718super_group_foo_group_size={group_size}
1719super_group_foo_partition_list=system vendor
1720""".format(group_size=4 * GiB).split("\n"))
1721    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
1722                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
1723
1724    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
1725    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1726      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1727
1728    self.assertEqual(str(self.script).strip(), """
1729assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
1730patch(system);
1731verify(system);
1732unmap_partition("system");
1733patch(vendor);
1734verify(vendor);
1735unmap_partition("vendor");
1736""".strip())
1737
1738    lines = self.get_op_list(self.output_path)
1739
1740    remove_all_groups = lines.index("remove_all_groups")
1741    add_group = lines.index("add_group group_foo 4294967296")
1742    add_vendor = lines.index("add vendor group_foo")
1743    add_system = lines.index("add system group_foo")
1744    resize_vendor = lines.index("resize vendor 1073741824")
1745    resize_system = lines.index("resize system 3221225472")
1746
1747    self.assertLess(remove_all_groups, add_group,
1748                    "Should add groups after removing all groups")
1749    self.assertLess(add_group, min(add_vendor, add_system),
1750                    "Should add partitions after adding group")
1751    self.assertLess(add_system, resize_system,
1752                    "Should resize system after adding it")
1753    self.assertLess(add_vendor, resize_vendor,
1754                    "Should resize vendor after adding it")
1755
1756  def test_inc_groups(self):
1757    source_info = common.LoadDictionaryFromLines("""
1758super_partition_groups=group_foo group_bar group_baz
1759super_group_foo_group_size={group_foo_size}
1760super_group_bar_group_size={group_bar_size}
1761""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
1762    target_info = common.LoadDictionaryFromLines("""
1763super_partition_groups=group_foo group_baz group_qux
1764super_group_foo_group_size={group_foo_size}
1765super_group_baz_group_size={group_baz_size}
1766super_group_qux_group_size={group_qux_size}
1767""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
1768           group_qux_size=1 * GiB).split("\n"))
1769
1770    dp_diff = common.DynamicPartitionsDifference(target_info,
1771                                                 block_diffs=[],
1772                                                 source_info_dict=source_info)
1773    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1774      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1775
1776    lines = self.get_op_list(self.output_path)
1777
1778    removed = lines.index("remove_group group_bar")
1779    shrunk = lines.index("resize_group group_foo 3221225472")
1780    grown = lines.index("resize_group group_baz 4294967296")
1781    added = lines.index("add_group group_qux 1073741824")
1782
1783    self.assertLess(max(removed, shrunk),
1784                    min(grown, added),
1785                    "ops that remove / shrink partitions must precede ops that "
1786                    "grow / add partitions")
1787
1788  def test_incremental(self):
1789    source_info = common.LoadDictionaryFromLines("""
1790dynamic_partition_list=system vendor product system_ext
1791super_partition_groups=group_foo
1792super_group_foo_group_size={group_foo_size}
1793super_group_foo_partition_list=system vendor product system_ext
1794""".format(group_foo_size=4 * GiB).split("\n"))
1795    target_info = common.LoadDictionaryFromLines("""
1796dynamic_partition_list=system vendor product odm
1797super_partition_groups=group_foo group_bar
1798super_group_foo_group_size={group_foo_size}
1799super_group_foo_partition_list=system vendor odm
1800super_group_bar_group_size={group_bar_size}
1801super_group_bar_partition_list=product
1802""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
1803
1804    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
1805                                       src=FakeSparseImage(1024 * MiB)),
1806                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
1807                                       src=FakeSparseImage(1024 * MiB)),
1808                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
1809                                       src=FakeSparseImage(1024 * MiB)),
1810                   MockBlockDifference("system_ext", None,
1811                                       src=FakeSparseImage(1024 * MiB)),
1812                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
1813                                       src=None)]
1814
1815    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1816                                                 source_info_dict=source_info)
1817    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1818      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1819
1820    metadata_idx = self.script.lines.index(
1821        'assert(update_dynamic_partitions(package_extract_file('
1822        '"dynamic_partitions_op_list")));')
1823    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
1824    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
1825    for p in ("product", "system", "odm"):
1826      patch_idx = self.script.lines.index("patch({});".format(p))
1827      verify_idx = self.script.lines.index("verify({});".format(p))
1828      self.assertLess(metadata_idx, patch_idx,
1829                      "Should patch {} after updating metadata".format(p))
1830      self.assertLess(patch_idx, verify_idx,
1831                      "Should verify {} after patching".format(p))
1832
1833    self.assertNotIn("patch(system_ext);", self.script.lines)
1834
1835    lines = self.get_op_list(self.output_path)
1836
1837    remove = lines.index("remove system_ext")
1838    move_product_out = lines.index("move product default")
1839    shrink = lines.index("resize vendor 536870912")
1840    shrink_group = lines.index("resize_group group_foo 3221225472")
1841    add_group_bar = lines.index("add_group group_bar 1073741824")
1842    add_odm = lines.index("add odm group_foo")
1843    grow_existing = lines.index("resize system 1610612736")
1844    grow_added = lines.index("resize odm 1073741824")
1845    move_product_in = lines.index("move product group_bar")
1846
1847    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
1848    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
1849
1850    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
1851                    "Must shrink group after partitions inside group are shrunk"
1852                    " / removed")
1853
1854    self.assertLess(add_group_bar, move_product_in,
1855                    "Must add partitions to group after group is added")
1856
1857    self.assertLess(max_idx_move_partition_out_foo,
1858                    min_idx_move_partition_in_foo,
1859                    "Must shrink partitions / remove partitions from group"
1860                    "before adding / moving partitions into group")
1861
1862  def test_remove_partition(self):
1863    source_info = common.LoadDictionaryFromLines("""
1864blockimgdiff_versions=3,4
1865use_dynamic_partitions=true
1866dynamic_partition_list=foo
1867super_partition_groups=group_foo
1868super_group_foo_group_size={group_foo_size}
1869super_group_foo_partition_list=foo
1870""".format(group_foo_size=4 * GiB).split("\n"))
1871    target_info = common.LoadDictionaryFromLines("""
1872blockimgdiff_versions=3,4
1873use_dynamic_partitions=true
1874super_partition_groups=group_foo
1875super_group_foo_group_size={group_foo_size}
1876""".format(group_foo_size=4 * GiB).split("\n"))
1877
1878    common.OPTIONS.info_dict = target_info
1879    common.OPTIONS.target_info_dict = target_info
1880    common.OPTIONS.source_info_dict = source_info
1881    common.OPTIONS.cache_size = 4 * 4096
1882
1883    block_diffs = [common.BlockDifference("foo", EmptyImage(),
1884                                          src=DataImage("source", pad=True))]
1885
1886    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1887                                                 source_info_dict=source_info)
1888    with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
1889      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1890
1891    self.assertNotIn("block_image_update", str(self.script),
1892                     "Removed partition should not be patched.")
1893
1894    lines = self.get_op_list(self.output_path)
1895    self.assertEqual(lines, ["remove foo"])
1896
1897
1898class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
1899  def setUp(self):
1900    self.odm_build_prop = [
1901        'ro.odm.build.date.utc=1578430045',
1902        'ro.odm.build.fingerprint='
1903        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1904        'ro.product.odm.device=coral',
1905        'import /odm/etc/build_${ro.boot.product.device_name}.prop',
1906    ]
1907
1908  @staticmethod
1909  def _BuildZipFile(entries):
1910    input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1911    with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip:
1912      for name, content in entries.items():
1913        input_zip.writestr(name, content)
1914
1915    return input_file
1916
1917  def test_parseBuildProps_noImportStatement(self):
1918    build_prop = [
1919        'ro.odm.build.date.utc=1578430045',
1920        'ro.odm.build.fingerprint='
1921        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1922        'ro.product.odm.device=coral',
1923    ]
1924    input_file = self._BuildZipFile({
1925        'ODM/etc/build.prop': '\n'.join(build_prop),
1926    })
1927
1928    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
1929      placeholder_values = {
1930          'ro.boot.product.device_name': ['std', 'pro']
1931      }
1932      partition_props = common.PartitionBuildProps.FromInputFile(
1933          input_zip, 'odm', placeholder_values)
1934
1935    self.assertEqual({
1936        'ro.odm.build.date.utc': '1578430045',
1937        'ro.odm.build.fingerprint':
1938        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1939        'ro.product.odm.device': 'coral',
1940    }, partition_props.build_props)
1941
1942    self.assertEqual(set(), partition_props.prop_overrides)
1943
1944  def test_parseBuildProps_singleImportStatement(self):
1945    build_std_prop = [
1946        'ro.product.odm.device=coral',
1947        'ro.product.odm.name=product1',
1948    ]
1949    build_pro_prop = [
1950        'ro.product.odm.device=coralpro',
1951        'ro.product.odm.name=product2',
1952    ]
1953
1954    input_file = self._BuildZipFile({
1955        'ODM/etc/build.prop': '\n'.join(self.odm_build_prop),
1956        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
1957        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
1958    })
1959
1960    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
1961      placeholder_values = {
1962          'ro.boot.product.device_name': 'std'
1963      }
1964      partition_props = common.PartitionBuildProps.FromInputFile(
1965          input_zip, 'odm', placeholder_values)
1966
1967    self.assertEqual({
1968        'ro.odm.build.date.utc': '1578430045',
1969        'ro.odm.build.fingerprint':
1970        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1971        'ro.product.odm.device': 'coral',
1972        'ro.product.odm.name': 'product1',
1973    }, partition_props.build_props)
1974
1975    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
1976      placeholder_values = {
1977          'ro.boot.product.device_name': 'pro'
1978      }
1979      partition_props = common.PartitionBuildProps.FromInputFile(
1980          input_zip, 'odm', placeholder_values)
1981
1982    self.assertEqual({
1983        'ro.odm.build.date.utc': '1578430045',
1984        'ro.odm.build.fingerprint':
1985        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1986        'ro.product.odm.device': 'coralpro',
1987        'ro.product.odm.name': 'product2',
1988    }, partition_props.build_props)
1989
1990  def test_parseBuildProps_noPlaceHolders(self):
1991    build_prop = copy.copy(self.odm_build_prop)
1992    input_file = self._BuildZipFile({
1993        'ODM/etc/build.prop': '\n'.join(build_prop),
1994    })
1995
1996    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
1997      partition_props = common.PartitionBuildProps.FromInputFile(
1998          input_zip, 'odm')
1999
2000    self.assertEqual({
2001        'ro.odm.build.date.utc': '1578430045',
2002        'ro.odm.build.fingerprint':
2003        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2004        'ro.product.odm.device': 'coral',
2005    }, partition_props.build_props)
2006
2007    self.assertEqual(set(), partition_props.prop_overrides)
2008
2009  def test_parseBuildProps_multipleImportStatements(self):
2010    build_prop = copy.deepcopy(self.odm_build_prop)
2011    build_prop.append(
2012        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2013
2014    build_std_prop = [
2015        'ro.product.odm.device=coral',
2016    ]
2017    build_pro_prop = [
2018        'ro.product.odm.device=coralpro',
2019    ]
2020
2021    product1_prop = [
2022        'ro.product.odm.name=product1',
2023        'ro.product.not_care=not_care',
2024    ]
2025
2026    product2_prop = [
2027        'ro.product.odm.name=product2',
2028        'ro.product.not_care=not_care',
2029    ]
2030
2031    input_file = self._BuildZipFile({
2032        'ODM/etc/build.prop': '\n'.join(build_prop),
2033        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2034        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2035        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2036        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2037    })
2038
2039    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2040      placeholder_values = {
2041          'ro.boot.product.device_name': 'std',
2042          'ro.boot.product.product_name': 'product1',
2043          'ro.boot.product.not_care': 'not_care',
2044      }
2045      partition_props = common.PartitionBuildProps.FromInputFile(
2046          input_zip, 'odm', placeholder_values)
2047
2048    self.assertEqual({
2049        'ro.odm.build.date.utc': '1578430045',
2050        'ro.odm.build.fingerprint':
2051        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2052        'ro.product.odm.device': 'coral',
2053        'ro.product.odm.name': 'product1'
2054    }, partition_props.build_props)
2055
2056    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2057      placeholder_values = {
2058          'ro.boot.product.device_name': 'pro',
2059          'ro.boot.product.product_name': 'product2',
2060          'ro.boot.product.not_care': 'not_care',
2061      }
2062      partition_props = common.PartitionBuildProps.FromInputFile(
2063          input_zip, 'odm', placeholder_values)
2064
2065    self.assertEqual({
2066        'ro.odm.build.date.utc': '1578430045',
2067        'ro.odm.build.fingerprint':
2068        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2069        'ro.product.odm.device': 'coralpro',
2070        'ro.product.odm.name': 'product2'
2071    }, partition_props.build_props)
2072
2073  def test_parseBuildProps_defineAfterOverride(self):
2074    build_prop = copy.deepcopy(self.odm_build_prop)
2075    build_prop.append('ro.product.odm.device=coral')
2076
2077    build_std_prop = [
2078        'ro.product.odm.device=coral',
2079    ]
2080    build_pro_prop = [
2081        'ro.product.odm.device=coralpro',
2082    ]
2083
2084    input_file = self._BuildZipFile({
2085        'ODM/etc/build.prop': '\n'.join(build_prop),
2086        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2087        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2088    })
2089
2090    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2091      placeholder_values = {
2092          'ro.boot.product.device_name': 'std',
2093      }
2094
2095      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2096                        input_zip, 'odm', placeholder_values)
2097
2098  def test_parseBuildProps_duplicateOverride(self):
2099    build_prop = copy.deepcopy(self.odm_build_prop)
2100    build_prop.append(
2101        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2102
2103    build_std_prop = [
2104        'ro.product.odm.device=coral',
2105        'ro.product.odm.name=product1',
2106    ]
2107    build_pro_prop = [
2108        'ro.product.odm.device=coralpro',
2109    ]
2110
2111    product1_prop = [
2112        'ro.product.odm.name=product1',
2113    ]
2114
2115    product2_prop = [
2116        'ro.product.odm.name=product2',
2117    ]
2118
2119    input_file = self._BuildZipFile({
2120        'ODM/etc/build.prop': '\n'.join(build_prop),
2121        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2122        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2123        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2124        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2125    })
2126
2127    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2128      placeholder_values = {
2129          'ro.boot.product.device_name': 'std',
2130          'ro.boot.product.product_name': 'product1',
2131      }
2132      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2133                        input_zip, 'odm', placeholder_values)
2134
2135  def test_partitionBuildProps_fromInputFile_deepcopy(self):
2136    build_prop = [
2137        'ro.odm.build.date.utc=1578430045',
2138        'ro.odm.build.fingerprint='
2139        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2140        'ro.product.odm.device=coral',
2141    ]
2142    input_file = self._BuildZipFile({
2143        'ODM/etc/build.prop': '\n'.join(build_prop),
2144    })
2145
2146    with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip:
2147      placeholder_values = {
2148          'ro.boot.product.device_name': ['std', 'pro']
2149      }
2150      partition_props = common.PartitionBuildProps.FromInputFile(
2151          input_zip, 'odm', placeholder_values)
2152
2153    copied_props = copy.deepcopy(partition_props)
2154    self.assertEqual({
2155        'ro.odm.build.date.utc': '1578430045',
2156        'ro.odm.build.fingerprint':
2157        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2158        'ro.product.odm.device': 'coral',
2159    }, copied_props.build_props)
2160