#!/usr/bin/env python # # Copyright (C) 2009 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Check the signatures of all APKs in a target_files .zip file. With -c, compare the signatures of each package to the ones in a separate target_files (usually a previously distributed build for the same device) and flag any changes. Usage: check_target_file_signatures [flags] target_files -c (--compare_with) Look for compatibility problems between the two sets of target files (eg., packages whose keys have changed). -l (--local_cert_dirs) Comma-separated list of top-level directories to scan for .x509.pem files. Defaults to "vendor,build". Where cert files can be found that match APK signatures, the filename will be printed as the cert name, otherwise a hash of the cert plus its subject string will be printed instead. -t (--text) Dump the certificate information for both packages in comparison mode (this output is normally suppressed). """ from __future__ import print_function import logging import os import os.path import re import subprocess import sys import zipfile import common if sys.hexversion < 0x02070000: print("Python 2.7 or newer is required.", file=sys.stderr) sys.exit(1) logger = logging.getLogger(__name__) # Work around a bug in Python's zipfile module that prevents opening of zipfiles # if any entry has an extra field of between 1 and 3 bytes (which is common with # zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which # contains the bug) with an empty version (since we don't need to decode the # extra field anyway). # Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and # Python 3.5.0 alpha 1. class MyZipInfo(zipfile.ZipInfo): def _decodeExtra(self): pass zipfile.ZipInfo = MyZipInfo OPTIONS = common.OPTIONS OPTIONS.text = False OPTIONS.compare_with = None OPTIONS.local_cert_dirs = ("vendor", "build") PROBLEMS = [] PROBLEM_PREFIX = [] def AddProblem(msg): PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg) def Push(msg): PROBLEM_PREFIX.append(msg) def Pop(): PROBLEM_PREFIX.pop() def Banner(msg): print("-" * 70) print(" ", msg) print("-" * 70) def GetCertSubject(cert): p = common.Run(["openssl", "x509", "-inform", "DER", "-text"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False) out, err = p.communicate(cert) if err and not err.strip(): return "(error reading cert subject)" for line in out.decode().split("\n"): line = line.strip() if line.startswith("Subject:"): return line[8:].strip() return "(unknown cert subject)" class CertDB(object): def __init__(self): self.certs = {} def Add(self, cert_digest, subject, name=None): if cert_digest in self.certs: if name: self.certs[cert_digest] = self.certs[cert_digest] + "," + name else: if name is None: name = "unknown cert %s (%s)" % (cert_digest[:12], subject) self.certs[cert_digest] = name def Get(self, cert_digest): """Return the name for a given cert digest.""" return self.certs.get(cert_digest, None) def FindLocalCerts(self): to_load = [] for top in OPTIONS.local_cert_dirs: for dirpath, _, filenames in os.walk(top): certs = [os.path.join(dirpath, i) for i in filenames if i.endswith(".x509.pem")] if certs: to_load.extend(certs) for i in to_load: with open(i) as f: cert = common.ParseCertificate(f.read()) name, _ = os.path.splitext(i) name, _ = os.path.splitext(name) cert_sha1 = common.sha1(cert).hexdigest() cert_subject = GetCertSubject(cert) self.Add(cert_sha1, cert_subject, name) ALL_CERTS = CertDB() def CertFromPKCS7(data, filename): """Read the cert out of a PKCS#7-format file (which is what is stored in a signed .apk).""" Push(filename + ":") try: p = common.Run(["openssl", "pkcs7", "-inform", "DER", "-outform", "PEM", "-print_certs"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False) out, err = p.communicate(data) if err and not err.strip(): AddProblem("error reading cert:\n" + err.decode()) return None cert = common.ParseCertificate(out.decode()) if not cert: AddProblem("error parsing cert output") return None return cert finally: Pop() class APK(object): def __init__(self, full_filename, filename): self.filename = filename self.cert_digests = frozenset() self.shared_uid = None self.package = None Push(filename+":") try: self.RecordCerts(full_filename) self.ReadManifest(full_filename) finally: Pop() def ReadCertsDeprecated(self, full_filename): print("reading certs in deprecated way for {}".format(full_filename)) cert_digests = set() with zipfile.ZipFile(full_filename) as apk: for info in apk.infolist(): filename = info.filename if (filename.startswith("META-INF/") and info.filename.endswith((".DSA", ".RSA"))): pkcs7 = apk.read(filename) cert = CertFromPKCS7(pkcs7, filename) if not cert: continue cert_sha1 = common.sha1(cert).hexdigest() cert_subject = GetCertSubject(cert) ALL_CERTS.Add(cert_sha1, cert_subject) cert_digests.add(cert_sha1) if not cert_digests: AddProblem("No signature found") return self.cert_digests = frozenset(cert_digests) def RecordCerts(self, full_filename): """Parse and save the signature of an apk file.""" # Dump the cert info with apksigner cmd = ["apksigner", "verify", "--print-certs", full_filename] p = common.Run(cmd, stdout=subprocess.PIPE) output, _ = p.communicate() if p.returncode != 0: self.ReadCertsDeprecated(full_filename) return # Sample output: # Signer #1 certificate DN: ... # Signer #1 certificate SHA-256 digest: ... # Signer #1 certificate SHA-1 digest: ... # ... certs_info = {} certificate_regex = re.compile(r"(Signer #[0-9]+) (certificate .*):(.*)") for line in output.splitlines(): m = certificate_regex.match(line) if not m: continue signer, key, val = m.group(1), m.group(2), m.group(3) if certs_info.get(signer): certs_info[signer].update({key.strip(): val.strip()}) else: certs_info.update({signer: {key.strip(): val.strip()}}) if not certs_info: AddProblem("Failed to parse cert info") return cert_digests = set() for signer, props in certs_info.items(): subject = props.get("certificate DN") digest = props.get("certificate SHA-1 digest") if not subject or not digest: AddProblem("Failed to parse cert subject or digest") return ALL_CERTS.Add(digest, subject) cert_digests.add(digest) self.cert_digests = frozenset(cert_digests) def ReadManifest(self, full_filename): p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file", "AndroidManifest.xml"], stdout=subprocess.PIPE) manifest, err = p.communicate() if err: AddProblem("failed to read manifest") return self.shared_uid = None self.package = None for line in manifest.split("\n"): line = line.strip() m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line) if m: name = m.group(1) if name == "android:sharedUserId": if self.shared_uid is not None: AddProblem("multiple sharedUserId declarations") self.shared_uid = m.group(2) elif name == "package": if self.package is not None: AddProblem("multiple package declarations") self.package = m.group(2) if self.package is None: AddProblem("no package declaration") class TargetFiles(object): def __init__(self): self.max_pkg_len = 30 self.max_fn_len = 20 self.apks = None self.apks_by_basename = None self.certmap = None def LoadZipFile(self, filename): # First read the APK certs file to figure out whether there are compressed # APKs in the archive. If we do have compressed APKs in the archive, then we # must decompress them individually before we perform any analysis. # This is the list of wildcards of files we extract from |filename|. apk_extensions = ['*.apk', '*.apex'] with zipfile.ZipFile(filename) as input_zip: self.certmap, compressed_extension = common.ReadApkCerts(input_zip) if compressed_extension: apk_extensions.append('*.apk' + compressed_extension) d = common.UnzipTemp(filename, apk_extensions) self.apks = {} self.apks_by_basename = {} for dirpath, _, filenames in os.walk(d): for fn in filenames: # Decompress compressed APKs before we begin processing them. if compressed_extension and fn.endswith(compressed_extension): # First strip the compressed extension from the file. uncompressed_fn = fn[:-len(compressed_extension)] # Decompress the compressed file to the output file. common.Gunzip(os.path.join(dirpath, fn), os.path.join(dirpath, uncompressed_fn)) # Finally, delete the compressed file and use the uncompressed file # for further processing. Note that the deletion is not strictly # required, but is done here to ensure that we're not using too much # space in the temporary directory. os.remove(os.path.join(dirpath, fn)) fn = uncompressed_fn if fn.endswith(('.apk', '.apex')): fullname = os.path.join(dirpath, fn) displayname = fullname[len(d)+1:] apk = APK(fullname, displayname) self.apks[apk.filename] = apk self.apks_by_basename[os.path.basename(apk.filename)] = apk self.max_pkg_len = max(self.max_pkg_len, len(apk.package)) self.max_fn_len = max(self.max_fn_len, len(apk.filename)) def CheckSharedUids(self): """Look for any instances where packages signed with different certs request the same sharedUserId.""" apks_by_uid = {} for apk in self.apks.values(): if apk.shared_uid: apks_by_uid.setdefault(apk.shared_uid, []).append(apk) for uid in sorted(apks_by_uid): apks = apks_by_uid[uid] for apk in apks[1:]: if apk.certs != apks[0].certs: break else: # all packages have the same set of certs; this uid is fine. continue AddProblem("different cert sets for packages with uid %s" % (uid,)) print("uid %s is shared by packages with different cert sets:" % (uid,)) for apk in apks: print("%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename)) for digest in apk.cert_digests: print(" ", ALL_CERTS.Get(digest)) print() def CheckExternalSignatures(self): for apk_filename, certname in self.certmap.items(): if certname == "EXTERNAL": # Apps marked EXTERNAL should be signed with the test key # during development, then manually re-signed after # predexopting. Consider it an error if this app is now # signed with any key that is present in our tree. apk = self.apks_by_basename[apk_filename] signed_with_external = False for digest in apk.cert_digests: name = ALL_CERTS.Get(digest) if name and name.startswith("unknown "): signed_with_external = True if not signed_with_external: Push(apk.filename) AddProblem("hasn't been signed with EXTERNAL cert") Pop() def PrintCerts(self): """Display a table of packages grouped by cert.""" by_digest = {} for apk in self.apks.values(): for digest in apk.cert_digests: by_digest.setdefault(digest, []).append((apk.package, apk)) order = [(-len(v), k) for (k, v) in by_digest.items()] order.sort() for _, digest in order: print("%s:" % (ALL_CERTS.Get(digest),)) apks = by_digest[digest] apks.sort() for _, apk in apks: if apk.shared_uid: print(" %-*s %-*s [%s]" % (self.max_fn_len, apk.filename, self.max_pkg_len, apk.package, apk.shared_uid)) else: print(" %-*s %s" % (self.max_fn_len, apk.filename, apk.package)) print() def CompareWith(self, other): """Look for instances where a given package that exists in both self and other have different certs.""" all_apks = set(self.apks.keys()) all_apks.update(other.apks.keys()) max_pkg_len = max(self.max_pkg_len, other.max_pkg_len) by_digestpair = {} for i in all_apks: if i in self.apks: if i in other.apks: # in both; should have same set of certs if self.apks[i].cert_digests != other.apks[i].cert_digests: by_digestpair.setdefault((other.apks[i].cert_digests, self.apks[i].cert_digests), []).append(i) else: print("%s [%s]: new APK (not in comparison target_files)" % ( i, self.apks[i].filename)) else: if i in other.apks: print("%s [%s]: removed APK (only in comparison target_files)" % ( i, other.apks[i].filename)) if by_digestpair: AddProblem("some APKs changed certs") Banner("APK signing differences") for (old, new), packages in sorted(by_digestpair.items()): for i, o in enumerate(old): if i == 0: print("was", ALL_CERTS.Get(o)) else: print(" ", ALL_CERTS.Get(o)) for i, n in enumerate(new): if i == 0: print("now", ALL_CERTS.Get(n)) else: print(" ", ALL_CERTS.Get(n)) for i in sorted(packages): old_fn = other.apks[i].filename new_fn = self.apks[i].filename if old_fn == new_fn: print(" %-*s [%s]" % (max_pkg_len, i, old_fn)) else: print(" %-*s [was: %s; now: %s]" % (max_pkg_len, i, old_fn, new_fn)) print() def main(argv): def option_handler(o, a): if o in ("-c", "--compare_with"): OPTIONS.compare_with = a elif o in ("-l", "--local_cert_dirs"): OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")] elif o in ("-t", "--text"): OPTIONS.text = True else: return False return True args = common.ParseOptions(argv, __doc__, extra_opts="c:l:t", extra_long_opts=["compare_with=", "local_cert_dirs="], extra_option_handler=option_handler) if len(args) != 1: common.Usage(__doc__) sys.exit(1) common.InitLogging() ALL_CERTS.FindLocalCerts() Push("input target_files:") try: target_files = TargetFiles() target_files.LoadZipFile(args[0]) finally: Pop() compare_files = None if OPTIONS.compare_with: Push("comparison target_files:") try: compare_files = TargetFiles() compare_files.LoadZipFile(OPTIONS.compare_with) finally: Pop() if OPTIONS.text or not compare_files: Banner("target files") target_files.PrintCerts() target_files.CheckSharedUids() target_files.CheckExternalSignatures() if compare_files: if OPTIONS.text: Banner("comparison files") compare_files.PrintCerts() target_files.CompareWith(compare_files) if PROBLEMS: print("%d problem(s) found:\n" % (len(PROBLEMS),)) for p in PROBLEMS: print(p) return 1 return 0 if __name__ == '__main__': try: r = main(sys.argv[1:]) sys.exit(r) except common.ExternalError as e: print("\n ERROR: %s\n" % (e,)) sys.exit(1) finally: common.Cleanup()