#!/usr/bin/env python
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Check dynamic partition sizes.

usage: check_partition_sizes [info.txt]

Check dump-super-partitions-info procedure for expected keys in info.txt. In
addition, *_image (e.g. system_image, vendor_image, etc.) must be defined for
each partition in dynamic_partition_list.

Exit code is 0 if successful and non-zero if any failures.
"""

from __future__ import print_function

import logging
import sys

import common
import sparse_img

if sys.hexversion < 0x02070000:
  print("Python 2.7 or newer is required.", file=sys.stderr)
  sys.exit(1)

logger = logging.getLogger(__name__)


class Expression(object):
  """A described numeric quantity that can be compared against another one.

  Bundles a human-readable description, the textual form of the expression
  and its integer value, so that comparison results can be reported with
  full context.
  """

  def __init__(self, desc, expr, value=None):
    """Initializes the expression.

    Args:
      desc: Human-readable description; stored as str(desc).
      expr: The numeric expression; stored as str(expr).
      value: Value of the expression. If None, int(expr) is used, so expr
          must then be convertible by int().
    """
    # Human-readable description
    self.desc = str(desc)
    # Numeric expression
    self.expr = str(expr)
    # Value of expression
    self.value = int(expr) if value is None else value

  def CheckLe(self, other, level=logging.ERROR):
    """Checks that this expression is less than or equal to other.

    A successful check is logged at INFO level.

    Args:
      other: The Expression to compare against.
      level: Severity of a failed check. With logging.ERROR (the default) a
          failure raises; with any other level the failure is only logged at
          that level.

    Raises:
      RuntimeError: If self.value > other.value and level is logging.ERROR.
    """
    format_args = (self.desc, other.desc, self.expr, self.value,
                   other.expr, other.value)
    if self.value <= other.value:
      logger.info("%s is less than or equal to %s:\n%s == %d <= %s == %d",
                  *format_args)
    else:
      msg = "{} is greater than {}:\n{} == {} > {} == {}".format(*format_args)
      if level == logging.ERROR:
        raise RuntimeError(msg)
      else:
        logger.log(level, msg)

  def CheckEq(self, other):
    """Checks that this expression equals other.

    A successful check is logged at INFO level.

    Args:
      other: The Expression to compare against.

    Raises:
      RuntimeError: If the two values differ.
    """
    format_args = (self.desc, other.desc, self.expr, self.value,
                   other.expr, other.value)
    if self.value == other.value:
      logger.info("%s equals %s:\n%s == %d == %s == %d", *format_args)
    else:
      raise RuntimeError("{} does not equal {}:\n{} == {} != {} == {}".format(
          *format_args))


# A/B feature flags
class DeviceType(object):
  """Classification of a build by its A/B update flags."""

  NONE = 0
  AB = 1
  RVAB = 2 # retrofit Virtual-A/B
  VAB = 3

  @staticmethod
  def Get(info_dict):
    """Returns the DeviceType constant described by info_dict.

    Flags are compared against the string "true". "ab_update" is checked
    first; "virtual_ab_retrofit" takes precedence over "virtual_ab".

    Args:
      info_dict: Dictionary holding the build flags.

    Returns:
      One of DeviceType.NONE, DeviceType.AB, DeviceType.RVAB, DeviceType.VAB.
    """
    if info_dict.get("ab_update") != "true":
      return DeviceType.NONE
    if info_dict.get("virtual_ab_retrofit") == "true":
      return DeviceType.RVAB
    if info_dict.get("virtual_ab") == "true":
      return DeviceType.VAB
    return DeviceType.AB


# Dynamic partition feature flags
class Dap(object):
  """Classification of a build by its dynamic partition flags."""

  NONE = 0
  RDAP = 1
  DAP = 2

  @staticmethod
  def Get(info_dict):
    """Returns the Dap constant described by info_dict.

    Flags are compared against the string "true".

    Args:
      info_dict: Dictionary holding the build flags.

    Returns:
      Dap.NONE if "use_dynamic_partitions" is not "true", Dap.RDAP if
      "dynamic_partition_retrofit" is "true", Dap.DAP otherwise.
    """
    if info_dict.get("use_dynamic_partitions") != "true":
      return Dap.NONE
    if info_dict.get("dynamic_partition_retrofit") == "true":
      return Dap.RDAP
    return Dap.DAP


class DynamicPartitionSizeChecker(object):
  """Validates partition and group sizes against the super partition size."""

  def __init__(self, info_dict):
    """Stores info_dict and fills in default warn / error limits.

    When "super_partition_size" is present, a missing
    "super_partition_warn_limit" defaults to 95% of it (integer division)
    and a missing "super_partition_error_limit" defaults to the size itself.

    Note that the defaults are written into the dictionary that was passed
    in; no copy is made.

    Args:
      info_dict: Dictionary holding the build flags and sizes.
    """
    if "super_partition_size" in info_dict:
      if "super_partition_warn_limit" not in info_dict:
        info_dict["super_partition_warn_limit"] = \
            int(info_dict["super_partition_size"]) * 95 // 100
      if "super_partition_error_limit" not in info_dict:
        info_dict["super_partition_error_limit"] = \
            int(info_dict["super_partition_size"])
    self.info_dict = info_dict

  def _ReadSizeOfPartition(self, name):
    """Returns the size of the partition called name.

    Args:
      name: Partition name, e.g. an entry of dynamic_partition_list.

    Returns:
      int(info_dict["<name>_image_size"]) if that key exists, otherwise the
      result of sparse_img.GetImagePartitionSize() on
      info_dict["<name>_image"].

    Raises:
      KeyError: If neither "<name>_image_size" nor "<name>_image" is defined.
    """
    # Tests use *_image_size instead (to avoid creating empty sparse images
    # on disk)
    if name + "_image_size" in self.info_dict:
      return int(self.info_dict[name + "_image_size"])
    return sparse_img.GetImagePartitionSize(self.info_dict[name + "_image"])

  # Round result to BOARD_SUPER_PARTITION_ALIGNMENT
  def _RoundPartitionSize(self, size):
    """Rounds size up to a multiple of the super partition alignment.

    Args:
      size: Partition size as an integer.

    Returns:
      size unchanged if "super_partition_alignment" is absent from the info
      dict; otherwise, for a positive alignment, the smallest multiple of
      the alignment that is greater than or equal to size.
    """
    alignment = self.info_dict.get("super_partition_alignment")
    if alignment is None:
      return size
    # Every other numeric key in this file is converted with int() before
    # use, so the value may be a string; convert it here as well, otherwise
    # the arithmetic below raises TypeError.
    alignment = int(alignment)
    return (size + alignment - 1) // alignment * alignment

  def _CheckSuperPartitionSize(self):
    """Checks that block device sizes add up to the super partition size.

    Sums "super_<device>_device_size" (0 if missing) over every device
    listed in "super_block_devices" and requires the total to equal
    "super_partition_size".

    Raises:
      RuntimeError: If the sum differs from the super partition size.
      KeyError: If "super_partition_size" is not defined.
    """
    info_dict = self.info_dict
    super_block_devices = \
        info_dict.get("super_block_devices", "").strip().split()
    size_list = [int(info_dict.get("super_{}_device_size".format(b), "0"))
                 for b in super_block_devices]
    sum_size = Expression("sum of super partition block device sizes",
                          "+".join(str(size) for size in size_list),
                          sum(size_list))
    super_partition_size = Expression("BOARD_SUPER_PARTITION_SIZE",
                                      info_dict["super_partition_size"])
    sum_size.CheckEq(super_partition_size)

  def _CheckSumOfPartitionSizes(self, max_size, partition_names,
                                warn_size=None, error_size=None):
    """Checks the summed (rounded) partition sizes against the given limits.

    Args:
      max_size: Expression for the hard upper bound; exceeding it raises.
      partition_names: List of partition names whose sizes are summed.
      warn_size: Optional Expression; exceeding it only logs a warning.
      error_size: Optional Expression; exceeding it raises.

    Raises:
      RuntimeError: If the sum exceeds max_size or error_size.
    """
    partition_size_list = [self._RoundPartitionSize(
        self._ReadSizeOfPartition(p)) for p in partition_names]
    sum_size = Expression("sum of sizes of {}".format(partition_names),
                          "+".join(str(size) for size in partition_size_list),
                          sum(partition_size_list))
    sum_size.CheckLe(max_size)
    if error_size:
      sum_size.CheckLe(error_size)
    if warn_size:
      sum_size.CheckLe(warn_size, level=logging.WARNING)

  def _NumDeviceTypesInSuper(self):
    """Returns the number of slots that must fit into one super partition.

    The result is used as the divisor applied to the super partition size
    and its limits.

    Returns:
      1 or 2, depending on the A/B and dynamic partition flags.

    Raises:
      RuntimeError: If dynamic partitions are not enabled, or if retrofit
          dynamic partitions are combined with anything but regular A/B.
    """
    slot = DeviceType.Get(self.info_dict)
    dap = Dap.Get(self.info_dict)

    if dap == Dap.NONE:
      raise RuntimeError("check_partition_sizes should only be executed on "
                         "builds with dynamic partitions enabled")

    # Retrofit dynamic partitions: 1 slot per "super", 2 "super"s on the device
    if dap == Dap.RDAP:
      if slot != DeviceType.AB:
        raise RuntimeError("Device with retrofit dynamic partitions must use "
                           "regular (non-Virtual) A/B")
      return 1

    # Launch DAP: 1 super on the device
    assert dap == Dap.DAP

    # DAP + A/B: 2 slots in super
    if slot == DeviceType.AB:
      return 2

    # DAP + retrofit Virtual A/B: same as A/B
    if slot == DeviceType.RVAB:
      return 2

    # DAP + Launch Virtual A/B: 1 *real* slot in super (2 virtual slots)
    if slot == DeviceType.VAB:
      return 1

    # DAP + non-A/B: 1 slot in super
    assert slot == DeviceType.NONE
    return 1

  def _CheckAllPartitionSizes(self):
    """Runs all partition and group size checks.

    Performs three checks, each only when the keys it needs are present:
      1. The sum of all dynamic partitions fits in the per-slot super
         partition size and error limit (warning above the warn limit).
      2. For each group, the sum of its partitions fits in the group size.
      3. The sum of all group sizes fits in the per-slot super partition
         size.

    Raises:
      RuntimeError: If any of the checks fails at error level.
    """
    info_dict = self.info_dict
    num_slots = self._NumDeviceTypesInSuper()
    # Only mention the divisor in descriptions when it actually applies.
    size_limit_suffix = (" / %d" % num_slots) if num_slots > 1 else ""

    # Check sum(all partitions) <= super partition (/ 2 for A/B devices launched
    # with dynamic partitions)
    if "super_partition_size" in info_dict and \
        "dynamic_partition_list" in info_dict:
      max_size = Expression(
          "BOARD_SUPER_PARTITION_SIZE{}".format(size_limit_suffix),
          int(info_dict["super_partition_size"]) // num_slots)
      warn_limit = Expression(
          "BOARD_SUPER_PARTITION_WARN_LIMIT{}".format(size_limit_suffix),
          int(info_dict["super_partition_warn_limit"]) // num_slots)
      error_limit = Expression(
          "BOARD_SUPER_PARTITION_ERROR_LIMIT{}".format(size_limit_suffix),
          int(info_dict["super_partition_error_limit"]) // num_slots)
      self._CheckSumOfPartitionSizes(
          max_size, info_dict["dynamic_partition_list"].strip().split(),
          warn_limit, error_limit)

    groups = info_dict.get("super_partition_groups", "").strip().split()

    # For each group, check sum(partitions in group) <= group size
    for group in groups:
      if "super_{}_group_size".format(group) in info_dict and \
          "super_{}_partition_list".format(group) in info_dict:
        group_size = Expression(
            "BOARD_{}_SIZE".format(group),
            int(info_dict["super_{}_group_size".format(group)]))
        self._CheckSumOfPartitionSizes(
            group_size,
            info_dict["super_{}_partition_list".format(group)].strip().split())

    # Check sum(all group sizes) <= super partition (/ 2 for A/B devices
    # launched with dynamic partitions)
    if "super_partition_size" in info_dict:
      group_size_list = [int(info_dict.get(
          "super_{}_group_size".format(group), 0)) for group in groups]
      sum_size = Expression("sum of sizes of {}".format(groups),
                            "+".join(str(size) for size in group_size_list),
                            sum(group_size_list))
      max_size = Expression(
          "BOARD_SUPER_PARTITION_SIZE{}".format(size_limit_suffix),
          int(info_dict["super_partition_size"]) // num_slots)
      sum_size.CheckLe(max_size)

  def Run(self):
    """Runs every check applicable to the stored info dict.

    The block device size check is only run for builds with
    "dynamic_partition_retrofit" set to "true".

    Raises:
      RuntimeError: If any check fails.
    """
    self._CheckAllPartitionSizes()
    if self.info_dict.get("dynamic_partition_retrofit") == "true":
      self._CheckSuperPartitionSize()


def CheckPartitionSizes(inp):
  """Checks partition sizes described by a file path or a dictionary.

  Args:
    inp: Either a str path, which is loaded with
        common.LoadDictionaryFromFile(), or an already loaded dict.

  Returns:
    The result of DynamicPartitionSizeChecker.Run(), i.e. None.

  Raises:
    ValueError: If inp is neither a str nor a dict.
    RuntimeError: If any size check fails.
  """
  if isinstance(inp, str):
    info_dict = common.LoadDictionaryFromFile(inp)
    return DynamicPartitionSizeChecker(info_dict).Run()
  if isinstance(inp, dict):
    return DynamicPartitionSizeChecker(inp).Run()
  raise ValueError("{} is not a dictionary or a valid path".format(inp))


def main(argv):
  """Command line entry point.

  Prints the usage text and exits with status 1 unless exactly one
  positional argument (the info file) remains after option parsing.

  Args:
    argv: Command line arguments, without the program name.
  """
  args = common.ParseOptions(argv, __doc__)
  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)
  common.InitLogging()
  CheckPartitionSizes(args[0])


if __name__ == "__main__":
  try:
    common.CloseInheritedPipes()
    main(sys.argv[1:])
  except common.ExternalError:
    # Only ExternalError is handled here; a RuntimeError raised by a failed
    # check propagates as an uncaught exception.
    logger.exception("\n ERROR:\n")
    sys.exit(1)
  finally:
    common.Cleanup()