1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import subprocess
20import tempfile
21import time
22import unittest
23import zipfile
24from hashlib import sha1
25
26import common
27import test_utils
28import validate_target_files
29from images import EmptyImage, DataImage
30from rangelib import RangeSet
31
32
33KiB = 1024
34MiB = 1024 * KiB
35GiB = 1024 * MiB
36
37
38def get_2gb_string():
39  size = int(2 * GiB + 1)
40  block_size = 4 * KiB
41  step_size = 4 * MiB
42  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
43  for _ in range(0, size, step_size):
44    yield os.urandom(block_size)
45    yield b'\0' * (step_size - block_size)
46
47
48class BuildInfoTest(test_utils.ReleaseToolsTestCase):
49
50  TEST_INFO_DICT = {
51      'build.prop': common.PartitionBuildProps.FromDictionary(
52          'system', {
53              'ro.product.device': 'product-device',
54              'ro.product.name': 'product-name',
55              'ro.build.fingerprint': 'build-fingerprint',
56              'ro.build.foo': 'build-foo'}
57      ),
58      'system.build.prop': common.PartitionBuildProps.FromDictionary(
59          'system', {
60              'ro.product.system.brand': 'product-brand',
61              'ro.product.system.name': 'product-name',
62              'ro.product.system.device': 'product-device',
63              'ro.system.build.version.release': 'version-release',
64              'ro.system.build.id': 'build-id',
65              'ro.system.build.version.incremental': 'version-incremental',
66              'ro.system.build.type': 'build-type',
67              'ro.system.build.tags': 'build-tags',
68              'ro.system.build.foo': 'build-foo'}
69      ),
70      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
71          'vendor', {
72              'ro.product.vendor.brand': 'vendor-product-brand',
73              'ro.product.vendor.name': 'vendor-product-name',
74              'ro.product.vendor.device': 'vendor-product-device',
75              'ro.vendor.build.version.release': 'vendor-version-release',
76              'ro.vendor.build.id': 'vendor-build-id',
77              'ro.vendor.build.version.incremental':
78              'vendor-version-incremental',
79              'ro.vendor.build.type': 'vendor-build-type',
80              'ro.vendor.build.tags': 'vendor-build-tags'}
81      ),
82      'property1': 'value1',
83      'property2': 4096,
84  }
85
86  TEST_INFO_DICT_USES_OEM_PROPS = {
87      'build.prop': common.PartitionBuildProps.FromDictionary(
88          'system', {
89              'ro.product.name': 'product-name',
90              'ro.build.thumbprint': 'build-thumbprint',
91              'ro.build.bar': 'build-bar'}
92      ),
93      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
94          'vendor', {
95              'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
96      ),
97      'property1': 'value1',
98      'property2': 4096,
99      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
100  }
101
102  TEST_OEM_DICTS = [
103      {
104          'ro.product.brand': 'brand1',
105          'ro.product.device': 'device1',
106      },
107      {
108          'ro.product.brand': 'brand2',
109          'ro.product.device': 'device2',
110      },
111      {
112          'ro.product.brand': 'brand3',
113          'ro.product.device': 'device3',
114      },
115  ]
116
117  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER = {
118      'build.prop': common.PartitionBuildProps.FromDictionary(
119          'system', {
120              'ro.build.fingerprint': 'build-fingerprint',
121              'ro.product.property_source_order':
122                  'product,odm,vendor,system_ext,system'}
123      ),
124      'system.build.prop': common.PartitionBuildProps.FromDictionary(
125          'system', {
126              'ro.product.system.device': 'system-product-device'}
127      ),
128      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
129          'vendor', {
130              'ro.product.vendor.device': 'vendor-product-device'}
131      ),
132  }
133
134  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10 = {
135      'build.prop': common.PartitionBuildProps.FromDictionary(
136          'system', {
137              'ro.build.fingerprint': 'build-fingerprint',
138              'ro.product.property_source_order':
139                  'product,product_services,odm,vendor,system',
140              'ro.build.version.release': '10',
141              'ro.build.version.codename': 'REL'}
142      ),
143      'system.build.prop': common.PartitionBuildProps.FromDictionary(
144          'system', {
145              'ro.product.system.device': 'system-product-device'}
146      ),
147      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
148          'vendor', {
149              'ro.product.vendor.device': 'vendor-product-device'}
150      ),
151  }
152
153  TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9 = {
154      'build.prop': common.PartitionBuildProps.FromDictionary(
155          'system', {
156              'ro.product.device': 'product-device',
157              'ro.build.fingerprint': 'build-fingerprint',
158              'ro.build.version.release': '9',
159              'ro.build.version.codename': 'REL'}
160      ),
161      'system.build.prop': common.PartitionBuildProps.FromDictionary(
162          'system', {
163              'ro.product.system.device': 'system-product-device'}
164      ),
165      'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
166          'vendor', {
167              'ro.product.vendor.device': 'vendor-product-device'}
168      ),
169  }
170
171  def test_init(self):
172    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
173    self.assertEqual('product-device', target_info.device)
174    self.assertEqual('build-fingerprint', target_info.fingerprint)
175    self.assertFalse(target_info.is_ab)
176    self.assertIsNone(target_info.oem_props)
177
178  def test_init_with_oem_props(self):
179    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
180                                   self.TEST_OEM_DICTS)
181    self.assertEqual('device1', target_info.device)
182    self.assertEqual('brand1/product-name/device1:build-thumbprint',
183                     target_info.fingerprint)
184
185    # Swap the order in oem_dicts, which would lead to different BuildInfo.
186    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
187    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
188    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
189                                   oem_dicts)
190    self.assertEqual('device3', target_info.device)
191    self.assertEqual('brand3/product-name/device3:build-thumbprint',
192                     target_info.fingerprint)
193
194  def test_init_badFingerprint(self):
195    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
196    info_dict['build.prop'].build_props[
197        'ro.build.fingerprint'] = 'bad fingerprint'
198    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
199
200    info_dict['build.prop'].build_props[
201        'ro.build.fingerprint'] = 'bad\x80fingerprint'
202    self.assertRaises(ValueError, common.BuildInfo, info_dict, None)
203
204  def test___getitem__(self):
205    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
206    self.assertEqual('value1', target_info['property1'])
207    self.assertEqual(4096, target_info['property2'])
208    self.assertEqual('build-foo',
209                     target_info['build.prop'].GetProp('ro.build.foo'))
210
211  def test___getitem__with_oem_props(self):
212    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
213                                   self.TEST_OEM_DICTS)
214    self.assertEqual('value1', target_info['property1'])
215    self.assertEqual(4096, target_info['property2'])
216    self.assertIsNone(target_info['build.prop'].GetProp('ro.build.foo'))
217
218  def test___setitem__(self):
219    target_info = common.BuildInfo(copy.deepcopy(self.TEST_INFO_DICT), None)
220    self.assertEqual('value1', target_info['property1'])
221    target_info['property1'] = 'value2'
222    self.assertEqual('value2', target_info['property1'])
223
224    self.assertEqual('build-foo',
225                     target_info['build.prop'].GetProp('ro.build.foo'))
226    target_info['build.prop'].build_props['ro.build.foo'] = 'build-bar'
227    self.assertEqual('build-bar',
228                     target_info['build.prop'].GetProp('ro.build.foo'))
229
230  def test_get(self):
231    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
232    self.assertEqual('value1', target_info.get('property1'))
233    self.assertEqual(4096, target_info.get('property2'))
234    self.assertEqual(4096, target_info.get('property2', 1024))
235    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
236    self.assertEqual('build-foo',
237                     target_info.get('build.prop').GetProp('ro.build.foo'))
238
239  def test_get_with_oem_props(self):
240    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
241                                   self.TEST_OEM_DICTS)
242    self.assertEqual('value1', target_info.get('property1'))
243    self.assertEqual(4096, target_info.get('property2'))
244    self.assertEqual(4096, target_info.get('property2', 1024))
245    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
246    self.assertIsNone(target_info.get('build.prop').GetProp('ro.build.foo'))
247
248  def test_items(self):
249    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
250    items = target_info.items()
251    self.assertIn(('property1', 'value1'), items)
252    self.assertIn(('property2', 4096), items)
253
254  def test_GetBuildProp(self):
255    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
256    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
257    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
258                      'ro.build.nonexistent')
259
260  def test_GetBuildProp_with_oem_props(self):
261    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
262                                   self.TEST_OEM_DICTS)
263    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
264    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
265                      'ro.build.nonexistent')
266
267  def test_GetPartitionFingerprint(self):
268    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
269    self.assertEqual(
270        target_info.GetPartitionFingerprint('vendor'),
271        'vendor-product-brand/vendor-product-name/vendor-product-device'
272        ':vendor-version-release/vendor-build-id/vendor-version-incremental'
273        ':vendor-build-type/vendor-build-tags')
274
275  def test_GetPartitionFingerprint_system_other_uses_system(self):
276    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
277    self.assertEqual(
278        target_info.GetPartitionFingerprint('system_other'),
279        target_info.GetPartitionFingerprint('system'))
280
281  def test_GetPartitionFingerprint_uses_fingerprint_prop_if_available(self):
282    info_dict = copy.deepcopy(self.TEST_INFO_DICT)
283    info_dict['vendor.build.prop'].build_props[
284        'ro.vendor.build.fingerprint'] = 'vendor:fingerprint'
285    target_info = common.BuildInfo(info_dict, None)
286    self.assertEqual(
287        target_info.GetPartitionFingerprint('vendor'),
288        'vendor:fingerprint')
289
290  def test_WriteMountOemScript(self):
291    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
292                                   self.TEST_OEM_DICTS)
293    script_writer = test_utils.MockScriptWriter()
294    target_info.WriteMountOemScript(script_writer)
295    self.assertEqual([('Mount', '/oem', None)], script_writer.lines)
296
297  def test_WriteDeviceAssertions(self):
298    target_info = common.BuildInfo(self.TEST_INFO_DICT, None)
299    script_writer = test_utils.MockScriptWriter()
300    target_info.WriteDeviceAssertions(script_writer, False)
301    self.assertEqual([('AssertDevice', 'product-device')], script_writer.lines)
302
303  def test_WriteDeviceAssertions_with_oem_props(self):
304    target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
305                                   self.TEST_OEM_DICTS)
306    script_writer = test_utils.MockScriptWriter()
307    target_info.WriteDeviceAssertions(script_writer, False)
308    self.assertEqual(
309        [
310            ('AssertOemProperty', 'ro.product.device',
311             ['device1', 'device2', 'device3'], False),
312            ('AssertOemProperty', 'ro.product.brand',
313             ['brand1', 'brand2', 'brand3'], False),
314        ],
315        script_writer.lines)
316
317  def test_ResolveRoProductProperty_FromVendor(self):
318    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
319    info = common.BuildInfo(info_dict, None)
320    self.assertEqual('vendor-product-device',
321                     info.GetBuildProp('ro.product.device'))
322
323  def test_ResolveRoProductProperty_FromSystem(self):
324    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
325    del info_dict['vendor.build.prop'].build_props['ro.product.vendor.device']
326    info = common.BuildInfo(info_dict, None)
327    self.assertEqual('system-product-device',
328                     info.GetBuildProp('ro.product.device'))
329
330  def test_ResolveRoProductProperty_InvalidPropertySearchOrder(self):
331    info_dict = copy.deepcopy(self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER)
332    info_dict['build.prop'].build_props[
333        'ro.product.property_source_order'] = 'bad-source'
334    with self.assertRaisesRegexp(common.ExternalError,
335        'Invalid ro.product.property_source_order'):
336      info = common.BuildInfo(info_dict, None)
337      info.GetBuildProp('ro.product.device')
338
339  def test_ResolveRoProductProperty_Android10PropertySearchOrder(self):
340    info_dict = copy.deepcopy(
341        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_10)
342    info = common.BuildInfo(info_dict, None)
343    self.assertEqual('vendor-product-device',
344                     info.GetBuildProp('ro.product.device'))
345
346  def test_ResolveRoProductProperty_Android9PropertySearchOrder(self):
347    info_dict = copy.deepcopy(
348        self.TEST_INFO_DICT_PROPERTY_SOURCE_ORDER_ANDROID_9)
349    info = common.BuildInfo(info_dict, None)
350    self.assertEqual('product-device',
351                     info.GetBuildProp('ro.product.device'))
352
353
354class CommonZipTest(test_utils.ReleaseToolsTestCase):
355
356  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
357              test_file_name=None, expected_stat=None, expected_mode=0o644,
358              expected_compress_type=zipfile.ZIP_STORED):
359    # Verify the stat if present.
360    if test_file_name is not None:
361      new_stat = os.stat(test_file_name)
362      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
363      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
364
365    # Reopen the zip file to verify.
366    zip_file = zipfile.ZipFile(zip_file_name, "r")
367
368    # Verify the timestamp.
369    info = zip_file.getinfo(arcname)
370    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
371
372    # Verify the file mode.
373    mode = (info.external_attr >> 16) & 0o777
374    self.assertEqual(mode, expected_mode)
375
376    # Verify the compress type.
377    self.assertEqual(info.compress_type, expected_compress_type)
378
379    # Verify the zip contents.
380    entry = zip_file.open(arcname)
381    sha1_hash = sha1()
382    for chunk in iter(lambda: entry.read(4 * MiB), b''):
383      sha1_hash.update(chunk)
384    self.assertEqual(expected_hash, sha1_hash.hexdigest())
385    self.assertIsNone(zip_file.testzip())
386
387  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
388    extra_zipwrite_args = dict(extra_zipwrite_args or {})
389
390    test_file = tempfile.NamedTemporaryFile(delete=False)
391    test_file_name = test_file.name
392
393    zip_file = tempfile.NamedTemporaryFile(delete=False)
394    zip_file_name = zip_file.name
395
396    # File names within an archive strip the leading slash.
397    arcname = extra_zipwrite_args.get("arcname", test_file_name)
398    if arcname[0] == "/":
399      arcname = arcname[1:]
400
401    zip_file.close()
402    zip_file = zipfile.ZipFile(zip_file_name, "w")
403
404    try:
405      sha1_hash = sha1()
406      for data in contents:
407        sha1_hash.update(bytes(data))
408        test_file.write(bytes(data))
409      test_file.close()
410
411      expected_stat = os.stat(test_file_name)
412      expected_mode = extra_zipwrite_args.get("perms", 0o644)
413      expected_compress_type = extra_zipwrite_args.get("compress_type",
414                                                       zipfile.ZIP_STORED)
415      time.sleep(5)  # Make sure the atime/mtime will change measurably.
416
417      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
418      common.ZipClose(zip_file)
419
420      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
421                   test_file_name, expected_stat, expected_mode,
422                   expected_compress_type)
423    finally:
424      os.remove(test_file_name)
425      os.remove(zip_file_name)
426
427  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
428    extra_args = dict(extra_args or {})
429
430    zip_file = tempfile.NamedTemporaryFile(delete=False)
431    zip_file_name = zip_file.name
432    zip_file.close()
433
434    zip_file = zipfile.ZipFile(zip_file_name, "w")
435
436    try:
437      expected_compress_type = extra_args.get("compress_type",
438                                              zipfile.ZIP_STORED)
439      time.sleep(5)  # Make sure the atime/mtime will change measurably.
440
441      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
442        arcname = zinfo_or_arcname
443        expected_mode = extra_args.get("perms", 0o644)
444      else:
445        arcname = zinfo_or_arcname.filename
446        if zinfo_or_arcname.external_attr:
447          zinfo_perms = zinfo_or_arcname.external_attr >> 16
448        else:
449          zinfo_perms = 0o600
450        expected_mode = extra_args.get("perms", zinfo_perms)
451
452      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
453      common.ZipClose(zip_file)
454
455      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
456                   expected_mode=expected_mode,
457                   expected_compress_type=expected_compress_type)
458    finally:
459      os.remove(zip_file_name)
460
461  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
462    extra_args = dict(extra_args or {})
463
464    zip_file = tempfile.NamedTemporaryFile(delete=False)
465    zip_file_name = zip_file.name
466
467    test_file = tempfile.NamedTemporaryFile(delete=False)
468    test_file_name = test_file.name
469
470    arcname_large = test_file_name
471    arcname_small = "bar"
472
473    # File names within an archive strip the leading slash.
474    if arcname_large[0] == "/":
475      arcname_large = arcname_large[1:]
476
477    zip_file.close()
478    zip_file = zipfile.ZipFile(zip_file_name, "w")
479
480    try:
481      sha1_hash = sha1()
482      for data in large:
483        sha1_hash.update(data)
484        test_file.write(data)
485      test_file.close()
486
487      expected_stat = os.stat(test_file_name)
488      expected_mode = 0o644
489      expected_compress_type = extra_args.get("compress_type",
490                                              zipfile.ZIP_STORED)
491      time.sleep(5)  # Make sure the atime/mtime will change measurably.
492
493      common.ZipWrite(zip_file, test_file_name, **extra_args)
494      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
495      common.ZipClose(zip_file)
496
497      # Verify the contents written by ZipWrite().
498      self._verify(zip_file, zip_file_name, arcname_large,
499                   sha1_hash.hexdigest(), test_file_name, expected_stat,
500                   expected_mode, expected_compress_type)
501
502      # Verify the contents written by ZipWriteStr().
503      self._verify(zip_file, zip_file_name, arcname_small,
504                   sha1(small).hexdigest(),
505                   expected_compress_type=expected_compress_type)
506    finally:
507      os.remove(zip_file_name)
508      os.remove(test_file_name)
509
510  def _test_reset_ZIP64_LIMIT(self, func, *args):
511    default_limit = (1 << 31) - 1
512    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
513    func(*args)
514    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
515
516  def test_ZipWrite(self):
517    file_contents = os.urandom(1024)
518    self._test_ZipWrite(file_contents)
519
520  def test_ZipWrite_with_opts(self):
521    file_contents = os.urandom(1024)
522    self._test_ZipWrite(file_contents, {
523        "arcname": "foobar",
524        "perms": 0o777,
525        "compress_type": zipfile.ZIP_DEFLATED,
526    })
527    self._test_ZipWrite(file_contents, {
528        "arcname": "foobar",
529        "perms": 0o700,
530        "compress_type": zipfile.ZIP_STORED,
531    })
532
533  def test_ZipWrite_large_file(self):
534    file_contents = get_2gb_string()
535    self._test_ZipWrite(file_contents, {
536        "compress_type": zipfile.ZIP_DEFLATED,
537    })
538
539  def test_ZipWrite_resets_ZIP64_LIMIT(self):
540    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
541
542  def test_ZipWriteStr(self):
543    random_string = os.urandom(1024)
544    # Passing arcname
545    self._test_ZipWriteStr("foo", random_string)
546
547    # Passing zinfo
548    zinfo = zipfile.ZipInfo(filename="foo")
549    self._test_ZipWriteStr(zinfo, random_string)
550
551    # Timestamp in the zinfo should be overwritten.
552    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
553    self._test_ZipWriteStr(zinfo, random_string)
554
555  def test_ZipWriteStr_with_opts(self):
556    random_string = os.urandom(1024)
557    # Passing arcname
558    self._test_ZipWriteStr("foo", random_string, {
559        "perms": 0o700,
560        "compress_type": zipfile.ZIP_DEFLATED,
561    })
562    self._test_ZipWriteStr("bar", random_string, {
563        "compress_type": zipfile.ZIP_STORED,
564    })
565
566    # Passing zinfo
567    zinfo = zipfile.ZipInfo(filename="foo")
568    self._test_ZipWriteStr(zinfo, random_string, {
569        "compress_type": zipfile.ZIP_DEFLATED,
570    })
571    self._test_ZipWriteStr(zinfo, random_string, {
572        "perms": 0o600,
573        "compress_type": zipfile.ZIP_STORED,
574    })
575    self._test_ZipWriteStr(zinfo, random_string, {
576        "perms": 0o000,
577        "compress_type": zipfile.ZIP_STORED,
578    })
579
580  def test_ZipWriteStr_large_file(self):
581    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
582    # the workaround. We will only test the case of writing a string into a
583    # large archive.
584    long_string = get_2gb_string()
585    short_string = os.urandom(1024)
586    self._test_ZipWriteStr_large_file(long_string, short_string, {
587        "compress_type": zipfile.ZIP_DEFLATED,
588    })
589
590  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
591    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, 'foo', b'')
592    zinfo = zipfile.ZipInfo(filename="foo")
593    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b'')
594
595  def test_bug21309935(self):
596    zip_file = tempfile.NamedTemporaryFile(delete=False)
597    zip_file_name = zip_file.name
598    zip_file.close()
599
600    try:
601      random_string = os.urandom(1024)
602      zip_file = zipfile.ZipFile(zip_file_name, "w")
603      # Default perms should be 0o644 when passing the filename.
604      common.ZipWriteStr(zip_file, "foo", random_string)
605      # Honor the specified perms.
606      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
607      # The perms in zinfo should be untouched.
608      zinfo = zipfile.ZipInfo(filename="baz")
609      zinfo.external_attr = 0o740 << 16
610      common.ZipWriteStr(zip_file, zinfo, random_string)
611      # Explicitly specified perms has the priority.
612      zinfo = zipfile.ZipInfo(filename="qux")
613      zinfo.external_attr = 0o700 << 16
614      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
615      common.ZipClose(zip_file)
616
617      self._verify(zip_file, zip_file_name, "foo",
618                   sha1(random_string).hexdigest(),
619                   expected_mode=0o644)
620      self._verify(zip_file, zip_file_name, "bar",
621                   sha1(random_string).hexdigest(),
622                   expected_mode=0o755)
623      self._verify(zip_file, zip_file_name, "baz",
624                   sha1(random_string).hexdigest(),
625                   expected_mode=0o740)
626      self._verify(zip_file, zip_file_name, "qux",
627                   sha1(random_string).hexdigest(),
628                   expected_mode=0o400)
629    finally:
630      os.remove(zip_file_name)
631
632  @test_utils.SkipIfExternalToolsUnavailable()
633  def test_ZipDelete(self):
634    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
635    output_zip = zipfile.ZipFile(zip_file.name, 'w',
636                                 compression=zipfile.ZIP_DEFLATED)
637    with tempfile.NamedTemporaryFile() as entry_file:
638      entry_file.write(os.urandom(1024))
639      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
640      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
641      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
642      common.ZipClose(output_zip)
643    zip_file.close()
644
645    try:
646      common.ZipDelete(zip_file.name, 'Test2')
647      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
648        entries = check_zip.namelist()
649        self.assertTrue('Test1' in entries)
650        self.assertFalse('Test2' in entries)
651        self.assertTrue('Test3' in entries)
652
653      self.assertRaises(
654          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
655      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
656        entries = check_zip.namelist()
657        self.assertTrue('Test1' in entries)
658        self.assertFalse('Test2' in entries)
659        self.assertTrue('Test3' in entries)
660
661      common.ZipDelete(zip_file.name, ['Test3'])
662      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
663        entries = check_zip.namelist()
664        self.assertTrue('Test1' in entries)
665        self.assertFalse('Test2' in entries)
666        self.assertFalse('Test3' in entries)
667
668      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
669      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
670        entries = check_zip.namelist()
671        self.assertFalse('Test1' in entries)
672        self.assertFalse('Test2' in entries)
673        self.assertFalse('Test3' in entries)
674    finally:
675      os.remove(zip_file.name)
676
677  @staticmethod
678  def _test_UnzipTemp_createZipFile():
679    zip_file = common.MakeTempFile(suffix='.zip')
680    output_zip = zipfile.ZipFile(
681        zip_file, 'w', compression=zipfile.ZIP_DEFLATED)
682    contents = os.urandom(1024)
683    with tempfile.NamedTemporaryFile() as entry_file:
684      entry_file.write(contents)
685      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
686      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
687      common.ZipWrite(output_zip, entry_file.name, arcname='Foo3')
688      common.ZipWrite(output_zip, entry_file.name, arcname='Bar4')
689      common.ZipWrite(output_zip, entry_file.name, arcname='Dir5/Baz5')
690      common.ZipClose(output_zip)
691    common.ZipClose(output_zip)
692    return zip_file
693
694  @test_utils.SkipIfExternalToolsUnavailable()
695  def test_UnzipTemp(self):
696    zip_file = self._test_UnzipTemp_createZipFile()
697    unzipped_dir = common.UnzipTemp(zip_file)
698    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
699    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
700    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
701    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
702    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
703
704  @test_utils.SkipIfExternalToolsUnavailable()
705  def test_UnzipTemp_withPatterns(self):
706    zip_file = self._test_UnzipTemp_createZipFile()
707
708    unzipped_dir = common.UnzipTemp(zip_file, ['Test1'])
709    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
710    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
711    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
712    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
713    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
714
715    unzipped_dir = common.UnzipTemp(zip_file, ['Test1', 'Foo3'])
716    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
717    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
718    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
719    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
720    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
721
722    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Foo3*'])
723    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
724    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
725    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
726    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
727    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
728
729    unzipped_dir = common.UnzipTemp(zip_file, ['*Test1', '*Baz*'])
730    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
731    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
732    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
733    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
734    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
735
736  def test_UnzipTemp_withEmptyPatterns(self):
737    zip_file = self._test_UnzipTemp_createZipFile()
738    unzipped_dir = common.UnzipTemp(zip_file, [])
739    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
740    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
741    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
742    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
743    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
744
745  @test_utils.SkipIfExternalToolsUnavailable()
746  def test_UnzipTemp_withPartiallyMatchingPatterns(self):
747    zip_file = self._test_UnzipTemp_createZipFile()
748    unzipped_dir = common.UnzipTemp(zip_file, ['Test*', 'Nonexistent*'])
749    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
750    self.assertTrue(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
751    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
752    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
753    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
754
755  def test_UnzipTemp_withNoMatchingPatterns(self):
756    zip_file = self._test_UnzipTemp_createZipFile()
757    unzipped_dir = common.UnzipTemp(zip_file, ['Foo4', 'Nonexistent*'])
758    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test1')))
759    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Test2')))
760    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Foo3')))
761    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Bar4')))
762    self.assertFalse(os.path.exists(os.path.join(unzipped_dir, 'Dir5/Baz5')))
763
764
765class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
766  """Tests the APK utils related functions."""
767
768  APKCERTS_TXT1 = (
769      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
770      ' private_key="certs/devkey.pk8"\n'
771      'name="Settings.apk"'
772      ' certificate="build/make/target/product/security/platform.x509.pem"'
773      ' private_key="build/make/target/product/security/platform.pk8"\n'
774      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
775  )
776
777  APKCERTS_CERTMAP1 = {
778      'RecoveryLocalizer.apk' : 'certs/devkey',
779      'Settings.apk' : 'build/make/target/product/security/platform',
780      'TV.apk' : 'PRESIGNED',
781  }
782
783  APKCERTS_TXT2 = (
784      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
785      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
786      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
787      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
788      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
789      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
790      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
791      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
792  )
793
794  APKCERTS_CERTMAP2 = {
795      'Compressed1.apk' : 'certs/compressed1',
796      'Compressed2a.apk' : 'certs/compressed2',
797      'Compressed2b.apk' : 'certs/compressed2',
798      'Compressed3.apk' : 'certs/compressed3',
799  }
800
801  APKCERTS_TXT3 = (
802      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
803      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
804  )
805
806  APKCERTS_CERTMAP3 = {
807      'Compressed4.apk' : 'certs/compressed4',
808  }
809
810  # Test parsing with no optional fields, both optional fields, and only the
811  # partition optional field.
812  APKCERTS_TXT4 = (
813      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
814      ' private_key="certs/devkey.pk8"\n'
815      'name="Settings.apk"'
816      ' certificate="build/make/target/product/security/platform.x509.pem"'
817      ' private_key="build/make/target/product/security/platform.pk8"'
818      ' compressed="gz" partition="system"\n'
819      'name="TV.apk" certificate="PRESIGNED" private_key=""'
820      ' partition="product"\n'
821  )
822
823  APKCERTS_CERTMAP4 = {
824      'RecoveryLocalizer.apk' : 'certs/devkey',
825      'Settings.apk' : 'build/make/target/product/security/platform',
826      'TV.apk' : 'PRESIGNED',
827  }
828
829  def setUp(self):
830    self.testdata_dir = test_utils.get_testdata_dir()
831
832  @staticmethod
833  def _write_apkcerts_txt(apkcerts_txt, additional=None):
834    if additional is None:
835      additional = []
836    target_files = common.MakeTempFile(suffix='.zip')
837    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
838      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
839      for entry in additional:
840        target_files_zip.writestr(entry, '')
841    return target_files
842
843  def test_ReadApkCerts_NoncompressedApks(self):
844    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
845    with zipfile.ZipFile(target_files, 'r') as input_zip:
846      certmap, ext = common.ReadApkCerts(input_zip)
847
848    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
849    self.assertIsNone(ext)
850
851  def test_ReadApkCerts_CompressedApks(self):
852    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
853    # not stored in '.gz' format, so it shouldn't be considered as installed.
854    target_files = self._write_apkcerts_txt(
855        self.APKCERTS_TXT2,
856        ['Compressed1.apk.gz', 'Compressed3.apk'])
857
858    with zipfile.ZipFile(target_files, 'r') as input_zip:
859      certmap, ext = common.ReadApkCerts(input_zip)
860
861    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
862    self.assertEqual('.gz', ext)
863
864    # Alternative case with '.xz'.
865    target_files = self._write_apkcerts_txt(
866        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
867
868    with zipfile.ZipFile(target_files, 'r') as input_zip:
869      certmap, ext = common.ReadApkCerts(input_zip)
870
871    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
872    self.assertEqual('.xz', ext)
873
874  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
875    target_files = self._write_apkcerts_txt(
876        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
877        ['Compressed1.apk.gz', 'Compressed3.apk'])
878
879    with zipfile.ZipFile(target_files, 'r') as input_zip:
880      certmap, ext = common.ReadApkCerts(input_zip)
881
882    certmap_merged = self.APKCERTS_CERTMAP1.copy()
883    certmap_merged.update(self.APKCERTS_CERTMAP2)
884    self.assertDictEqual(certmap_merged, certmap)
885    self.assertEqual('.gz', ext)
886
887  def test_ReadApkCerts_MultipleCompressionMethods(self):
888    target_files = self._write_apkcerts_txt(
889        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
890        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
891
892    with zipfile.ZipFile(target_files, 'r') as input_zip:
893      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
894
895  def test_ReadApkCerts_MismatchingKeys(self):
896    malformed_apkcerts_txt = (
897        'name="App1.apk" certificate="certs/cert1.x509.pem"'
898        ' private_key="certs/cert2.pk8"\n'
899    )
900    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
901
902    with zipfile.ZipFile(target_files, 'r') as input_zip:
903      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
904
905  def test_ReadApkCerts_WithWithoutOptionalFields(self):
906    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT4)
907    with zipfile.ZipFile(target_files, 'r') as input_zip:
908      certmap, ext = common.ReadApkCerts(input_zip)
909
910    self.assertDictEqual(self.APKCERTS_CERTMAP4, certmap)
911    self.assertIsNone(ext)
912
913  def test_ExtractPublicKey(self):
914    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
915    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
916    with open(pubkey) as pubkey_fp:
917      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
918
919  def test_ExtractPublicKey_invalidInput(self):
920    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
921    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
922
923  @test_utils.SkipIfExternalToolsUnavailable()
924  def test_ExtractAvbPublicKey(self):
925    privkey = os.path.join(self.testdata_dir, 'testkey.key')
926    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
927    extracted_from_privkey = common.ExtractAvbPublicKey('avbtool', privkey)
928    extracted_from_pubkey = common.ExtractAvbPublicKey('avbtool', pubkey)
929    with open(extracted_from_privkey, 'rb') as privkey_fp, \
930        open(extracted_from_pubkey, 'rb') as pubkey_fp:
931      self.assertEqual(privkey_fp.read(), pubkey_fp.read())
932
933  def test_ParseCertificate(self):
934    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
935
936    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
937    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
938                      universal_newlines=False)
939    expected, _ = proc.communicate()
940    self.assertEqual(0, proc.returncode)
941
942    with open(cert) as cert_fp:
943      actual = common.ParseCertificate(cert_fp.read())
944    self.assertEqual(expected, actual)
945
946  @test_utils.SkipIfExternalToolsUnavailable()
947  def test_GetMinSdkVersion(self):
948    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
949    self.assertEqual('24', common.GetMinSdkVersion(test_app))
950
951  @test_utils.SkipIfExternalToolsUnavailable()
952  def test_GetMinSdkVersion_invalidInput(self):
953    self.assertRaises(
954        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')
955
956  @test_utils.SkipIfExternalToolsUnavailable()
957  def test_GetMinSdkVersionInt(self):
958    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
959    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))
960
961  @test_utils.SkipIfExternalToolsUnavailable()
962  def test_GetMinSdkVersionInt_invalidInput(self):
963    self.assertRaises(
964        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
965        {})
966
967
968class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
969
970  def setUp(self):
971    self.testdata_dir = test_utils.get_testdata_dir()
972
973  @test_utils.SkipIfExternalToolsUnavailable()
974  def test_GetSparseImage_emptyBlockMapFile(self):
975    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
976    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
977      target_files_zip.write(
978          test_utils.construct_sparse_image([
979              (0xCAC1, 6),
980              (0xCAC3, 3),
981              (0xCAC1, 4)]),
982          arcname='IMAGES/system.img')
983      target_files_zip.writestr('IMAGES/system.map', '')
984      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
985      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
986
987    tempdir = common.UnzipTemp(target_files)
988    with zipfile.ZipFile(target_files, 'r') as input_zip:
989      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
990
991    self.assertDictEqual(
992        {
993            '__COPY': RangeSet("0"),
994            '__NONZERO-0': RangeSet("1-5 9-12"),
995        },
996        sparse_image.file_map)
997
998  def test_GetSparseImage_missingImageFile(self):
999    self.assertRaises(
1000        AssertionError, common.GetSparseImage, 'system2', self.testdata_dir,
1001        None, False)
1002    self.assertRaises(
1003        AssertionError, common.GetSparseImage, 'unknown', self.testdata_dir,
1004        None, False)
1005
1006  @test_utils.SkipIfExternalToolsUnavailable()
1007  def test_GetSparseImage_missingBlockMapFile(self):
1008    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1009    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1010      target_files_zip.write(
1011          test_utils.construct_sparse_image([
1012              (0xCAC1, 6),
1013              (0xCAC3, 3),
1014              (0xCAC1, 4)]),
1015          arcname='IMAGES/system.img')
1016      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
1017      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1018
1019    tempdir = common.UnzipTemp(target_files)
1020    with zipfile.ZipFile(target_files, 'r') as input_zip:
1021      self.assertRaises(
1022          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1023          False)
1024
1025  @test_utils.SkipIfExternalToolsUnavailable()
1026  def test_GetSparseImage_sharedBlocks_notAllowed(self):
1027    """Tests the case of having overlapping blocks but disallowed."""
1028    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1029    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1030      target_files_zip.write(
1031          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1032          arcname='IMAGES/system.img')
1033      # Block 10 is shared between two files.
1034      target_files_zip.writestr(
1035          'IMAGES/system.map',
1036          '\n'.join([
1037              '/system/file1 1-5 9-10',
1038              '/system/file2 10-12']))
1039      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1040      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1041
1042    tempdir = common.UnzipTemp(target_files)
1043    with zipfile.ZipFile(target_files, 'r') as input_zip:
1044      self.assertRaises(
1045          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1046          False)
1047
1048  @test_utils.SkipIfExternalToolsUnavailable()
1049  def test_GetSparseImage_sharedBlocks_allowed(self):
1050    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
1051    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1052    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1053      # Construct an image with a care_map of "0-5 9-12".
1054      target_files_zip.write(
1055          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1056          arcname='IMAGES/system.img')
1057      # Block 10 is shared between two files.
1058      target_files_zip.writestr(
1059          'IMAGES/system.map',
1060          '\n'.join([
1061              '/system/file1 1-5 9-10',
1062              '/system/file2 10-12']))
1063      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1064      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1065
1066    tempdir = common.UnzipTemp(target_files)
1067    with zipfile.ZipFile(target_files, 'r') as input_zip:
1068      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
1069
1070    self.assertDictEqual(
1071        {
1072            '__COPY': RangeSet("0"),
1073            '__NONZERO-0': RangeSet("6-8 13-15"),
1074            '/system/file1': RangeSet("1-5 9-10"),
1075            '/system/file2': RangeSet("11-12"),
1076        },
1077        sparse_image.file_map)
1078
1079    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
1080    # 'incomplete'.
1081    self.assertTrue(
1082        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
1083    self.assertNotIn(
1084        'incomplete', sparse_image.file_map['/system/file2'].extra)
1085
1086    # '/system/file1' will only contain one field -- a copy of the input text.
1087    self.assertEqual(1, len(sparse_image.file_map['/system/file1'].extra))
1088
1089    # Meta entries should not have any extra tag.
1090    self.assertFalse(sparse_image.file_map['__COPY'].extra)
1091    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
1092
1093  @test_utils.SkipIfExternalToolsUnavailable()
1094  def test_GetSparseImage_incompleteRanges(self):
1095    """Tests the case of ext4 images with holes."""
1096    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1097    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1098      target_files_zip.write(
1099          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1100          arcname='IMAGES/system.img')
1101      target_files_zip.writestr(
1102          'IMAGES/system.map',
1103          '\n'.join([
1104              '/system/file1 1-5 9-10',
1105              '/system/file2 11-12']))
1106      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1107      # '/system/file2' has less blocks listed (2) than actual (3).
1108      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1109
1110    tempdir = common.UnzipTemp(target_files)
1111    with zipfile.ZipFile(target_files, 'r') as input_zip:
1112      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1113
1114    self.assertEqual(
1115        '1-5 9-10',
1116        sparse_image.file_map['/system/file1'].extra['text_str'])
1117    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
1118
1119  @test_utils.SkipIfExternalToolsUnavailable()
1120  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
1121    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1122    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1123      target_files_zip.write(
1124          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1125          arcname='IMAGES/system.img')
1126      target_files_zip.writestr(
1127          'IMAGES/system.map',
1128          '\n'.join([
1129              '//system/file1 1-5 9-10',
1130              '//system/file2 11-12',
1131              '/system/app/file3 13-15']))
1132      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1133      # '/system/file2' has less blocks listed (2) than actual (3).
1134      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
1135      # '/system/app/file3' has less blocks listed (3) than actual (4).
1136      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))
1137
1138    tempdir = common.UnzipTemp(target_files)
1139    with zipfile.ZipFile(target_files, 'r') as input_zip:
1140      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1141
1142    self.assertEqual(
1143        '1-5 9-10',
1144        sparse_image.file_map['//system/file1'].extra['text_str'])
1145    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
1146    self.assertTrue(
1147        sparse_image.file_map['/system/app/file3'].extra['incomplete'])
1148
1149  @test_utils.SkipIfExternalToolsUnavailable()
1150  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
1151    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1152    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1153      target_files_zip.write(
1154          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1155          arcname='IMAGES/system.img')
1156      target_files_zip.writestr(
1157          'IMAGES/system.map',
1158          '\n'.join([
1159              '//system/file1 1-5 9-10',
1160              '//init.rc 13-15']))
1161      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1162      # '/init.rc' has less blocks listed (3) than actual (4).
1163      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))
1164
1165    tempdir = common.UnzipTemp(target_files)
1166    with zipfile.ZipFile(target_files, 'r') as input_zip:
1167      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
1168
1169    self.assertEqual(
1170        '1-5 9-10',
1171        sparse_image.file_map['//system/file1'].extra['text_str'])
1172    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])
1173
1174  @test_utils.SkipIfExternalToolsUnavailable()
1175  def test_GetSparseImage_fileNotFound(self):
1176    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1177    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1178      target_files_zip.write(
1179          test_utils.construct_sparse_image([(0xCAC2, 16)]),
1180          arcname='IMAGES/system.img')
1181      target_files_zip.writestr(
1182          'IMAGES/system.map',
1183          '\n'.join([
1184              '//system/file1 1-5 9-10',
1185              '//system/file2 11-12']))
1186      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
1187
1188    tempdir = common.UnzipTemp(target_files)
1189    with zipfile.ZipFile(target_files, 'r') as input_zip:
1190      self.assertRaises(
1191          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
1192          False)
1193
1194  @test_utils.SkipIfExternalToolsUnavailable()
1195  def test_GetAvbChainedPartitionArg(self):
1196    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1197    info_dict = {
1198        'avb_avbtool': 'avbtool',
1199        'avb_system_key_path': pubkey,
1200        'avb_system_rollback_index_location': 2,
1201    }
1202    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
1203    self.assertEqual(3, len(args))
1204    self.assertEqual('system', args[0])
1205    self.assertEqual('2', args[1])
1206    self.assertTrue(os.path.exists(args[2]))
1207
1208  @test_utils.SkipIfExternalToolsUnavailable()
1209  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
1210    key = os.path.join(self.testdata_dir, 'testkey.key')
1211    info_dict = {
1212        'avb_avbtool': 'avbtool',
1213        'avb_product_key_path': key,
1214        'avb_product_rollback_index_location': 2,
1215    }
1216    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
1217    self.assertEqual(3, len(args))
1218    self.assertEqual('product', args[0])
1219    self.assertEqual('2', args[1])
1220    self.assertTrue(os.path.exists(args[2]))
1221
1222  @test_utils.SkipIfExternalToolsUnavailable()
1223  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
1224    info_dict = {
1225        'avb_avbtool': 'avbtool',
1226        'avb_system_key_path': 'does-not-exist',
1227        'avb_system_rollback_index_location': 2,
1228    }
1229    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
1230    args = common.GetAvbChainedPartitionArg(
1231        'system', info_dict, pubkey).split(':')
1232    self.assertEqual(3, len(args))
1233    self.assertEqual('system', args[0])
1234    self.assertEqual('2', args[1])
1235    self.assertTrue(os.path.exists(args[2]))
1236
1237  @test_utils.SkipIfExternalToolsUnavailable()
1238  def test_GetAvbChainedPartitionArg_invalidKey(self):
1239    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
1240    info_dict = {
1241        'avb_avbtool': 'avbtool',
1242        'avb_system_key_path': pubkey,
1243        'avb_system_rollback_index_location': 2,
1244    }
1245    self.assertRaises(
1246        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
1247        info_dict)
1248
1249  INFO_DICT_DEFAULT = {
1250      'recovery_api_version': 3,
1251      'fstab_version': 2,
1252      'system_root_image': 'true',
1253      'no_recovery' : 'true',
1254      'recovery_as_boot': 'true',
1255  }
1256
1257  def test_LoadListFromFile(self):
1258    file_path = os.path.join(self.testdata_dir,
1259                             'merge_config_framework_item_list')
1260    contents = common.LoadListFromFile(file_path)
1261    expected_contents = [
1262        'META/apkcerts.txt',
1263        'META/filesystem_config.txt',
1264        'META/root_filesystem_config.txt',
1265        'META/system_manifest.xml',
1266        'META/system_matrix.xml',
1267        'META/update_engine_config.txt',
1268        'PRODUCT/*',
1269        'ROOT/*',
1270        'SYSTEM/*',
1271    ]
1272    self.assertEqual(sorted(contents), sorted(expected_contents))
1273
1274  @staticmethod
1275  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
1276    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1277    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
1278      info_values = ''.join(
1279          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
1280      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
1281
1282      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
1283      if info_dict.get('system_root_image') == 'true':
1284        fstab_values = FSTAB_TEMPLATE.format('/')
1285      else:
1286        fstab_values = FSTAB_TEMPLATE.format('/system')
1287      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)
1288
1289      common.ZipWriteStr(
1290          target_files_zip, 'META/file_contexts', 'file-contexts')
1291    return target_files
1292
1293  def test_LoadInfoDict(self):
1294    target_files = self._test_LoadInfoDict_createTargetFiles(
1295        self.INFO_DICT_DEFAULT,
1296        'BOOT/RAMDISK/system/etc/recovery.fstab')
1297    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1298      loaded_dict = common.LoadInfoDict(target_files_zip)
1299      self.assertEqual(3, loaded_dict['recovery_api_version'])
1300      self.assertEqual(2, loaded_dict['fstab_version'])
1301      self.assertIn('/', loaded_dict['fstab'])
1302      self.assertIn('/system', loaded_dict['fstab'])
1303
1304  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
1305    target_files = self._test_LoadInfoDict_createTargetFiles(
1306        self.INFO_DICT_DEFAULT,
1307        'BOOT/RAMDISK/etc/recovery.fstab')
1308    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1309      loaded_dict = common.LoadInfoDict(target_files_zip)
1310      self.assertEqual(3, loaded_dict['recovery_api_version'])
1311      self.assertEqual(2, loaded_dict['fstab_version'])
1312      self.assertIn('/', loaded_dict['fstab'])
1313      self.assertIn('/system', loaded_dict['fstab'])
1314
1315  @test_utils.SkipIfExternalToolsUnavailable()
1316  def test_LoadInfoDict_dirInput(self):
1317    target_files = self._test_LoadInfoDict_createTargetFiles(
1318        self.INFO_DICT_DEFAULT,
1319        'BOOT/RAMDISK/system/etc/recovery.fstab')
1320    unzipped = common.UnzipTemp(target_files)
1321    loaded_dict = common.LoadInfoDict(unzipped)
1322    self.assertEqual(3, loaded_dict['recovery_api_version'])
1323    self.assertEqual(2, loaded_dict['fstab_version'])
1324    self.assertIn('/', loaded_dict['fstab'])
1325    self.assertIn('/system', loaded_dict['fstab'])
1326
1327  @test_utils.SkipIfExternalToolsUnavailable()
1328  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
1329    target_files = self._test_LoadInfoDict_createTargetFiles(
1330        self.INFO_DICT_DEFAULT,
1331        'BOOT/RAMDISK/system/etc/recovery.fstab')
1332    unzipped = common.UnzipTemp(target_files)
1333    loaded_dict = common.LoadInfoDict(unzipped)
1334    self.assertEqual(3, loaded_dict['recovery_api_version'])
1335    self.assertEqual(2, loaded_dict['fstab_version'])
1336    self.assertIn('/', loaded_dict['fstab'])
1337    self.assertIn('/system', loaded_dict['fstab'])
1338
1339  def test_LoadInfoDict_systemRootImageFalse(self):
1340    # Devices not using system-as-root nor recovery-as-boot. Non-A/B devices
1341    # launched prior to P will likely have this config.
1342    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1343    del info_dict['no_recovery']
1344    del info_dict['system_root_image']
1345    del info_dict['recovery_as_boot']
1346    target_files = self._test_LoadInfoDict_createTargetFiles(
1347        info_dict,
1348        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1349    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1350      loaded_dict = common.LoadInfoDict(target_files_zip)
1351      self.assertEqual(3, loaded_dict['recovery_api_version'])
1352      self.assertEqual(2, loaded_dict['fstab_version'])
1353      self.assertNotIn('/', loaded_dict['fstab'])
1354      self.assertIn('/system', loaded_dict['fstab'])
1355
1356  def test_LoadInfoDict_recoveryAsBootFalse(self):
1357    # Devices using system-as-root, but with standalone recovery image. Non-A/B
1358    # devices launched since P will likely have this config.
1359    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1360    del info_dict['no_recovery']
1361    del info_dict['recovery_as_boot']
1362    target_files = self._test_LoadInfoDict_createTargetFiles(
1363        info_dict,
1364        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1365    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1366      loaded_dict = common.LoadInfoDict(target_files_zip)
1367      self.assertEqual(3, loaded_dict['recovery_api_version'])
1368      self.assertEqual(2, loaded_dict['fstab_version'])
1369      self.assertIn('/', loaded_dict['fstab'])
1370      self.assertIn('/system', loaded_dict['fstab'])
1371
1372  def test_LoadInfoDict_noRecoveryTrue(self):
1373    # Device doesn't have a recovery partition at all.
1374    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
1375    del info_dict['recovery_as_boot']
1376    target_files = self._test_LoadInfoDict_createTargetFiles(
1377        info_dict,
1378        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
1379    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1380      loaded_dict = common.LoadInfoDict(target_files_zip)
1381      self.assertEqual(3, loaded_dict['recovery_api_version'])
1382      self.assertEqual(2, loaded_dict['fstab_version'])
1383      self.assertIsNone(loaded_dict['fstab'])
1384
1385  @test_utils.SkipIfExternalToolsUnavailable()
1386  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
1387    target_files = self._test_LoadInfoDict_createTargetFiles(
1388        self.INFO_DICT_DEFAULT,
1389        'BOOT/RAMDISK/system/etc/recovery.fstab')
1390    common.ZipDelete(target_files, 'META/misc_info.txt')
1391    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1392      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)
1393
1394  @test_utils.SkipIfExternalToolsUnavailable()
1395  def test_LoadInfoDict_repacking(self):
1396    target_files = self._test_LoadInfoDict_createTargetFiles(
1397        self.INFO_DICT_DEFAULT,
1398        'BOOT/RAMDISK/system/etc/recovery.fstab')
1399    unzipped = common.UnzipTemp(target_files)
1400    loaded_dict = common.LoadInfoDict(unzipped, True)
1401    self.assertEqual(3, loaded_dict['recovery_api_version'])
1402    self.assertEqual(2, loaded_dict['fstab_version'])
1403    self.assertIn('/', loaded_dict['fstab'])
1404    self.assertIn('/system', loaded_dict['fstab'])
1405    self.assertEqual(
1406        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
1407    self.assertEqual(
1408        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
1409        loaded_dict['root_fs_config'])
1410
1411  def test_LoadInfoDict_repackingWithZipFileInput(self):
1412    target_files = self._test_LoadInfoDict_createTargetFiles(
1413        self.INFO_DICT_DEFAULT,
1414        'BOOT/RAMDISK/system/etc/recovery.fstab')
1415    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
1416      self.assertRaises(
1417          AssertionError, common.LoadInfoDict, target_files_zip, True)
1418
1419  def test_MergeDynamicPartitionInfoDicts_ReturnsMergedDict(self):
1420    framework_dict = {
1421        'super_partition_groups': 'group_a',
1422        'dynamic_partition_list': 'system',
1423        'super_group_a_partition_list': 'system',
1424    }
1425    vendor_dict = {
1426        'super_partition_groups': 'group_a group_b',
1427        'dynamic_partition_list': 'vendor product',
1428        'super_group_a_partition_list': 'vendor',
1429        'super_group_a_group_size': '1000',
1430        'super_group_b_partition_list': 'product',
1431        'super_group_b_group_size': '2000',
1432    }
1433    merged_dict = common.MergeDynamicPartitionInfoDicts(
1434        framework_dict=framework_dict,
1435        vendor_dict=vendor_dict)
1436    expected_merged_dict = {
1437        'super_partition_groups': 'group_a group_b',
1438        'dynamic_partition_list': 'system vendor product',
1439        'super_group_a_partition_list': 'system vendor',
1440        'super_group_a_group_size': '1000',
1441        'super_group_b_partition_list': 'product',
1442        'super_group_b_group_size': '2000',
1443    }
1444    self.assertEqual(merged_dict, expected_merged_dict)
1445
1446  def test_MergeDynamicPartitionInfoDicts_IgnoringFrameworkGroupSize(self):
1447    framework_dict = {
1448        'super_partition_groups': 'group_a',
1449        'dynamic_partition_list': 'system',
1450        'super_group_a_partition_list': 'system',
1451        'super_group_a_group_size': '5000',
1452    }
1453    vendor_dict = {
1454        'super_partition_groups': 'group_a group_b',
1455        'dynamic_partition_list': 'vendor product',
1456        'super_group_a_partition_list': 'vendor',
1457        'super_group_a_group_size': '1000',
1458        'super_group_b_partition_list': 'product',
1459        'super_group_b_group_size': '2000',
1460    }
1461    merged_dict = common.MergeDynamicPartitionInfoDicts(
1462        framework_dict=framework_dict,
1463        vendor_dict=vendor_dict)
1464    expected_merged_dict = {
1465        'super_partition_groups': 'group_a group_b',
1466        'dynamic_partition_list': 'system vendor product',
1467        'super_group_a_partition_list': 'system vendor',
1468        'super_group_a_group_size': '1000',
1469        'super_group_b_partition_list': 'product',
1470        'super_group_b_group_size': '2000',
1471    }
1472    self.assertEqual(merged_dict, expected_merged_dict)
1473
1474  def test_GetAvbPartitionArg(self):
1475    info_dict = {}
1476    cmd = common.GetAvbPartitionArg('system', '/path/to/system.img', info_dict)
1477    self.assertEqual(
1478        ['--include_descriptors_from_image', '/path/to/system.img'], cmd)
1479
1480  @test_utils.SkipIfExternalToolsUnavailable()
1481  def test_AppendVBMetaArgsForPartition_vendorAsChainedPartition(self):
1482    testdata_dir = test_utils.get_testdata_dir()
1483    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1484    info_dict = {
1485        'avb_avbtool': 'avbtool',
1486        'avb_vendor_key_path': pubkey,
1487        'avb_vendor_rollback_index_location': 5,
1488    }
1489    cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
1490    self.assertEqual(2, len(cmd))
1491    self.assertEqual('--chain_partition', cmd[0])
1492    chained_partition_args = cmd[1].split(':')
1493    self.assertEqual(3, len(chained_partition_args))
1494    self.assertEqual('vendor', chained_partition_args[0])
1495    self.assertEqual('5', chained_partition_args[1])
1496    self.assertTrue(os.path.exists(chained_partition_args[2]))
1497
1498  @test_utils.SkipIfExternalToolsUnavailable()
1499  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
1500    testdata_dir = test_utils.get_testdata_dir()
1501    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1502    info_dict = {
1503        'avb_avbtool': 'avbtool',
1504        'avb_recovery_key_path': pubkey,
1505        'avb_recovery_rollback_index_location': 3,
1506    }
1507    cmd = common.GetAvbPartitionArg(
1508        'recovery', '/path/to/recovery.img', info_dict)
1509    self.assertFalse(cmd)
1510
1511  @test_utils.SkipIfExternalToolsUnavailable()
1512  def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_ab(self):
1513    testdata_dir = test_utils.get_testdata_dir()
1514    pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
1515    info_dict = {
1516        'ab_update': 'true',
1517        'avb_avbtool': 'avbtool',
1518        'avb_recovery_key_path': pubkey,
1519        'avb_recovery_rollback_index_location': 3,
1520    }
1521    cmd = common.GetAvbPartitionArg(
1522        'recovery', '/path/to/recovery.img', info_dict)
1523    self.assertEqual(2, len(cmd))
1524    self.assertEqual('--chain_partition', cmd[0])
1525    chained_partition_args = cmd[1].split(':')
1526    self.assertEqual(3, len(chained_partition_args))
1527    self.assertEqual('recovery', chained_partition_args[0])
1528    self.assertEqual('3', chained_partition_args[1])
1529    self.assertTrue(os.path.exists(chained_partition_args[2]))
1530
1531  def test_BuildVBMeta_appendAftlCommandSyntax(self):
1532    testdata_dir = test_utils.get_testdata_dir()
1533    common.OPTIONS.info_dict = {
1534        'ab_update': 'true',
1535        'avb_avbtool': 'avbtool',
1536        'build.prop': common.PartitionBuildProps.FromDictionary(
1537            'system', {
1538                'ro.build.version.incremental': '6285659',
1539                'ro.product.device': 'coral',
1540                'ro.build.fingerprint':
1541                'google/coral/coral:R/RP1A.200311.002/'
1542                '6285659:userdebug/dev-keys'}
1543        ),
1544    }
1545    common.OPTIONS.aftl_tool_path = 'aftltool'
1546    common.OPTIONS.aftl_server = 'log.endpoints.aftl-dev.cloud.goog:9000'
1547    common.OPTIONS.aftl_key_path = os.path.join(testdata_dir,
1548                                                'test_transparency_key.pub')
1549    common.OPTIONS.aftl_manufacturer_key_path = os.path.join(
1550        testdata_dir, 'test_aftl_rsa4096.pem')
1551
1552    vbmeta_image = tempfile.NamedTemporaryFile(delete=False)
1553    cmd = common.ConstructAftlMakeImageCommands(vbmeta_image.name)
1554    expected_cmd = [
1555        'aftltool', 'make_icp_from_vbmeta',
1556        '--vbmeta_image_path', 'place_holder',
1557        '--output', vbmeta_image.name,
1558        '--version_incremental', '6285659',
1559        '--transparency_log_servers',
1560        'log.endpoints.aftl-dev.cloud.goog:9000,{}'.format(
1561            common.OPTIONS.aftl_key_path),
1562        '--manufacturer_key', common.OPTIONS.aftl_manufacturer_key_path,
1563        '--algorithm', 'SHA256_RSA4096',
1564        '--padding', '4096']
1565
1566    # ignore the place holder, i.e. path to a temp file
1567    self.assertEqual(cmd[:3], expected_cmd[:3])
1568    self.assertEqual(cmd[4:], expected_cmd[4:])
1569
1570  @unittest.skip("enable after we have a server for public")
1571  def test_BuildVBMeta_appendAftlContactServer(self):
1572    testdata_dir = test_utils.get_testdata_dir()
1573    common.OPTIONS.info_dict = {
1574        'ab_update': 'true',
1575        'avb_avbtool': 'avbtool',
1576        'build.prop': common.PartitionBuildProps.FromDictionary(
1577            'system', {
1578                'ro.build.version.incremental': '6285659',
1579                'ro.product.device': 'coral',
1580                'ro.build.fingerprint':
1581                'google/coral/coral:R/RP1A.200311.002/'
1582                '6285659:userdebug/dev-keys'}
1583        )
1584    }
1585    common.OPTIONS.aftl_tool_path = "aftltool"
1586    common.OPTIONS.aftl_server = "log.endpoints.aftl-dev.cloud.goog:9000"
1587    common.OPTIONS.aftl_key_path = os.path.join(testdata_dir,
1588                                                'test_transparency_key.pub')
1589    common.OPTIONS.aftl_manufacturer_key_path = os.path.join(
1590        testdata_dir, 'test_aftl_rsa4096.pem')
1591
1592    input_dir = common.MakeTempDir()
1593    system_image = common.MakeTempFile()
1594    build_image_cmd = ['mkuserimg_mke2fs', input_dir, system_image, 'ext4',
1595                       '/system', str(4096 * 100), '-j', '0', '-s']
1596    common.RunAndCheckOutput(build_image_cmd)
1597
1598    add_footer_cmd = ['avbtool', 'add_hashtree_footer',
1599                      '--partition_size', str(4096 * 150),
1600                      '--partition_name', 'system',
1601                      '--image', system_image]
1602    common.RunAndCheckOutput(add_footer_cmd)
1603
1604    vbmeta_image = common.MakeTempFile()
1605    common.BuildVBMeta(vbmeta_image, {'system': system_image}, 'vbmeta',
1606                       ['system'])
1607
1608    verify_cmd = ['aftltool', 'verify_image_icp', '--vbmeta_image_path',
1609                  vbmeta_image, '--transparency_log_pub_keys',
1610                  common.OPTIONS.aftl_key_path]
1611    common.RunAndCheckOutput(verify_cmd)
1612
1613
1614class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
1615  """Checks the format of install-recovery.sh.
1616
1617  Its format should match between common.py and validate_target_files.py.
1618  """
1619
1620  def setUp(self):
1621    self._tempdir = common.MakeTempDir()
1622    # Create a fake dict that contains the fstab info for boot&recovery.
1623    self._info = {"fstab" : {}}
1624    fake_fstab = [
1625        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
1626        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
1627    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
1628    # Construct the gzipped recovery.img and boot.img
1629    self.recovery_data = bytearray([
1630        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
1631        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
1632        0x08, 0x00, 0x00, 0x00
1633    ])
1634    # echo -n "boot" | gzip -f | hd
1635    self.boot_data = bytearray([
1636        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
1637        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
1638    ])
1639
1640  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
1641    loc = os.path.join(self._tempdir, prefix, name)
1642    if not os.path.exists(os.path.dirname(loc)):
1643      os.makedirs(os.path.dirname(loc))
1644    with open(loc, "wb") as f:
1645      f.write(data)
1646
1647  def test_full_recovery(self):
1648    recovery_image = common.File("recovery.img", self.recovery_data)
1649    boot_image = common.File("boot.img", self.boot_data)
1650    self._info["full_recovery_image"] = "true"
1651
1652    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1653                             recovery_image, boot_image, self._info)
1654    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1655                                                        self._info)
1656
1657  @test_utils.SkipIfExternalToolsUnavailable()
1658  def test_recovery_from_boot(self):
1659    recovery_image = common.File("recovery.img", self.recovery_data)
1660    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
1661    boot_image = common.File("boot.img", self.boot_data)
1662    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
1663
1664    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1665                             recovery_image, boot_image, self._info)
1666    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1667                                                        self._info)
1668    # Validate 'recovery-from-boot' with bonus argument.
1669    self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
1670    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
1671                             recovery_image, boot_image, self._info)
1672    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
1673                                                        self._info)
1674
1675
1676class MockBlockDifference(object):
1677
1678  def __init__(self, partition, tgt, src=None):
1679    self.partition = partition
1680    self.tgt = tgt
1681    self.src = src
1682
1683  def WriteScript(self, script, _, progress=None,
1684                  write_verify_script=False):
1685    if progress:
1686      script.AppendExtra("progress({})".format(progress))
1687    script.AppendExtra("patch({});".format(self.partition))
1688    if write_verify_script:
1689      self.WritePostInstallVerifyScript(script)
1690
1691  def WritePostInstallVerifyScript(self, script):
1692    script.AppendExtra("verify({});".format(self.partition))
1693
1694
1695class FakeSparseImage(object):
1696
1697  def __init__(self, size):
1698    self.blocksize = 4096
1699    self.total_blocks = size // 4096
1700    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
1701
1702
1703class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
1704
1705  @staticmethod
1706  def get_op_list(output_path):
1707    with zipfile.ZipFile(output_path) as output_zip:
1708      with output_zip.open('dynamic_partitions_op_list') as op_list:
1709        return [line.decode().strip() for line in op_list.readlines()
1710                if not line.startswith(b'#')]
1711
1712  def setUp(self):
1713    self.script = test_utils.MockScriptWriter()
1714    self.output_path = common.MakeTempFile(suffix='.zip')
1715
1716  def test_full(self):
1717    target_info = common.LoadDictionaryFromLines("""
1718dynamic_partition_list=system vendor
1719super_partition_groups=group_foo
1720super_group_foo_group_size={group_size}
1721super_group_foo_partition_list=system vendor
1722""".format(group_size=4 * GiB).split("\n"))
1723    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
1724                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
1725
1726    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
1727    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
1728      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1729
1730    self.assertEqual(str(self.script).strip(), """
1731assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
1732patch(system);
1733verify(system);
1734unmap_partition("system");
1735patch(vendor);
1736verify(vendor);
1737unmap_partition("vendor");
1738""".strip())
1739
1740    lines = self.get_op_list(self.output_path)
1741
1742    remove_all_groups = lines.index("remove_all_groups")
1743    add_group = lines.index("add_group group_foo 4294967296")
1744    add_vendor = lines.index("add vendor group_foo")
1745    add_system = lines.index("add system group_foo")
1746    resize_vendor = lines.index("resize vendor 1073741824")
1747    resize_system = lines.index("resize system 3221225472")
1748
1749    self.assertLess(remove_all_groups, add_group,
1750                    "Should add groups after removing all groups")
1751    self.assertLess(add_group, min(add_vendor, add_system),
1752                    "Should add partitions after adding group")
1753    self.assertLess(add_system, resize_system,
1754                    "Should resize system after adding it")
1755    self.assertLess(add_vendor, resize_vendor,
1756                    "Should resize vendor after adding it")
1757
1758  def test_inc_groups(self):
1759    source_info = common.LoadDictionaryFromLines("""
1760super_partition_groups=group_foo group_bar group_baz
1761super_group_foo_group_size={group_foo_size}
1762super_group_bar_group_size={group_bar_size}
1763""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
1764    target_info = common.LoadDictionaryFromLines("""
1765super_partition_groups=group_foo group_baz group_qux
1766super_group_foo_group_size={group_foo_size}
1767super_group_baz_group_size={group_baz_size}
1768super_group_qux_group_size={group_qux_size}
1769""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
1770           group_qux_size=1 * GiB).split("\n"))
1771
1772    dp_diff = common.DynamicPartitionsDifference(target_info,
1773                                                 block_diffs=[],
1774                                                 source_info_dict=source_info)
1775    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
1776      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1777
1778    lines = self.get_op_list(self.output_path)
1779
1780    removed = lines.index("remove_group group_bar")
1781    shrunk = lines.index("resize_group group_foo 3221225472")
1782    grown = lines.index("resize_group group_baz 4294967296")
1783    added = lines.index("add_group group_qux 1073741824")
1784
1785    self.assertLess(max(removed, shrunk),
1786                    min(grown, added),
1787                    "ops that remove / shrink partitions must precede ops that "
1788                    "grow / add partitions")
1789
1790  def test_incremental(self):
1791    source_info = common.LoadDictionaryFromLines("""
1792dynamic_partition_list=system vendor product system_ext
1793super_partition_groups=group_foo
1794super_group_foo_group_size={group_foo_size}
1795super_group_foo_partition_list=system vendor product system_ext
1796""".format(group_foo_size=4 * GiB).split("\n"))
1797    target_info = common.LoadDictionaryFromLines("""
1798dynamic_partition_list=system vendor product odm
1799super_partition_groups=group_foo group_bar
1800super_group_foo_group_size={group_foo_size}
1801super_group_foo_partition_list=system vendor odm
1802super_group_bar_group_size={group_bar_size}
1803super_group_bar_partition_list=product
1804""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
1805
1806    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
1807                                       src=FakeSparseImage(1024 * MiB)),
1808                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
1809                                       src=FakeSparseImage(1024 * MiB)),
1810                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
1811                                       src=FakeSparseImage(1024 * MiB)),
1812                   MockBlockDifference("system_ext", None,
1813                                       src=FakeSparseImage(1024 * MiB)),
1814                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
1815                                       src=None)]
1816
1817    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1818                                                 source_info_dict=source_info)
1819    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
1820      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1821
1822    metadata_idx = self.script.lines.index(
1823        'assert(update_dynamic_partitions(package_extract_file('
1824        '"dynamic_partitions_op_list")));')
1825    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
1826    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
1827    for p in ("product", "system", "odm"):
1828      patch_idx = self.script.lines.index("patch({});".format(p))
1829      verify_idx = self.script.lines.index("verify({});".format(p))
1830      self.assertLess(metadata_idx, patch_idx,
1831                      "Should patch {} after updating metadata".format(p))
1832      self.assertLess(patch_idx, verify_idx,
1833                      "Should verify {} after patching".format(p))
1834
1835    self.assertNotIn("patch(system_ext);", self.script.lines)
1836
1837    lines = self.get_op_list(self.output_path)
1838
1839    remove = lines.index("remove system_ext")
1840    move_product_out = lines.index("move product default")
1841    shrink = lines.index("resize vendor 536870912")
1842    shrink_group = lines.index("resize_group group_foo 3221225472")
1843    add_group_bar = lines.index("add_group group_bar 1073741824")
1844    add_odm = lines.index("add odm group_foo")
1845    grow_existing = lines.index("resize system 1610612736")
1846    grow_added = lines.index("resize odm 1073741824")
1847    move_product_in = lines.index("move product group_bar")
1848
1849    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
1850    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
1851
1852    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
1853                    "Must shrink group after partitions inside group are shrunk"
1854                    " / removed")
1855
1856    self.assertLess(add_group_bar, move_product_in,
1857                    "Must add partitions to group after group is added")
1858
1859    self.assertLess(max_idx_move_partition_out_foo,
1860                    min_idx_move_partition_in_foo,
1861                    "Must shrink partitions / remove partitions from group"
1862                    "before adding / moving partitions into group")
1863
1864  def test_remove_partition(self):
1865    source_info = common.LoadDictionaryFromLines("""
1866blockimgdiff_versions=3,4
1867use_dynamic_partitions=true
1868dynamic_partition_list=foo
1869super_partition_groups=group_foo
1870super_group_foo_group_size={group_foo_size}
1871super_group_foo_partition_list=foo
1872""".format(group_foo_size=4 * GiB).split("\n"))
1873    target_info = common.LoadDictionaryFromLines("""
1874blockimgdiff_versions=3,4
1875use_dynamic_partitions=true
1876super_partition_groups=group_foo
1877super_group_foo_group_size={group_foo_size}
1878""".format(group_foo_size=4 * GiB).split("\n"))
1879
1880    common.OPTIONS.info_dict = target_info
1881    common.OPTIONS.target_info_dict = target_info
1882    common.OPTIONS.source_info_dict = source_info
1883    common.OPTIONS.cache_size = 4 * 4096
1884
1885    block_diffs = [common.BlockDifference("foo", EmptyImage(),
1886                                          src=DataImage("source", pad=True))]
1887
1888    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
1889                                                 source_info_dict=source_info)
1890    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
1891      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
1892
1893    self.assertNotIn("block_image_update", str(self.script),
1894                     "Removed partition should not be patched.")
1895
1896    lines = self.get_op_list(self.output_path)
1897    self.assertEqual(lines, ["remove foo"])
1898
1899
1900class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
1901  def setUp(self):
1902    self.odm_build_prop = [
1903        'ro.odm.build.date.utc=1578430045',
1904        'ro.odm.build.fingerprint='
1905        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1906        'ro.product.odm.device=coral',
1907        'import /odm/etc/build_${ro.boot.product.device_name}.prop',
1908    ]
1909
1910  @staticmethod
1911  def _BuildZipFile(entries):
1912    input_file = common.MakeTempFile(prefix='target_files-', suffix='.zip')
1913    with zipfile.ZipFile(input_file, 'w') as input_zip:
1914      for name, content in entries.items():
1915        input_zip.writestr(name, content)
1916
1917    return input_file
1918
1919  def test_parseBuildProps_noImportStatement(self):
1920    build_prop = [
1921        'ro.odm.build.date.utc=1578430045',
1922        'ro.odm.build.fingerprint='
1923        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1924        'ro.product.odm.device=coral',
1925    ]
1926    input_file = self._BuildZipFile({
1927        'ODM/etc/build.prop': '\n'.join(build_prop),
1928    })
1929
1930    with zipfile.ZipFile(input_file, 'r') as input_zip:
1931      placeholder_values = {
1932          'ro.boot.product.device_name': ['std', 'pro']
1933      }
1934      partition_props = common.PartitionBuildProps.FromInputFile(
1935          input_zip, 'odm', placeholder_values)
1936
1937    self.assertEqual({
1938        'ro.odm.build.date.utc': '1578430045',
1939        'ro.odm.build.fingerprint':
1940        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1941        'ro.product.odm.device': 'coral',
1942    }, partition_props.build_props)
1943
1944    self.assertEqual(set(), partition_props.prop_overrides)
1945
1946  def test_parseBuildProps_singleImportStatement(self):
1947    build_std_prop = [
1948        'ro.product.odm.device=coral',
1949        'ro.product.odm.name=product1',
1950    ]
1951    build_pro_prop = [
1952        'ro.product.odm.device=coralpro',
1953        'ro.product.odm.name=product2',
1954    ]
1955
1956    input_file = self._BuildZipFile({
1957        'ODM/etc/build.prop': '\n'.join(self.odm_build_prop),
1958        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
1959        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
1960    })
1961
1962    with zipfile.ZipFile(input_file, 'r') as input_zip:
1963      placeholder_values = {
1964          'ro.boot.product.device_name': 'std'
1965      }
1966      partition_props = common.PartitionBuildProps.FromInputFile(
1967          input_zip, 'odm', placeholder_values)
1968
1969    self.assertEqual({
1970      'ro.odm.build.date.utc': '1578430045',
1971      'ro.odm.build.fingerprint':
1972      'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1973      'ro.product.odm.device': 'coral',
1974      'ro.product.odm.name': 'product1',
1975    }, partition_props.build_props)
1976
1977    with zipfile.ZipFile(input_file, 'r') as input_zip:
1978      placeholder_values = {
1979          'ro.boot.product.device_name': 'pro'
1980      }
1981      partition_props = common.PartitionBuildProps.FromInputFile(
1982          input_zip, 'odm', placeholder_values)
1983
1984    self.assertEqual({
1985        'ro.odm.build.date.utc': '1578430045',
1986        'ro.odm.build.fingerprint':
1987        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
1988        'ro.product.odm.device': 'coralpro',
1989        'ro.product.odm.name': 'product2',
1990    }, partition_props.build_props)
1991
1992  def test_parseBuildProps_noPlaceHolders(self):
1993    build_prop = copy.copy(self.odm_build_prop)
1994    input_file = self._BuildZipFile({
1995        'ODM/etc/build.prop': '\n'.join(build_prop),
1996    })
1997
1998    with zipfile.ZipFile(input_file, 'r') as input_zip:
1999      partition_props = common.PartitionBuildProps.FromInputFile(
2000          input_zip, 'odm')
2001
2002    self.assertEqual({
2003        'ro.odm.build.date.utc': '1578430045',
2004        'ro.odm.build.fingerprint':
2005        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2006        'ro.product.odm.device': 'coral',
2007    }, partition_props.build_props)
2008
2009    self.assertEqual(set(), partition_props.prop_overrides)
2010
2011  def test_parseBuildProps_multipleImportStatements(self):
2012    build_prop = copy.deepcopy(self.odm_build_prop)
2013    build_prop.append(
2014        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2015
2016    build_std_prop = [
2017        'ro.product.odm.device=coral',
2018    ]
2019    build_pro_prop = [
2020        'ro.product.odm.device=coralpro',
2021    ]
2022
2023    product1_prop = [
2024        'ro.product.odm.name=product1',
2025        'ro.product.not_care=not_care',
2026    ]
2027
2028    product2_prop = [
2029        'ro.product.odm.name=product2',
2030        'ro.product.not_care=not_care',
2031    ]
2032
2033    input_file = self._BuildZipFile({
2034        'ODM/etc/build.prop': '\n'.join(build_prop),
2035        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2036        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2037        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2038        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2039    })
2040
2041    with zipfile.ZipFile(input_file, 'r') as input_zip:
2042      placeholder_values = {
2043          'ro.boot.product.device_name': 'std',
2044          'ro.boot.product.product_name': 'product1',
2045          'ro.boot.product.not_care': 'not_care',
2046      }
2047      partition_props = common.PartitionBuildProps.FromInputFile(
2048          input_zip, 'odm', placeholder_values)
2049
2050    self.assertEqual({
2051        'ro.odm.build.date.utc': '1578430045',
2052        'ro.odm.build.fingerprint':
2053        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2054        'ro.product.odm.device': 'coral',
2055        'ro.product.odm.name': 'product1'
2056    }, partition_props.build_props)
2057
2058    with zipfile.ZipFile(input_file, 'r') as input_zip:
2059      placeholder_values = {
2060          'ro.boot.product.device_name': 'pro',
2061          'ro.boot.product.product_name': 'product2',
2062          'ro.boot.product.not_care': 'not_care',
2063      }
2064      partition_props = common.PartitionBuildProps.FromInputFile(
2065          input_zip, 'odm', placeholder_values)
2066
2067    self.assertEqual({
2068        'ro.odm.build.date.utc': '1578430045',
2069        'ro.odm.build.fingerprint':
2070        'google/coral/coral:10/RP1A.200325.001/6337676:user/dev-keys',
2071        'ro.product.odm.device': 'coralpro',
2072        'ro.product.odm.name': 'product2'
2073    }, partition_props.build_props)
2074
2075  def test_parseBuildProps_defineAfterOverride(self):
2076    build_prop = copy.deepcopy(self.odm_build_prop)
2077    build_prop.append('ro.product.odm.device=coral')
2078
2079    build_std_prop = [
2080        'ro.product.odm.device=coral',
2081    ]
2082    build_pro_prop = [
2083        'ro.product.odm.device=coralpro',
2084    ]
2085
2086    input_file = self._BuildZipFile({
2087        'ODM/etc/build.prop': '\n'.join(build_prop),
2088        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2089        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2090    })
2091
2092    with zipfile.ZipFile(input_file, 'r') as input_zip:
2093      placeholder_values = {
2094          'ro.boot.product.device_name': 'std',
2095      }
2096
2097      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2098                        input_zip, 'odm', placeholder_values)
2099
2100  def test_parseBuildProps_duplicateOverride(self):
2101    build_prop = copy.deepcopy(self.odm_build_prop)
2102    build_prop.append(
2103        'import /odm/etc/build_${ro.boot.product.product_name}.prop')
2104
2105    build_std_prop = [
2106        'ro.product.odm.device=coral',
2107        'ro.product.odm.name=product1',
2108    ]
2109    build_pro_prop = [
2110        'ro.product.odm.device=coralpro',
2111    ]
2112
2113    product1_prop = [
2114        'ro.product.odm.name=product1',
2115    ]
2116
2117    product2_prop = [
2118        'ro.product.odm.name=product2',
2119    ]
2120
2121    input_file = self._BuildZipFile({
2122        'ODM/etc/build.prop': '\n'.join(build_prop),
2123        'ODM/etc/build_std.prop': '\n'.join(build_std_prop),
2124        'ODM/etc/build_pro.prop': '\n'.join(build_pro_prop),
2125        'ODM/etc/build_product1.prop': '\n'.join(product1_prop),
2126        'ODM/etc/build_product2.prop': '\n'.join(product2_prop),
2127    })
2128
2129    with zipfile.ZipFile(input_file, 'r') as input_zip:
2130      placeholder_values = {
2131          'ro.boot.product.device_name': 'std',
2132          'ro.boot.product.product_name': 'product1',
2133      }
2134      self.assertRaises(ValueError, common.PartitionBuildProps.FromInputFile,
2135                        input_zip, 'odm', placeholder_values)
2136