1#!/usr/bin/env python 2# 3# Copyright (C) 2015 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18"""Unit testing payload_info.py.""" 19 20# Disable check for function names to avoid errors based on old code 21# pylint: disable-msg=invalid-name 22 23from __future__ import absolute_import 24from __future__ import print_function 25 26import sys 27import unittest 28 29from contextlib import contextmanager 30 31from six.moves import StringIO 32 33import mock # pylint: disable=import-error 34 35import payload_info 36import update_payload 37 38from update_payload import update_metadata_pb2 39 40 41class FakePayloadError(Exception): 42 """A generic error when using the FakePayload.""" 43 44 45class FakeOption(object): 46 """Fake options object for testing.""" 47 48 def __init__(self, **kwargs): 49 self.list_ops = False 50 self.stats = False 51 self.signatures = False 52 for key, val in kwargs.items(): 53 setattr(self, key, val) 54 if not hasattr(self, 'payload_file'): 55 self.payload_file = None 56 57 58class FakeOp(object): 59 """Fake manifest operation for testing.""" 60 61 def __init__(self, src_extents, dst_extents, op_type, **kwargs): 62 self.src_extents = src_extents 63 self.dst_extents = dst_extents 64 self.type = op_type 65 for key, val in kwargs.items(): 66 setattr(self, key, val) 67 68 def HasField(self, field): 69 return hasattr(self, field) 70 71 72class FakeExtent(object): 73 """Fake Extent for testing.""" 74 def __init__(self, start_block, num_blocks): 75 self.start_block = start_block 76 self.num_blocks = num_blocks 77 78 79class FakePartitionInfo(object): 80 """Fake PartitionInfo for testing.""" 81 def __init__(self, size): 82 self.size = size 83 84 85class FakePartition(object): 86 """Fake PartitionUpdate field for testing.""" 87 88 def __init__(self, partition_name, operations, old_size, new_size): 89 self.partition_name = partition_name 90 self.operations = operations 91 self.old_partition_info = FakePartitionInfo(old_size) 92 self.new_partition_info = FakePartitionInfo(new_size) 93 94 95class FakeManifest(object): 96 """Fake manifest for testing.""" 97 98 def __init__(self): 99 self.partitions = [ 100 FakePartition(update_payload.common.ROOTFS, 101 [FakeOp([], [FakeExtent(1, 1), FakeExtent(2, 2)], 102 update_payload.common.OpType.REPLACE_BZ, 103 dst_length=3*4096, 104 data_offset=1, 105 data_length=1) 106 ], 1 * 4096, 3 * 4096), 107 FakePartition(update_payload.common.KERNEL, 108 [FakeOp([FakeExtent(1, 1)], 109 [FakeExtent(x, x) for x in range(20)], 110 update_payload.common.OpType.SOURCE_COPY, 111 src_length=4096) 112 ], 2 * 4096, 4 * 4096), 113 ] 114 self.block_size = 4096 115 self.minor_version = 4 116 self.signatures_offset = None 117 self.signatures_size = None 118 119 def HasField(self, field_name): 120 """Fake HasField method based on the python members.""" 121 return hasattr(self, field_name) and getattr(self, field_name) is not None 122 123 124class FakeHeader(object): 125 """Fake payload header for testing.""" 126 127 def __init__(self, manifest_len, metadata_signature_len): 128 self.version = payload_info.MAJOR_PAYLOAD_VERSION_BRILLO 129 self.manifest_len = manifest_len 130 self.metadata_signature_len = metadata_signature_len 131 132 @property 133 def size(self): 134 return 24 135 136 137class FakePayload(object): 138 """Fake payload for testing.""" 139 140 def __init__(self): 141 self._header = FakeHeader(222, 0) 142 self.header = None 143 self._manifest = FakeManifest() 144 self.manifest = None 145 146 self._blobs = {} 147 self._payload_signatures = update_metadata_pb2.Signatures() 148 self._metadata_signatures = update_metadata_pb2.Signatures() 149 150 def Init(self): 151 """Fake Init that sets header and manifest. 152 153 Failing to call Init() will not make header and manifest available to the 154 test. 155 """ 156 self.header = self._header 157 self.manifest = self._manifest 158 159 def ReadDataBlob(self, offset, length): 160 """Return the blob that should be present at the offset location""" 161 if not offset in self._blobs: 162 raise FakePayloadError('Requested blob at unknown offset %d' % offset) 163 blob = self._blobs[offset] 164 if len(blob) != length: 165 raise FakePayloadError('Read blob with the wrong length (expect: %d, ' 166 'actual: %d)' % (len(blob), length)) 167 return blob 168 169 @staticmethod 170 def _AddSignatureToProto(proto, **kwargs): 171 """Add a new Signature element to the passed proto.""" 172 new_signature = proto.signatures.add() 173 for key, val in kwargs.items(): 174 setattr(new_signature, key, val) 175 176 def AddPayloadSignature(self, **kwargs): 177 self._AddSignatureToProto(self._payload_signatures, **kwargs) 178 blob = self._payload_signatures.SerializeToString() 179 self._manifest.signatures_offset = 1234 180 self._manifest.signatures_size = len(blob) 181 self._blobs[self._manifest.signatures_offset] = blob 182 183 def AddMetadataSignature(self, **kwargs): 184 self._AddSignatureToProto(self._metadata_signatures, **kwargs) 185 if self._header.metadata_signature_len: 186 del self._blobs[-self._header.metadata_signature_len] 187 blob = self._metadata_signatures.SerializeToString() 188 self._header.metadata_signature_len = len(blob) 189 self._blobs[-len(blob)] = blob 190 191 192class PayloadCommandTest(unittest.TestCase): 193 """Test class for our PayloadCommand class.""" 194 195 @contextmanager 196 def OutputCapturer(self): 197 """A tool for capturing the sys.stdout""" 198 stdout = sys.stdout 199 try: 200 sys.stdout = StringIO() 201 yield sys.stdout 202 finally: 203 sys.stdout = stdout 204 205 def TestCommand(self, payload_cmd, payload, expected_out): 206 """A tool for testing a payload command. 207 208 It tests that a payload command which runs with a given payload produces a 209 correct output. 210 """ 211 with mock.patch.object(update_payload, 'Payload', return_value=payload), \ 212 self.OutputCapturer() as output: 213 payload_cmd.Run() 214 self.assertEqual(output.getvalue(), expected_out) 215 216 def testDisplayValue(self): 217 """Verify that DisplayValue prints what we expect.""" 218 with self.OutputCapturer() as output: 219 payload_info.DisplayValue('key', 'value') 220 self.assertEqual(output.getvalue(), 'key: value\n') 221 222 def testRun(self): 223 """Verify that Run parses and displays the payload like we expect.""" 224 payload_cmd = payload_info.PayloadCommand(FakeOption(action='show')) 225 payload = FakePayload() 226 expected_out = """Payload version: 2 227Manifest length: 222 228Number of partitions: 2 229 Number of "root" ops: 1 230 Number of "kernel" ops: 1 231Block size: 4096 232Minor version: 4 233""" 234 self.TestCommand(payload_cmd, payload, expected_out) 235 236 def testListOpsOnVersion2(self): 237 """Verify that the --list_ops option gives the correct output.""" 238 payload_cmd = payload_info.PayloadCommand( 239 FakeOption(list_ops=True, action='show')) 240 payload = FakePayload() 241 expected_out = """Payload version: 2 242Manifest length: 222 243Number of partitions: 2 244 Number of "root" ops: 1 245 Number of "kernel" ops: 1 246Block size: 4096 247Minor version: 4 248 249root install operations: 250 0: REPLACE_BZ 251 Data offset: 1 252 Data length: 1 253 Destination: 2 extents (3 blocks) 254 (1,1) (2,2) 255kernel install operations: 256 0: SOURCE_COPY 257 Source: 1 extent (1 block) 258 (1,1) 259 Destination: 20 extents (190 blocks) 260 (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) 261 (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) 262""" 263 self.TestCommand(payload_cmd, payload, expected_out) 264 265 def testStatsOnVersion2(self): 266 """Verify that the --stats option works correctly on version 2.""" 267 payload_cmd = payload_info.PayloadCommand( 268 FakeOption(stats=True, action='show')) 269 payload = FakePayload() 270 expected_out = """Payload version: 2 271Manifest length: 222 272Number of partitions: 2 273 Number of "root" ops: 1 274 Number of "kernel" ops: 1 275Block size: 4096 276Minor version: 4 277Blocks read: 11 278Blocks written: 193 279Seeks when writing: 18 280""" 281 self.TestCommand(payload_cmd, payload, expected_out) 282 283 def testEmptySignatures(self): 284 """Verify that the --signatures option works with unsigned payloads.""" 285 payload_cmd = payload_info.PayloadCommand( 286 FakeOption(action='show', signatures=True)) 287 payload = FakePayload() 288 expected_out = """Payload version: 2 289Manifest length: 222 290Number of partitions: 2 291 Number of "root" ops: 1 292 Number of "kernel" ops: 1 293Block size: 4096 294Minor version: 4 295No metadata signatures stored in the payload 296No payload signatures stored in the payload 297""" 298 self.TestCommand(payload_cmd, payload, expected_out) 299 300 def testSignatures(self): 301 """Verify that the --signatures option shows the present signatures.""" 302 payload_cmd = payload_info.PayloadCommand( 303 FakeOption(action='show', signatures=True)) 304 payload = FakePayload() 305 payload.AddPayloadSignature(version=1, 306 data=b'12345678abcdefgh\x00\x01\x02\x03') 307 payload.AddPayloadSignature(data=b'I am a signature so access is yes.') 308 payload.AddMetadataSignature(data=b'\x00\x0a\x0c') 309 expected_out = """Payload version: 2 310Manifest length: 222 311Number of partitions: 2 312 Number of "root" ops: 1 313 Number of "kernel" ops: 1 314Block size: 4096 315Minor version: 4 316Metadata signatures blob: file_offset=246 (7 bytes) 317Metadata signatures: (1 entries) 318 version=None, hex_data: (3 bytes) 319 00 0a 0c | ... 320Payload signatures blob: blob_offset=1234 (64 bytes) 321Payload signatures: (2 entries) 322 version=1, hex_data: (20 bytes) 323 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh 324 00 01 02 03 | .... 325 version=None, hex_data: (34 bytes) 326 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature 327 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye 328 73 2e | s. 329""" 330 self.TestCommand(payload_cmd, payload, expected_out) 331 332 333if __name__ == '__main__': 334 unittest.main() 335