1#!/usr/bin/env python
2#
3# Copyright (C) 2015 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Unit testing payload_info.py."""
19
# Disable the pylint check for function names; this file keeps the naming
# style of the older code it was written against.
# pylint: disable=invalid-name
22
23from __future__ import absolute_import
24from __future__ import print_function
25
26import sys
27import unittest
28
29from contextlib import contextmanager
30
31from six.moves import StringIO
32
33import mock  # pylint: disable=import-error
34
35import payload_info
36import update_payload
37
38from update_payload import update_metadata_pb2
39
40
class FakePayloadError(Exception):
  """Error raised by FakePayload when a blob read cannot be served."""
43
44
class FakeOption(object):
  """Stand-in for the options object handed to the payload command.

  The list_ops, stats and signatures flags default to False and payload_file
  defaults to None. Every keyword argument either overrides one of these
  defaults or is stored as an additional attribute.
  """

  def __init__(self, **kwargs):
    attributes = {
        'list_ops': False,
        'stats': False,
        'signatures': False,
        'payload_file': None,
    }
    # Caller-supplied values win over the defaults.
    attributes.update(kwargs)
    for name, value in attributes.items():
      setattr(self, name, value)
56
57
class FakeOp(object):
  """Stand-in for a manifest install operation.

  Besides the extents and the operation type, any extra keyword argument is
  stored as an attribute, which makes HasField() report it as present.
  """

  def __init__(self, src_extents, dst_extents, op_type, **kwargs):
    self.src_extents, self.dst_extents = src_extents, dst_extents
    self.type = op_type
    # Optional fields are applied last, after the fixed attributes.
    for name in kwargs:
      setattr(self, name, kwargs[name])

  def HasField(self, field):
    """Return True if this object has an attribute named |field|."""
    return hasattr(self, field)
70
71
class FakeExtent(object):
  """Stand-in for an Extent: a start block plus a number of blocks."""

  def __init__(self, start_block, num_blocks):
    self.start_block, self.num_blocks = start_block, num_blocks
77
78
class FakePartitionInfo(object):
  """Stand-in for a PartitionInfo message; it only carries a size."""

  def __init__(self, size):
    self.size = size
83
84
class FakePartition(object):
  """Stand-in for a PartitionUpdate entry of the manifest.

  The old and new sizes are wrapped in FakePartitionInfo objects and exposed
  as old_partition_info and new_partition_info.
  """

  def __init__(self, partition_name, operations, old_size, new_size):
    old_info = FakePartitionInfo(old_size)
    new_info = FakePartitionInfo(new_size)

    self.partition_name = partition_name
    self.operations = operations
    self.old_partition_info = old_info
    self.new_partition_info = new_info
93
94
class FakeManifest(object):
  """Stand-in for a payload manifest with a rootfs and a kernel partition."""

  def __init__(self):
    block_size = 4096
    common = update_payload.common

    # Single REPLACE_BZ operation writing two extents (3 blocks in total).
    rootfs_op = FakeOp([], [FakeExtent(1, 1), FakeExtent(2, 2)],
                       common.OpType.REPLACE_BZ,
                       dst_length=3 * block_size,
                       data_offset=1,
                       data_length=1)
    # Single SOURCE_COPY operation reading one block and writing 20 extents.
    kernel_op = FakeOp([FakeExtent(1, 1)],
                       [FakeExtent(x, x) for x in range(20)],
                       common.OpType.SOURCE_COPY,
                       src_length=block_size)

    self.partitions = [
        FakePartition(common.ROOTFS, [rootfs_op],
                      1 * block_size, 3 * block_size),
        FakePartition(common.KERNEL, [kernel_op],
                      2 * block_size, 4 * block_size),
    ]
    self.block_size = block_size
    self.minor_version = 4
    # None makes HasField() report these two fields as absent.
    self.signatures_offset = None
    self.signatures_size = None

  def HasField(self, field_name):
    """Report |field_name| as present only if it exists and is not None."""
    return getattr(self, field_name, None) is not None
122
123
class FakeHeader(object):
  """Stand-in for the payload header.

  The version is always payload_info.MAJOR_PAYLOAD_VERSION_BRILLO; the two
  lengths are supplied by the caller.
  """

  def __init__(self, manifest_len, metadata_signature_len):
    self.manifest_len = manifest_len
    self.metadata_signature_len = metadata_signature_len
    self.version = payload_info.MAJOR_PAYLOAD_VERSION_BRILLO

  @property
  def size(self):
    """Constant header size reported by the fake."""
    return 24
135
136
class FakePayload(object):
  """Stand-in for update_payload.Payload backed by in-memory blobs.

  header and manifest stay None until Init() is called. Signature blobs are
  registered with AddPayloadSignature() and AddMetadataSignature() and are
  handed back by ReadDataBlob().
  """

  def __init__(self):
    # The public attributes are only populated by Init().
    self.header = None
    self.manifest = None
    self._header = FakeHeader(222, 0)
    self._manifest = FakeManifest()

    # Maps a blob offset to the serialized bytes registered at that offset.
    self._blobs = {}
    self._payload_signatures = update_metadata_pb2.Signatures()
    self._metadata_signatures = update_metadata_pb2.Signatures()

  def Init(self):
    """Expose the prepared fake header and manifest.

    Until this method is called, header and manifest are None.
    """
    self.header, self.manifest = self._header, self._manifest

  def ReadDataBlob(self, offset, length):
    """Return the blob registered at |offset| after checking its length.

    Raises:
      FakePayloadError: if no blob is registered at |offset|, or if the
        registered blob is not exactly |length| bytes long.
    """
    if offset not in self._blobs:
      raise FakePayloadError('Requested blob at unknown offset %d' % offset)
    stored = self._blobs[offset]
    stored_len = len(stored)
    if stored_len != length:
      raise FakePayloadError('Read blob with the wrong length (expect: %d, '
                             'actual: %d)' % (stored_len, length))
    return stored

  @staticmethod
  def _AddSignatureToProto(proto, **kwargs):
    """Append a Signature to |proto| and set its fields from |kwargs|."""
    signature = proto.signatures.add()
    for field, value in kwargs.items():
      setattr(signature, field, value)

  def AddPayloadSignature(self, **kwargs):
    """Add a payload signature and register the re-serialized blob.

    The manifest's signatures_offset is set to 1234 and its signatures_size
    to the length of the serialized Signatures message.
    """
    self._AddSignatureToProto(self._payload_signatures, **kwargs)
    serialized = self._payload_signatures.SerializeToString()
    offset = 1234
    self._manifest.signatures_offset = offset
    self._manifest.signatures_size = len(serialized)
    self._blobs[offset] = serialized

  def AddMetadataSignature(self, **kwargs):
    """Add a metadata signature and register the re-serialized blob.

    The blob is keyed by the negative of its length and the header's
    metadata_signature_len is updated to that length. A metadata blob
    registered by an earlier call is removed first.
    """
    self._AddSignatureToProto(self._metadata_signatures, **kwargs)
    previous_len = self._header.metadata_signature_len
    if previous_len:
      del self._blobs[-previous_len]
    serialized = self._metadata_signatures.SerializeToString()
    blob_len = len(serialized)
    self._header.metadata_signature_len = blob_len
    self._blobs[-blob_len] = serialized
190
191
class PayloadCommandTest(unittest.TestCase):
  """Tests for payload_info.PayloadCommand and payload_info.DisplayValue."""

  # Summary lines expected at the top of the output in every test that runs
  # the command on a FakePayload.
  _SUMMARY = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4
"""

  @contextmanager
  def OutputCapturer(self):
    """Temporarily replace sys.stdout with a StringIO and yield it.

    The previous sys.stdout is restored on exit, including on error.
    """
    saved_stdout = sys.stdout
    try:
      captured = StringIO()
      sys.stdout = captured
      yield captured
    finally:
      sys.stdout = saved_stdout

  def TestCommand(self, payload_cmd, payload, expected_out):
    """Run |payload_cmd| against |payload| and check what it prints.

    update_payload.Payload is patched to return |payload| while the command
    runs, and the captured stdout must equal |expected_out|.
    """
    payload_patch = mock.patch.object(update_payload, 'Payload',
                                      return_value=payload)
    with payload_patch:
      with self.OutputCapturer() as output:
        payload_cmd.Run()
    self.assertEqual(output.getvalue(), expected_out)

  def testDisplayValue(self):
    """Verify that DisplayValue prints what we expect."""
    with self.OutputCapturer() as captured:
      payload_info.DisplayValue('key', 'value')
    self.assertEqual(captured.getvalue(),
                     'key:                         value\n')

  def testRun(self):
    """Verify that Run parses and displays the payload like we expect."""
    options = FakeOption(action='show')
    command = payload_info.PayloadCommand(options)
    fake_payload = FakePayload()
    self.TestCommand(command, fake_payload, self._SUMMARY)

  def testListOpsOnVersion2(self):
    """Verify that the --list_ops option gives the correct output."""
    options = FakeOption(list_ops=True, action='show')
    command = payload_info.PayloadCommand(options)
    fake_payload = FakePayload()
    expected_out = self._SUMMARY + """
root install operations:
  0: REPLACE_BZ
    Data offset: 1
    Data length: 1
    Destination: 2 extents (3 blocks)
      (1,1) (2,2)
kernel install operations:
  0: SOURCE_COPY
    Source: 1 extent (1 block)
      (1,1)
    Destination: 20 extents (190 blocks)
      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(command, fake_payload, expected_out)

  def testStatsOnVersion2(self):
    """Verify that the --stats option works correctly on version 2."""
    options = FakeOption(stats=True, action='show')
    command = payload_info.PayloadCommand(options)
    fake_payload = FakePayload()
    expected_out = self._SUMMARY + """Blocks read:                 11
Blocks written:              193
Seeks when writing:          18
"""
    self.TestCommand(command, fake_payload, expected_out)

  def testEmptySignatures(self):
    """Verify that the --signatures option works with unsigned payloads."""
    options = FakeOption(action='show', signatures=True)
    command = payload_info.PayloadCommand(options)
    fake_payload = FakePayload()
    expected_out = self._SUMMARY + (
        'No metadata signatures stored in the payload\n'
        'No payload signatures stored in the payload\n')
    self.TestCommand(command, fake_payload, expected_out)

  def testSignatures(self):
    """Verify that the --signatures option shows the present signatures."""
    options = FakeOption(action='show', signatures=True)
    command = payload_info.PayloadCommand(options)
    fake_payload = FakePayload()
    fake_payload.AddPayloadSignature(
        version=1, data=b'12345678abcdefgh\x00\x01\x02\x03')
    fake_payload.AddPayloadSignature(
        data=b'I am a signature so access is yes.')
    fake_payload.AddMetadataSignature(data=b'\x00\x0a\x0c')
    expected_out = self._SUMMARY + """Metadata signatures blob:    file_offset=246 (7 bytes)
Metadata signatures: (1 entries)
  version=None, hex_data: (3 bytes)
    00 0a 0c                                        | ...
Payload signatures blob:     blob_offset=1234 (64 bytes)
Payload signatures: (2 entries)
  version=1, hex_data: (20 bytes)
    31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
    00 01 02 03                                     | ....
  version=None, hex_data: (34 bytes)
    49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
    20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 |  so access is ye
    73 2e                                           | s.
"""
    self.TestCommand(command, fake_payload, expected_out)
331
332
# Run the unittest test runner when this file is executed as a script.
if __name__ == '__main__':
  unittest.main()
335