# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OtaTools class."""

import logging
import os
import stat
import subprocess
import tempfile

from six import b


from acloud import errors
from acloud.internal import constants
from acloud.internal.lib import utils

logger = logging.getLogger(__name__)

_BIN_DIR_NAME = "bin"
_LPMAKE = "lpmake"
_BUILD_SUPER_IMAGE = "build_super_image"
_AVBTOOL = "avbtool"
_SGDISK = "sgdisk"
_SIMG2IMG = "simg2img"
_MK_COMBINED_IMG = "mk_combined_img"

_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
_AVBTOOL_TIMEOUT_SECS = 30
_MK_COMBINED_IMG_TIMEOUT_SECS = 180

_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` "
                          "in build environment, or set --local-tool to an "
                          "extracted otatools.zip.")


def FindOtaTools(search_paths):
    """Find OTA tools in the search paths and in build environment.

    A directory qualifies when it contains bin/build_super_image. The
    search paths are checked in order; the directory named by the
    constants.ENV_ANDROID_HOST_OUT environment variable is the fallback.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        The directory containing OTA tools.

    Raises:
        errors.CheckPathError if OTA tools are not found.
    """
    for search_path in search_paths:
        if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME,
                                       _BUILD_SUPER_IMAGE)):
            return search_path

    # Fall back to the build environment's host output directory, if set.
    host_out_dir = os.environ.get(constants.ENV_ANDROID_HOST_OUT)
    if (host_out_dir and
            os.path.isfile(os.path.join(host_out_dir, _BIN_DIR_NAME,
                                        _BUILD_SUPER_IMAGE))):
        return host_out_dir

    raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG %
                                {"tool_name": "OTA tool directory"})


class OtaTools:
    """The class that executes OTA tool commands."""

    def __init__(self, ota_tools_dir):
        """Initialize with the directory that contains the OTA tools.

        Args:
            ota_tools_dir: String, the path to the OTA tools directory. It is
                           stored as an absolute path.
        """
        self._ota_tools_dir = os.path.abspath(ota_tools_dir)

    def _GetBinary(self, name):
        """Get an executable file from _ota_tools_dir.

        The file is looked up in the bin subdirectory. Read and execute
        permission bits for user, group and others are added to it.

        Args:
            name: String, the file name.

        Returns:
            String, the absolute path.

        Raises:
            errors.NoExecuteCmd if the file does not exist.
        """
        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
        if not os.path.isfile(path):
            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
                                      {"tool_name": name})
        # Keep the existing mode bits and add r-x for everyone.
        mode = os.stat(path).st_mode
        os.chmod(path, mode | (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH |
                               stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH))
        return path

    @staticmethod
    def _ExecuteCommand(*command, **popen_args):
        """Execute a command and log the output.

        This method waits for the process to terminate. It kills the process
        if it's interrupted due to timeout.

        Args:
            command: Strings, the command.
            popen_args: The arguments to be passed to subprocess.Popen.
                        stdin, stdout and stderr are always overridden with
                        pipes. If env is not given and PYTHONPATH is set, the
                        process runs with a copy of the current environment
                        without PYTHONPATH.

        Raises:
            errors.SubprocessFail if the process returns non-zero.
        """
        proc = None
        try:
            logger.info("Execute %s", command)
            popen_args["stdin"] = subprocess.PIPE
            popen_args["stdout"] = subprocess.PIPE
            popen_args["stderr"] = subprocess.PIPE

            # Some OTA tools are Python scripts in different versions. The
            # PYTHONPATH for acloud may be incompatible with the tools.
            if "env" not in popen_args and "PYTHONPATH" in os.environ:
                popen_env = os.environ.copy()
                del popen_env["PYTHONPATH"]
                popen_args["env"] = popen_env

            proc = subprocess.Popen(command, **popen_args)
            stdout, stderr = proc.communicate()
            logger.info("%s stdout: %s", command[0], stdout)
            logger.info("%s stderr: %s", command[0], stderr)

            if proc.returncode != 0:
                raise errors.SubprocessFail("%s returned %d." %
                                            (command[0], proc.returncode))
        finally:
            # The process is still running only if communicate() was
            # interrupted by an exception.
            if proc and proc.poll() is None:
                logger.info("Kill %s", command[0])
                proc.kill()

    @staticmethod
    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
        """Rewrite lpmake and image paths in misc_info.txt.

        Misc info consists of multiple lines of <key>=<value>.
        Sample input_file:
        lpmake=lpmake
        dynamic_partition_list= system system_ext product vendor

        Sample output_file:
        lpmake=/path/to/lpmake
        dynamic_partition_list= system system_ext product vendor
        system_image=/path/to/system.img
        system_ext_image=/path/to/system_ext.img
        product_image=/path/to/product.img
        vendor_image=/path/to/vendor.img

        This method replaces lpmake with the specified path, and sets
        *_image for every partition in dynamic_partition_list. Existing
        *_image lines in the input are dropped; all other lines are copied
        unchanged. A warning is logged if no dynamic partition is listed.

        Args:
            output_file: The output file object. It is written with the
                         values returned by six.b.
            input_file: The input file object. It is iterated line by line
                        as text.
            lpmake_path: The path to lpmake binary.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        partition_names = ()
        for line in input_file:
            split_line = line.strip().split("=", 1)
            # A line without "=" is treated as a key with an empty value.
            if len(split_line) < 2:
                split_line = (split_line[0], "")
            if split_line[0] == "dynamic_partition_list":
                # Remember the partitions; the line itself is still copied.
                partition_names = split_line[1].split()
            elif split_line[0] == "lpmake":
                output_file.write(b("lpmake=%s\n" % lpmake_path))
                continue
            elif split_line[0].endswith("_image"):
                # Dropped here; regenerated below from get_image.
                continue
            output_file.write(b(line))

        if not partition_names:
            # logging.Logger has no method named "w"; use warning() so that
            # this path logs instead of raising AttributeError.
            logger.warning("No dynamic partition list in misc info.")

        for partition_name in partition_names:
            output_file.write(b("%s_image=%s\n" %
                                (partition_name, get_image(partition_name))))

    @utils.TimeExecute(function_description="Build super image")
    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
    def BuildSuperImage(self, output_path, misc_info_path, get_image):
        """Use build_super_image to create a super image.

        The misc info is rewritten into a temporary file, which is passed to
        build_super_image and removed afterwards.

        Args:
            output_path: The path to the output super image.
            misc_info_path: The path to the misc info that provides parameters
                            to create the super image.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
        lpmake = self._GetBinary(_LPMAKE)

        new_misc_info_path = None
        try:
            with open(misc_info_path, "r") as misc_info:
                # delete=False keeps the file after it is closed so that the
                # tool can read it; it is removed in the finally clause.
                with tempfile.NamedTemporaryFile(
                        prefix="misc_info_", suffix=".txt",
                        delete=False) as new_misc_info:
                    new_misc_info_path = new_misc_info.name
                    self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
                                          get_image)

            self._ExecuteCommand(build_super_image, new_misc_info_path,
                                 output_path)
        finally:
            if new_misc_info_path:
                os.remove(new_misc_info_path)

    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
    def MakeDisabledVbmetaImage(self, output_path):
        """Use avbtool to create a vbmeta image with verification disabled.

        Args:
            output_path: The path to the output vbmeta image.
        """
        avbtool = self._GetBinary(_AVBTOOL)
        self._ExecuteCommand(avbtool, "make_vbmeta_image",
                             "--flag", "2",
                             "--padding_size", "4096",
                             "--output", output_path)

    @staticmethod
    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
        """Rewrite image paths in system-qemu-config.txt.

        Sample input_file:
        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
        out/target/product/generic_x86_64/super.img super 2

        Sample output_file:
        /path/to/vbmeta.img vbmeta 1
        /path/to/super.img super 2

        This method replaces the first entry of each line with the path
        returned by get_image. Lines that do not consist of exactly three
        whitespace-separated entries are copied unchanged.

        Args:
            output_file: The output file object. It is written with the
                         values returned by six.b.
            input_file: The input file object. It is iterated line by line
                        as text.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        for line in input_file:
            split_line = line.split()
            if len(split_line) == 3:
                # The second entry is the partition name passed to get_image.
                output_file.write(b("%s %s %s\n" % (get_image(split_line[1]),
                                                    split_line[1],
                                                    split_line[2])))
            else:
                output_file.write(b(line))

    @utils.TimeExecute(function_description="Make combined image")
    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
        """Use mk_combined_img to create a disk image.

        The config is rewritten into a temporary file, which is passed to
        mk_combined_img and removed afterwards.

        Args:
            output_path: The path to the output disk image.
            system_qemu_config_path: The path to the config that provides the
                                     partition information on the disk.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
        sgdisk = self._GetBinary(_SGDISK)
        simg2img = self._GetBinary(_SIMG2IMG)

        new_config_path = None
        try:
            with open(system_qemu_config_path, "r") as config:
                # delete=False keeps the file after it is closed so that the
                # tool can read it; it is removed in the finally clause.
                with tempfile.NamedTemporaryFile(
                        prefix="system-qemu-config_", suffix=".txt",
                        delete=False) as new_config:
                    new_config_path = new_config.name
                    self._RewriteSystemQemuConfig(new_config, config,
                                                  get_image)

            # The tool's environment consists of only these two variables,
            # which carry the paths to the sgdisk and simg2img binaries.
            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
            self._ExecuteCommand(mk_combined_img,
                                 "-i", new_config_path,
                                 "-o", output_path,
                                 env=mk_combined_img_env)
        finally:
            if new_config_path:
                os.remove(new_config_path)