1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18# TODO(b/147454897): Keep the logic in sync with
19#                    test/vts/utils/python/controllers/android_device.py until
20#                    it is removed.
21import gzip
22import logging
23import os
24import subprocess
25import tempfile
26
class AndroidDevice(object):
    """This class controls the device via adb commands.

    Every method runs the `adb` executable with `-s <serial_number>` so that
    the command targets the device given to the constructor.
    """

    def __init__(self, serial_number):
        """Initializes the controller.

        Args:
            serial_number: A string, the serial number passed to `adb -s`.
        """
        self._serial_number = serial_number

    def AdbPull(self, src, dst):
        """Runs `adb pull` to copy a path from the device to the host.

        Args:
            src: A string, the source path on the device.
            dst: A string, the destination path on the host.

        Raises:
            subprocess.CalledProcessError if the command returns non-zero.
        """
        cmd = ["adb", "-s", self._serial_number, "pull", src, dst]
        # subprocess.check_call never reads the pipes it is given, so the
        # child can block once a pipe buffer is full. communicate() drains
        # stdout and stderr until the process exits.
        proc = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, _ = proc.communicate()
        if proc.returncode != 0:
            raise subprocess.CalledProcessError(proc.returncode, cmd, out)

    def Execute(self, *args):
        """Executes a command via `adb shell`.

        Args:
            args: Strings, the arguments.

        Returns:
            Stdout as a string, stderr as a string, and return code as an
            integer.
        """
        cmd = ["adb", "-s", self._serial_number, "shell"]
        cmd.extend(args)
        proc = subprocess.Popen(cmd, shell=False, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        # Compatible with python2 and python3
        if not isinstance(out, str):
            out = out.decode("utf-8")
        if not isinstance(err, str):
            err = err.decode("utf-8")
        return out, err, proc.returncode

    def _GetProp(self, name):
        """Gets an Android system property.

        Args:
            name: A string, the property name.

        Returns:
            A string, the value of the property with surrounding whitespace
            removed.

        Raises:
            IOError if the command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("getprop", name)
        if err.strip() or return_code != 0:
            raise IOError("`getprop %s` stdout: %s\nstderr: %s" %
                          (name, out, err))
        return out.strip()

    def GetCpuAbiList(self, bitness=""):
        """Gets the list of supported ABIs from property.

        Args:
            bitness: 32 or 64. If the argument is not specified, this method
                     returns both 32 and 64-bit ABIs.

        Returns:
            A list of lower-case strings, the supported ABIs. The list is
            empty if the property is empty.
        """
        out = self._GetProp("ro.product.cpu.abilist" + str(bitness))
        return out.lower().split(",") if out else []

    def GetLaunchApiLevel(self):
        """Gets the API level that the device was initially launched with.

        This method reads ro.product.first_api_level from the device. If the
        value is 0, it then reads ro.build.version.sdk.

        Returns:
            An integer, the API level.

        Raises:
            ValueError if the property value is not an integer.
        """
        level_str = self._GetProp("ro.product.first_api_level")
        level = int(level_str)
        if level != 0:
            return level

        level_str = self._GetProp("ro.build.version.sdk")
        return int(level_str)

    def getLaunchApiLevel(self, strict=True):
        """Gets the API level that the device was initially launched with.

        This method is compatible with vndk_utils in vts package.

        Args:
            strict: A boolean, whether to raise an error if the property is
                    not an integer or not defined.

        Returns:
            An integer, the API level.
            0 if the value is undefined and strict is False.

        Raises:
            ValueError: if the value is undefined and strict is True.
        """
        try:
            return self.GetLaunchApiLevel()
        except ValueError as e:
            if strict:
                raise
            logging.exception(e)
            return 0

    @property
    def vndk_lite(self):
        """Checks whether the vendor partition requests lite VNDK enforcement.

        This method is compatible with vndk_utils in vts package.

        Returns:
            A boolean, True for lite vndk enforcement.
        """
        return self._GetProp("ro.vndk.lite").lower() == "true"

    def GetVndkVersion(self):
        """Gets the VNDK version that the vendor partition requests."""
        return self._GetProp("ro.vndk.version")

    def GetKernelConfig(self, config_name):
        """Gets kernel config from the device.

        This method pulls /proc/config.gz to a temporary file on the host,
        searches it for the config, and removes the temporary file.

        Args:
            config_name: A string, the name of the configuration.

        Returns:
            The text after "<config_name>=", e.g., "y" or "m", if the config
            is set.
            "" if the config is not set.
            None if fails to read config.
        """
        line_prefix = config_name + "="
        # Only the file name is needed here. The file is closed immediately
        # and kept on disk (delete=False) so that adb can overwrite it.
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            config_path = temp_file.name
        try:
            logging.debug("Pull config.gz to %s", config_path)
            self.AdbPull("/proc/config.gz", config_path)
            with gzip.open(config_path, "rt") as config_file:
                for line in config_file:
                    line = line.strip()
                    if line.startswith(line_prefix):
                        logging.debug("Found config: %s", line)
                        return line[len(line_prefix):]
            logging.debug("%s is not set.", config_name)
            return ""
        except (subprocess.CalledProcessError, IOError):
            # logging.exception records the active exception by itself.
            # Passing it as an extra argument without a placeholder in the
            # message makes the log record fail to format.
            logging.exception("Cannot read kernel config.")
            return None
        finally:
            os.remove(config_path)

    def GetBinderBitness(self):
        """Returns the value of BINDER_IPC_32BIT in kernel config.

        Returns:
            32 or 64, binder bitness of the device.
            None if fails to read config.
        """
        config_value = self.GetKernelConfig("CONFIG_ANDROID_BINDER_IPC_32BIT")
        if config_value is None:
            return None
        elif config_value:
            return 32
        else:
            return 64

    def IsRoot(self):
        """Returns whether adb has root privilege on the device.

        Raises:
            IOError if the `id` command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("id")
        if err.strip() or return_code != 0:
            raise IOError("`id` stdout: %s\nstderr: %s \n" % (out, err))
        return "uid=0(root)" in out.strip()

    def _Test(self, *args):
        """Tests file types and status.

        Args:
            args: Strings, the arguments passed to the `test` command.

        Returns:
            A boolean, whether the command returns 0.

        Raises:
            IOError if the command writes anything to stdout or stderr.
        """
        out, err, return_code = self.Execute("test", *args)
        if out.strip() or err.strip():
            raise IOError("`test` args: %s\nstdout: %s\nstderr: %s" %
                          (args, out, err))
        return return_code == 0

    def Exists(self, path):
        """Returns whether a path on the device exists."""
        return self._Test("-e", path)

    def IsDirectory(self, path):
        """Returns whether a path on the device is a directory."""
        return self._Test("-d", path)

    def _Stat(self, fmt, path):
        """Executes stat command.

        Args:
            fmt: A string, the value of the --format option.
            path: A string, the path on the device.

        Returns:
            A string, the stripped stdout of the command.

        Raises:
            IOError if the command fails or writes to stderr.
        """
        out, err, return_code = self.Execute("stat", "--format", fmt, path)
        if return_code != 0 or err.strip():
            raise IOError("`stat --format %s %s` stdout: %s\nstderr: %s" %
                          (fmt, path, out, err))
        return out.strip()

    def IsExecutable(self, path):
        """Returns if execute permission is granted to a path on the device."""
        return "x" in self._Stat("%A", path)

    def FindFiles(self, path, name_pattern, *options):
        """Executes find command.

        Args:
            path: A string, the path on the device.
            name_pattern: A string, the pattern of the file name.
            options: Strings, extra options passed to the command.

        Returns:
            A list of strings, the paths to the found files. The list is
            empty if no file is found.

        Raises:
            ValueError if the pattern contains quotes.
            IOError if the path does not exist.
        """
        if '"' in name_pattern or "'" in name_pattern:
            raise ValueError("File name pattern contains quotes.")
        # The pattern is wrapped in single quotes, presumably to keep the
        # shell on the device from expanding it; hence the check above.
        out, err, return_code = self.Execute("find", path, "-name",
                                             "'" + name_pattern + "'",
                                             *options)
        if return_code != 0 or err.strip():
            raise IOError("`find %s -name '%s' %s` stdout: %s\nstderr: %s" %
                          (path, name_pattern, " ".join(options), out, err))
        # "".split("\n") is [""], which is not an empty list of paths.
        out = out.strip()
        return out.split("\n") if out else []
251