1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""OtaTools class."""
15
16import logging
17import os
18import stat
19import subprocess
20import tempfile
21
22from six import b
23
24
25from acloud import errors
26from acloud.internal import constants
27from acloud.internal.lib import utils
28
logger = logging.getLogger(__name__)

# Sub-directory of the OTA tools directory that holds the executables.
_BIN_DIR_NAME = "bin"

# File names of the executables looked up under _BIN_DIR_NAME.
_AVBTOOL = "avbtool"
_BUILD_SUPER_IMAGE = "build_super_image"
_LPMAKE = "lpmake"
_MK_COMBINED_IMG = "mk_combined_img"
_SGDISK = "sgdisk"
_SIMG2IMG = "simg2img"

# Timeouts, in seconds, for the methods that run the corresponding tools.
_AVBTOOL_TIMEOUT_SECS = 30
_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
_MK_COMBINED_IMG_TIMEOUT_SECS = 180

# Error message template. The caller supplies "tool_name" when formatting.
_MISSING_OTA_TOOLS_MSG = (
    "%(tool_name)s is not found. "
    "Run `make otatools` in build environment, "
    "or set --local-tool to an extracted otatools.zip."
)
46
47
def FindOtaTools(search_paths):
    """Locate the directory that contains the OTA tools.

    A directory qualifies when the file
    <directory>/_BIN_DIR_NAME/_BUILD_SUPER_IMAGE exists. The given search
    paths are tried first, in order; the directory named by the
    constants.ENV_ANDROID_HOST_OUT environment variable is the fallback.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        The first qualifying directory, exactly as it was given.

    Raises:
        errors.CheckPathError if no directory qualifies.
    """
    def _HasBuildSuperImage(directory):
        """Tell whether the marker executable exists under the directory."""
        marker = os.path.join(directory, _BIN_DIR_NAME, _BUILD_SUPER_IMAGE)
        return os.path.isfile(marker)

    # None is the "nothing matched" sentinel; an empty-string path is still
    # a legitimate match, so compare against None rather than truthiness.
    matched = next((candidate for candidate in search_paths
                    if _HasBuildSuperImage(candidate)), None)
    if matched is not None:
        return matched

    # The environment is consulted only after every search path failed.
    host_out_dir = os.environ.get(constants.ENV_ANDROID_HOST_OUT)
    if host_out_dir and _HasBuildSuperImage(host_out_dir):
        return host_out_dir

    message = _MISSING_OTA_TOOLS_MSG % {"tool_name": "OTA tool directory"}
    raise errors.CheckPathError(message)
73
74
class OtaTools:
    """The class that executes OTA tool commands.

    Attributes:
        _ota_tools_dir: String, the absolute path to the directory whose
                        _BIN_DIR_NAME sub-directory holds the executables.
    """

    def __init__(self, ota_tools_dir):
        """Initialize.

        Args:
            ota_tools_dir: String, the path to the OTA tools directory. It is
                           stored as an absolute path.
        """
        self._ota_tools_dir = os.path.abspath(ota_tools_dir)

    def _GetBinary(self, name):
        """Get an executable file from _ota_tools_dir.

        The file is made readable and executable by user, group and others
        if any of those permission bits is missing.

        Args:
            name: String, the file name.

        Returns:
            String, the absolute path.

        Raises:
            errors.NoExecuteCmd if the file does not exist.
        """
        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
        if not os.path.isfile(path):
            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
                                      {"tool_name": name})
        required_bits = (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH |
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        mode = os.stat(path).st_mode
        # Only touch the permissions when something is missing. chmod fails
        # on files the current user does not own, even if the call would not
        # change anything, so an unconditional chmod breaks tools that are
        # already usable.
        if (mode & required_bits) != required_bits:
            os.chmod(path, mode | required_bits)
        return path

    @staticmethod
    def _ExecuteCommand(*command, **popen_args):
        """Execute a command and log the output.

        This method waits for the process to terminate. It kills and reaps
        the process if the wait is interrupted, e.g. due to timeout.

        Args:
            command: Strings, the command and its arguments.
            popen_args: The keyword arguments to be passed to
                        subprocess.Popen. stdin, stdout and stderr are always
                        replaced with pipes. If env is not given and
                        PYTHONPATH is set, the process gets a copy of the
                        current environment without PYTHONPATH.

        Raises:
            errors.SubprocessFail if the process returns non-zero.
        """
        proc = None
        try:
            logger.info("Execute %s", command)
            popen_args["stdin"] = subprocess.PIPE
            popen_args["stdout"] = subprocess.PIPE
            popen_args["stderr"] = subprocess.PIPE

            # Some OTA tools are Python scripts in different versions. The
            # PYTHONPATH for acloud may be incompatible with the tools.
            if "env" not in popen_args and "PYTHONPATH" in os.environ:
                popen_env = os.environ.copy()
                del popen_env["PYTHONPATH"]
                popen_args["env"] = popen_env

            proc = subprocess.Popen(command, **popen_args)
            stdout, stderr = proc.communicate()
            logger.info("%s stdout: %s", command[0], stdout)
            logger.info("%s stderr: %s", command[0], stderr)

            if proc.returncode != 0:
                raise errors.SubprocessFail("%s returned %d." %
                                            (command[0], proc.returncode))
        finally:
            # Reached with a live process only when communicate() was
            # interrupted by an exception.
            if proc and proc.poll() is None:
                logger.info("Kill %s", command[0])
                proc.kill()
                # Reap the killed child so that it does not stay a zombie.
                # wait() is used instead of communicate() because orphaned
                # grandchildren may keep the output pipes open.
                proc.wait()

    @staticmethod
    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
        """Rewrite lpmake and image paths in misc_info.txt.

        Misc info consists of multiple lines of <key>=<value>.
        Sample input_file:
        lpmake=lpmake
        dynamic_partition_list= system system_ext product vendor

        Sample output_file:
        lpmake=/path/to/lpmake
        dynamic_partition_list= system system_ext product vendor
        system_image=/path/to/system.img
        system_ext_image=/path/to/system_ext.img
        product_image=/path/to/product.img
        vendor_image=/path/to/vendor.img

        This method replaces lpmake with the specified path, drops every
        existing *_image line, and appends *_image for every partition in
        dynamic_partition_list. All other lines are copied.

        Args:
            output_file: The output file object, written with bytes.
            input_file: The input file object, iterated as text lines.
            lpmake_path: The path to lpmake binary.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        partition_names = ()
        for line in input_file:
            # A line without "=" yields the whole line as key and an empty
            # value.
            key, _, value = line.strip().partition("=")
            if key == "lpmake":
                output_file.write(b("lpmake=%s\n" % lpmake_path))
                continue
            if key.endswith("_image"):
                continue
            if key == "dynamic_partition_list":
                partition_names = value.split()
            # The last line of the input may lack a newline. Terminate it so
            # that the entries appended below start on their own lines.
            if not line.endswith("\n"):
                line += "\n"
            output_file.write(b(line))

        if not partition_names:
            logger.warning("No dynamic partition list in misc info.")

        for partition_name in partition_names:
            output_file.write(b("%s_image=%s\n" %
                                (partition_name, get_image(partition_name))))

    @utils.TimeExecute(function_description="Build super image")
    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
    def BuildSuperImage(self, output_path, misc_info_path, get_image):
        """Use build_super_image to create a super image.

        A rewritten copy of the misc info is stored in a temporary file,
        which is removed when this method finishes.

        Args:
            output_path: The path to the output super image.
            misc_info_path: The path to the misc info that provides parameters
                            to create the super image.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if build_super_image or lpmake does not exist.
            errors.SubprocessFail if build_super_image returns non-zero.
        """
        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
        lpmake = self._GetBinary(_LPMAKE)

        new_misc_info_path = None
        try:
            with open(misc_info_path, "r") as misc_info:
                # delete=False keeps the file after it is closed, so that
                # build_super_image can read it. It is removed in finally.
                with tempfile.NamedTemporaryFile(
                        prefix="misc_info_", suffix=".txt",
                        delete=False) as new_misc_info:
                    new_misc_info_path = new_misc_info.name
                    self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
                                          get_image)

            self._ExecuteCommand(build_super_image, new_misc_info_path,
                                 output_path)
        finally:
            if new_misc_info_path:
                os.remove(new_misc_info_path)

    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
    def MakeDisabledVbmetaImage(self, output_path):
        """Use avbtool to create a vbmeta image with verification disabled.

        Args:
            output_path: The path to the output vbmeta image.

        Raises:
            errors.NoExecuteCmd if avbtool does not exist.
            errors.SubprocessFail if avbtool returns non-zero.
        """
        avbtool = self._GetBinary(_AVBTOOL)
        self._ExecuteCommand(avbtool, "make_vbmeta_image",
                             "--flag", "2",
                             "--padding_size", "4096",
                             "--output", output_path)

    @staticmethod
    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
        """Rewrite image paths in system-qemu-config.txt.

        Sample input_file:
        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
        out/target/product/generic_x86_64/super.img super 2

        Sample output_file:
        /path/to/vbmeta.img vbmeta 1
        /path/to/super.img super 2

        This method replaces the first entry of each three-entry line with
        the path returned by get_image for the second entry. Lines with any
        other number of entries are copied unchanged.

        Args:
            output_file: The output file object, written with bytes.
            input_file: The input file object, iterated as text lines.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        for line in input_file:
            fields = line.split()
            if len(fields) != 3:
                output_file.write(b(line))
                continue
            _, partition_name, last_field = fields
            output_file.write(b("%s %s %s\n" % (get_image(partition_name),
                                                partition_name,
                                                last_field)))

    @utils.TimeExecute(function_description="Make combined image")
    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
        """Use mk_combined_img to create a disk image.

        A rewritten copy of the config is stored in a temporary file, which
        is removed when this method finishes.

        Args:
            output_path: The path to the output disk image.
            system_qemu_config_path: The path to the config that provides the
                                     partition information on the disk.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if mk_combined_img, sgdisk or simg2img does
            not exist.
            errors.SubprocessFail if mk_combined_img returns non-zero.
        """
        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
        sgdisk = self._GetBinary(_SGDISK)
        simg2img = self._GetBinary(_SIMG2IMG)

        new_config_path = None
        try:
            with open(system_qemu_config_path, "r") as config:
                # delete=False keeps the file after it is closed, so that
                # mk_combined_img can read it. It is removed in finally.
                with tempfile.NamedTemporaryFile(
                        prefix="system-qemu-config_", suffix=".txt",
                        delete=False) as new_config:
                    new_config_path = new_config.name
                    self._RewriteSystemQemuConfig(new_config, config,
                                                  get_image)

            # This dict is the complete environment of the process; the
            # paths to the helper tools are passed through it.
            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
            self._ExecuteCommand(mk_combined_img,
                                 "-i", new_config_path,
                                 "-o", output_path,
                                 env=mk_combined_img_env)
        finally:
            if new_config_path:
                os.remove(new_config_path)
300