1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42from __future__ import print_function
43
44import logging
45import os
46import os.path
47import re
48import subprocess
49import sys
50import zipfile
51
52import common
53
# Refuse to run on interpreters older than Python 2.7.
if sys.version_info < (2, 7):
  print("Python 2.7 or newer is required.", file=sys.stderr)
  sys.exit(1)


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
60
61# Work around a bug in Python's zipfile module that prevents opening of zipfiles
62# if any entry has an extra field of between 1 and 3 bytes (which is common with
63# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
64# contains the bug) with an empty version (since we don't need to decode the
65# extra field anyway).
66# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
67# Python 3.5.0 alpha 1.
class MyZipInfo(zipfile.ZipInfo):
  """ZipInfo variant that skips decoding of the per-entry extra field.

  Installed in place of zipfile.ZipInfo below so that archives whose entries
  carry a short (1-3 byte) extra field can still be opened on interpreters
  affected by https://bugs.python.org/issue14315.
  """

  def _decodeExtra(self, *_args, **_kwargs):
    """Ignore the extra field entirely.

    Recent CPython releases call this hook with an argument (the filename
    CRC), while older ones call it with none; accept and discard whatever is
    passed so the override works with either calling convention.
    """
    return None

zipfile.ZipInfo = MyZipInfo
73
74
# Option namespace shared with the common module; the attributes set below are
# this tool's defaults and are overwritten by option_handler() in main().
OPTIONS = common.OPTIONS

# -t / --text: also print the per-cert package tables in comparison mode.
OPTIONS.text = False
# -c / --compare_with: path of a second target_files zip, or None.
OPTIONS.compare_with = None
# -l / --local_cert_dirs: top-level directories walked for .x509.pem files.
OPTIONS.local_cert_dirs = ("vendor", "build")

# Problem messages collected by AddProblem(); main() prints them at the end
# and returns 1 when the list is non-empty.
PROBLEMS = []
# Stack of context strings (managed by Push()/Pop()) that AddProblem() joins
# in front of every message.
PROBLEM_PREFIX = []
83
84
def AddProblem(msg):
  """Record a problem message, prefixed with the current context stack.

  Args:
    msg: text describing the problem; it is appended after the
      space-joined entries of PROBLEM_PREFIX and a separating space.
  """
  context = " ".join(PROBLEM_PREFIX)
  PROBLEMS.append(context + " " + msg)
87
88
def Push(msg):
  """Push a context string onto PROBLEM_PREFIX.

  Every AddProblem() call made until the matching Pop() includes this string
  in its message prefix.
  """
  PROBLEM_PREFIX.append(msg)
91
92
def Pop():
  """Remove the most recently pushed context string from PROBLEM_PREFIX."""
  PROBLEM_PREFIX.pop()
95
96
def Banner(msg):
  """Print msg to stdout between two 70-character dashed rules."""
  rule = "-" * 70
  print(rule)
  print("  ", msg)
  print(rule)
101
102
def GetCertSubject(cert):
  """Return the subject string of a DER-encoded certificate.

  The certificate is piped to "openssl x509 -text" and the text following
  "Subject:" in its output is returned.

  Args:
    cert: the certificate bytes fed to openssl's stdin (DER form).

  Returns:
    The subject string, "(error reading cert subject)" if openssl exits with
    a non-zero status, or "(unknown cert subject)" if the output has no
    "Subject:" line.
  """
  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
                 stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE,
                 universal_newlines=False)
  out, _ = p.communicate(cert)
  # stderr is not piped here, so communicate() yields None for it; the exit
  # status is the only reliable failure signal.
  if p.returncode != 0:
    return "(error reading cert subject)"
  for line in out.decode().split("\n"):
    line = line.strip()
    if line.startswith("Subject:"):
      return line[len("Subject:"):].strip()
  return "(unknown cert subject)"
116
117
class CertDB(object):
  """Maps certificate digests to human-readable certificate names."""

  def __init__(self):
    # digest string -> display name (several names are comma-joined).
    self.certs = {}

  def Add(self, cert_digest, subject, name=None):
    """Register a certificate under its digest.

    A digest seen for the first time is stored under `name`, or under a
    generated "unknown cert <first 12 digest chars> (<subject>)" label when
    name is None.  For a digest that is already known, a non-empty `name` is
    appended to the existing entry after a comma.
    """
    if cert_digest not in self.certs:
      if name is None:
        name = "unknown cert %s (%s)" % (cert_digest[:12], subject)
      self.certs[cert_digest] = name
      return
    if name:
      self.certs[cert_digest] += "," + name

  def Get(self, cert_digest):
    """Return the name recorded for cert_digest, or None if there is none."""
    return self.certs.get(cert_digest)

  def FindLocalCerts(self):
    """Register every *.x509.pem file found under OPTIONS.local_cert_dirs.

    Each cert is named after its path with the two trailing extensions
    removed.
    """
    pem_paths = [
        os.path.join(dirpath, fn)
        for top in OPTIONS.local_cert_dirs
        for dirpath, _, filenames in os.walk(top)
        for fn in filenames
        if fn.endswith(".x509.pem")]

    for path in pem_paths:
      with open(path) as pem_file:
        cert = common.ParseCertificate(pem_file.read())
      # Strip ".pem", then ".x509".
      name = os.path.splitext(os.path.splitext(path)[0])[0]
      self.Add(common.sha1(cert).hexdigest(), GetCertSubject(cert), name)
154
155
# Process-wide cert registry: main() fills it from local .x509.pem files via
# FindLocalCerts(), and APK cert parsing adds every cert it encounters.
ALL_CERTS = CertDB()
157
158
def CertFromPKCS7(data, filename):
  """Read the cert out of a PKCS#7-format file (which is what is
  stored in a signed .apk).

  Args:
    data: contents of the PKCS#7 signature block, fed to openssl's stdin.
    filename: name of the signature file; used only as problem context.

  Returns:
    Whatever common.ParseCertificate() yields for openssl's PEM output, or
    None (after recording a problem) if openssl fails or nothing is parsed.
  """
  Push(filename + ":")
  try:
    # stderr must be piped explicitly; otherwise communicate() returns None
    # for it and the failure text can never be reported.
    p = common.Run(["openssl", "pkcs7",
                    "-inform", "DER",
                    "-outform", "PEM",
                    "-print_certs"],
                   stdin=subprocess.PIPE,
                   stdout=subprocess.PIPE,
                   stderr=subprocess.PIPE,
                   universal_newlines=False)
    out, err = p.communicate(data)
    if p.returncode != 0:
      # Decode leniently: the diagnostic text must never itself cause a crash.
      AddProblem("error reading cert:\n" +
                 (err or b"").decode("utf-8", "replace"))
      return None

    cert = common.ParseCertificate(out.decode())
    if not cert:
      AddProblem("error parsing cert output")
      return None
    return cert
  finally:
    Pop()
183
184
class APK(object):
  """Signing certificates and manifest identity of one APK/APEX file.

  Attributes:
    filename: display name given to the constructor (used in reports).
    cert_digests: frozenset of digest strings of the signing certs; stays
      empty when no cert could be read.
    shared_uid: value of the manifest's android:sharedUserId, or None.
    package: value of the manifest's package attribute, or None.
  """

  # "Signer #N certificate <property>: <value>" lines of
  # `apksigner verify --print-certs`.  The property name must not contain ':'
  # so that a value with colons in it (e.g. a URL inside the DN) is not split
  # at its last colon.
  _CERT_LINE_RE = re.compile(r"(Signer #[0-9]+) (certificate [^:]*):(.*)")

  # String-valued attribute lines of `aapt2 dump xmltree`.
  _MANIFEST_ATTR_RE = re.compile(
      r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw')

  def __init__(self, full_filename, filename):
    """Read certs and manifest of full_filename; report under `filename`."""
    self.filename = filename
    self.cert_digests = frozenset()
    self.shared_uid = None
    self.package = None

    Push(filename+":")
    try:
      self.RecordCerts(full_filename)
      self.ReadManifest(full_filename)
    finally:
      Pop()

  def ReadCertsDeprecated(self, full_filename):
    """Fallback: read the certs from the signature blocks under META-INF/.

    Records "No signature found" when no certificate could be extracted.
    """
    print("reading certs in deprecated way for {}".format(full_filename))
    cert_digests = set()
    with zipfile.ZipFile(full_filename) as apk:
      for info in apk.infolist():
        filename = info.filename
        # JAR signature blocks are named *.RSA, *.DSA or *.EC depending on
        # the key algorithm.
        if not (filename.startswith("META-INF/") and
                filename.endswith((".DSA", ".RSA", ".EC"))):
          continue
        cert = CertFromPKCS7(apk.read(filename), filename)
        if not cert:
          continue
        cert_sha1 = common.sha1(cert).hexdigest()
        ALL_CERTS.Add(cert_sha1, GetCertSubject(cert))
        cert_digests.add(cert_sha1)
    if not cert_digests:
      AddProblem("No signature found")
      return
    self.cert_digests = frozenset(cert_digests)

  @staticmethod
  def _ParseSignerCerts(output):
    """Group the certificate lines of apksigner output by signer.

    Returns:
      A dict mapping "Signer #N" to a dict of {property name: value}, e.g.
      {"Signer #1": {"certificate DN": "...",
                     "certificate SHA-1 digest": "..."}}.
    """
    certs_info = {}
    for line in output.splitlines():
      m = APK._CERT_LINE_RE.match(line)
      if not m:
        continue
      signer, key, val = m.groups()
      certs_info.setdefault(signer, {})[key.strip()] = val.strip()
    return certs_info

  def RecordCerts(self, full_filename):
    """Parse and save the signature of an apk file.

    Uses apksigner; if it exits with a non-zero status the certs are read
    with ReadCertsDeprecated() instead.
    """
    # Dump the cert info with apksigner
    cmd = ["apksigner", "verify", "--print-certs", full_filename]
    p = common.Run(cmd, stdout=subprocess.PIPE)
    output, _ = p.communicate()
    if p.returncode != 0:
      self.ReadCertsDeprecated(full_filename)
      return

    # Sample output:
    # Signer #1 certificate DN: ...
    # Signer #1 certificate SHA-256 digest: ...
    # Signer #1 certificate SHA-1 digest: ...
    # ...
    certs_info = self._ParseSignerCerts(output)
    if not certs_info:
      AddProblem("Failed to parse cert info")
      return

    cert_digests = set()
    for props in certs_info.values():
      subject = props.get("certificate DN")
      digest = props.get("certificate SHA-1 digest")
      if not subject or not digest:
        AddProblem("Failed to parse cert subject or digest")
        return
      ALL_CERTS.Add(digest, subject)
      cert_digests.add(digest)
    self.cert_digests = frozenset(cert_digests)

  @staticmethod
  def _ManifestAttributes(manifest):
    """Yield (name, value) for each string attribute line of an xmltree dump."""
    for line in manifest.split("\n"):
      m = APK._MANIFEST_ATTR_RE.search(line.strip())
      if m:
        yield m.group(1), m.group(2)

  def ReadManifest(self, full_filename):
    """Set self.package and self.shared_uid from AndroidManifest.xml.

    Records a problem when aapt2 fails, when either attribute is declared
    more than once, or when no package attribute is found.
    """
    p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file",
                    "AndroidManifest.xml"],
                   stdout=subprocess.PIPE)
    manifest, err = p.communicate()
    # stderr is not piped, so err is normally None; rely on the exit status
    # to detect a failed aapt2 run.
    if p.returncode != 0 or err:
      AddProblem("failed to read manifest")
      return

    self.shared_uid = None
    self.package = None

    for name, value in self._ManifestAttributes(manifest):
      if name == "android:sharedUserId":
        if self.shared_uid is not None:
          AddProblem("multiple sharedUserId declarations")
        self.shared_uid = value
      elif name == "package":
        if self.package is not None:
          AddProblem("multiple package declarations")
        self.package = value

    if self.package is None:
      AddProblem("no package declaration")
291
292
class TargetFiles(object):
  """The APK/APEX files of one target_files zip plus the checks run on them.

  Attributes:
    max_pkg_len: widest package name seen (at least 30); used for alignment.
    max_fn_len: widest display filename seen (at least 20).
    apks: dict of display filename -> APK, set by LoadZipFile().
    apks_by_basename: dict of file basename -> APK, set by LoadZipFile().
    certmap: first value returned by common.ReadApkCerts(); iterated as
      {apk filename: cert name} by CheckExternalSignatures().
  """

  def __init__(self):
    self.max_pkg_len = 30
    self.max_fn_len = 20
    self.apks = None
    self.apks_by_basename = None
    self.certmap = None

  def LoadZipFile(self, filename):
    """Extract every APK/APEX from the zip `filename` and analyze each one."""
    # First read the APK certs file to figure out whether there are compressed
    # APKs in the archive. If we do have compressed APKs in the archive, then we
    # must decompress them individually before we perform any analysis.

    # This is the list of wildcards of files we extract from |filename|.
    apk_extensions = ['*.apk', '*.apex']

    with zipfile.ZipFile(filename) as input_zip:
      self.certmap, compressed_extension = common.ReadApkCerts(input_zip)
    if compressed_extension:
      apk_extensions.append('*.apk' + compressed_extension)

    d = common.UnzipTemp(filename, apk_extensions)
    self.apks = {}
    self.apks_by_basename = {}
    for dirpath, _, filenames in os.walk(d):
      for fn in filenames:
        # Decompress compressed APKs before we begin processing them.
        if compressed_extension and fn.endswith(compressed_extension):
          # First strip the compressed extension from the file.
          uncompressed_fn = fn[:-len(compressed_extension)]

          # Decompress the compressed file to the output file.
          common.Gunzip(os.path.join(dirpath, fn),
                        os.path.join(dirpath, uncompressed_fn))

          # Finally, delete the compressed file and use the uncompressed file
          # for further processing. Note that the deletion is not strictly
          # required, but is done here to ensure that we're not using too much
          # space in the temporary directory.
          os.remove(os.path.join(dirpath, fn))
          fn = uncompressed_fn

        if fn.endswith(('.apk', '.apex')):
          fullname = os.path.join(dirpath, fn)
          displayname = fullname[len(d)+1:]
          apk = APK(fullname, displayname)
          self.apks[apk.filename] = apk
          self.apks_by_basename[os.path.basename(apk.filename)] = apk

          # package stays None when the manifest could not be read; that is
          # already reported as a problem and must not crash the scan.
          if apk.package:
            self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
          self.max_fn_len = max(self.max_fn_len, len(apk.filename))

  def CheckSharedUids(self):
    """Look for any instances where packages signed with different
    certs request the same sharedUserId."""
    apks_by_uid = {}
    for apk in self.apks.values():
      if apk.shared_uid:
        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)

    for uid in sorted(apks_by_uid):
      apks = apks_by_uid[uid]
      reference = apks[0].cert_digests
      if all(apk.cert_digests == reference for apk in apks[1:]):
        # all packages have the same set of certs; this uid is fine.
        continue

      AddProblem("different cert sets for packages with uid %s" % (uid,))

      print("uid %s is shared by packages with different cert sets:" % (uid,))
      for apk in apks:
        print("%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename))
        for digest in sorted(apk.cert_digests):
          print("   ", ALL_CERTS.Get(digest))
      print()

  def CheckExternalSignatures(self):
    """Report EXTERNAL apps that are not signed with any unknown cert."""
    for apk_filename, certname in self.certmap.items():
      if certname == "EXTERNAL":
        # Apps marked EXTERNAL should be signed with the test key
        # during development, then manually re-signed after
        # predexopting.  Consider it an error if this app is now
        # signed with any key that is present in our tree.
        apk = self.apks_by_basename[apk_filename]
        # Certs not found in the local tree get names starting "unknown ".
        signed_with_external = any(
            (ALL_CERTS.Get(digest) or "").startswith("unknown ")
            for digest in apk.cert_digests)

        if not signed_with_external:
          Push(apk.filename)
          AddProblem("hasn't been signed with EXTERNAL cert")
          Pop()

  def PrintCerts(self):
    """Display a table of packages grouped by cert."""
    by_digest = {}
    for apk in self.apks.values():
      for digest in apk.cert_digests:
        by_digest.setdefault(digest, []).append(apk)

    # Certs that sign the most packages come first; ties are broken by digest.
    order = sorted(by_digest, key=lambda d: (-len(by_digest[d]), d))

    for digest in order:
      print("%s:" % (ALL_CERTS.Get(digest),))
      # Sort on plain strings only: APK objects are not orderable and package
      # may be None, either of which breaks tuple comparison on Python 3.
      apks = sorted(by_digest[digest],
                    key=lambda a: (a.package or "", a.filename))
      for apk in apks:
        if apk.shared_uid:
          print("  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
                                        self.max_pkg_len, apk.package,
                                        apk.shared_uid))
        else:
          print("  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package))
      print()

  def CompareWith(self, other):
    """Look for instances where a given package that exists in both
    self and other have different certs."""

    all_apks = set(self.apks.keys())
    all_apks.update(other.apks.keys())

    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)

    # (old cert set, new cert set) -> keys of the APKs that changed that way.
    by_digestpair = {}

    for i in sorted(all_apks):
      if i in self.apks:
        if i in other.apks:
          # in both; should have same set of certs
          if self.apks[i].cert_digests != other.apks[i].cert_digests:
            by_digestpair.setdefault((other.apks[i].cert_digests,
                                      self.apks[i].cert_digests), []).append(i)
        else:
          print("%s [%s]: new APK (not in comparison target_files)" % (
              i, self.apks[i].filename))
      else:
        if i in other.apks:
          print("%s [%s]: removed APK (only in comparison target_files)" % (
              i, other.apks[i].filename))

    if by_digestpair:
      AddProblem("some APKs changed certs")
      Banner("APK signing differences")

      # frozensets only define a partial order (subset tests), so sort on
      # their sorted contents to get a stable, deterministic report.
      def pair_key(item):
        old_set, new_set = item[0]
        return (sorted(old_set), sorted(new_set))

      for (old, new), packages in sorted(by_digestpair.items(), key=pair_key):
        for i, o in enumerate(sorted(old)):
          if i == 0:
            print("was", ALL_CERTS.Get(o))
          else:
            print("   ", ALL_CERTS.Get(o))
        for i, n in enumerate(sorted(new)):
          if i == 0:
            print("now", ALL_CERTS.Get(n))
          else:
            print("   ", ALL_CERTS.Get(n))
        for i in sorted(packages):
          old_fn = other.apks[i].filename
          new_fn = self.apks[i].filename
          if old_fn == new_fn:
            print("  %-*s  [%s]" % (max_pkg_len, i, old_fn))
          else:
            print("  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
                                                  old_fn, new_fn))
        print()
462
463
def main(argv):
  """Parse the flags, load the target_files zip(s) and run every check.

  Args:
    argv: command-line arguments without the program name.

  Returns:
    1 if any problem was recorded in PROBLEMS, 0 otherwise.
  """
  def option_handler(o, a):
    """Handle this tool's own flags; return False for anything else."""
    if o in ("-c", "--compare_with"):
      OPTIONS.compare_with = a
    elif o in ("-l", "--local_cert_dirs"):
      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
    elif o in ("-t", "--text"):
      OPTIONS.text = True
    else:
      return False
    return True

  # "text" has to be listed here: the usage text documents --text and
  # option_handler() accepts it, but the long form was never registered with
  # the option parser.
  args = common.ParseOptions(argv, __doc__,
                             extra_opts="c:l:t",
                             extra_long_opts=["compare_with=",
                                              "local_cert_dirs=",
                                              "text"],
                             extra_option_handler=option_handler)

  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)

  common.InitLogging()

  ALL_CERTS.FindLocalCerts()

  Push("input target_files:")
  try:
    target_files = TargetFiles()
    target_files.LoadZipFile(args[0])
  finally:
    Pop()

  compare_files = None
  if OPTIONS.compare_with:
    Push("comparison target_files:")
    try:
      compare_files = TargetFiles()
      compare_files.LoadZipFile(OPTIONS.compare_with)
    finally:
      Pop()

  # The cert tables are suppressed in comparison mode unless -t was given.
  if OPTIONS.text or not compare_files:
    Banner("target files")
    target_files.PrintCerts()
  target_files.CheckSharedUids()
  target_files.CheckExternalSignatures()
  if compare_files:
    if OPTIONS.text:
      Banner("comparison files")
      compare_files.PrintCerts()
    target_files.CompareWith(compare_files)

  if PROBLEMS:
    print("%d problem(s) found:\n" % (len(PROBLEMS),))
    for p in PROBLEMS:
      print(p)
    return 1

  return 0
524
525
if __name__ == '__main__':
  # Exit with main()'s status.  A common.ExternalError is reported and turned
  # into exit status 1; common.Cleanup() runs on every path.
  try:
    sys.exit(main(sys.argv[1:]))
  except common.ExternalError as err:
    print("\n   ERROR: %s\n" % (err,))
    sys.exit(1)
  finally:
    common.Cleanup()
535