1#!/usr/bin/env python 2# 3# Copyright (C) 2009 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18Check the signatures of all APKs in a target_files .zip file. With 19-c, compare the signatures of each package to the ones in a separate 20target_files (usually a previously distributed build for the same 21device) and flag any changes. 22 23Usage: check_target_file_signatures [flags] target_files 24 25 -c (--compare_with) <other_target_files> 26 Look for compatibility problems between the two sets of target 27 files (eg., packages whose keys have changed). 28 29 -l (--local_cert_dirs) <dir,dir,...> 30 Comma-separated list of top-level directories to scan for 31 .x509.pem files. Defaults to "vendor,build". Where cert files 32 can be found that match APK signatures, the filename will be 33 printed as the cert name, otherwise a hash of the cert plus its 34 subject string will be printed instead. 35 36 -t (--text) 37 Dump the certificate information for both packages in comparison 38 mode (this output is normally suppressed). 39 40""" 41 42from __future__ import print_function 43 44import logging 45import os 46import os.path 47import re 48import subprocess 49import sys 50import zipfile 51 52import common 53 54if sys.hexversion < 0x02070000: 55 print("Python 2.7 or newer is required.", file=sys.stderr) 56 sys.exit(1) 57 58 59logger = logging.getLogger(__name__) 60 61# Work around a bug in Python's zipfile module that prevents opening of zipfiles 62# if any entry has an extra field of between 1 and 3 bytes (which is common with 63# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which 64# contains the bug) with an empty version (since we don't need to decode the 65# extra field anyway). 66# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and 67# Python 3.5.0 alpha 1. 68class MyZipInfo(zipfile.ZipInfo): 69 def _decodeExtra(self): 70 pass 71 72zipfile.ZipInfo = MyZipInfo 73 74 75OPTIONS = common.OPTIONS 76 77OPTIONS.text = False 78OPTIONS.compare_with = None 79OPTIONS.local_cert_dirs = ("vendor", "build") 80 81PROBLEMS = [] 82PROBLEM_PREFIX = [] 83 84 85def AddProblem(msg): 86 PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg) 87 88 89def Push(msg): 90 PROBLEM_PREFIX.append(msg) 91 92 93def Pop(): 94 PROBLEM_PREFIX.pop() 95 96 97def Banner(msg): 98 print("-" * 70) 99 print(" ", msg) 100 print("-" * 70) 101 102 103def GetCertSubject(cert): 104 p = common.Run(["openssl", "x509", "-inform", "DER", "-text"], 105 stdin=subprocess.PIPE, 106 stdout=subprocess.PIPE, 107 universal_newlines=False) 108 out, err = p.communicate(cert) 109 if err and not err.strip(): 110 return "(error reading cert subject)" 111 for line in out.decode().split("\n"): 112 line = line.strip() 113 if line.startswith("Subject:"): 114 return line[8:].strip() 115 return "(unknown cert subject)" 116 117 118class CertDB(object): 119 120 def __init__(self): 121 self.certs = {} 122 123 def Add(self, cert_digest, subject, name=None): 124 if cert_digest in self.certs: 125 if name: 126 self.certs[cert_digest] = self.certs[cert_digest] + "," + name 127 else: 128 if name is None: 129 name = "unknown cert %s (%s)" % (cert_digest[:12], subject) 130 self.certs[cert_digest] = name 131 132 def Get(self, cert_digest): 133 """Return the name for a given cert digest.""" 134 return self.certs.get(cert_digest, None) 135 136 def FindLocalCerts(self): 137 to_load = [] 138 for top in OPTIONS.local_cert_dirs: 139 for dirpath, _, filenames in os.walk(top): 140 certs = [os.path.join(dirpath, i) 141 for i in filenames if i.endswith(".x509.pem")] 142 if certs: 143 to_load.extend(certs) 144 145 for i in to_load: 146 with open(i) as f: 147 cert = common.ParseCertificate(f.read()) 148 name, _ = os.path.splitext(i) 149 name, _ = os.path.splitext(name) 150 151 cert_sha1 = common.sha1(cert).hexdigest() 152 cert_subject = GetCertSubject(cert) 153 self.Add(cert_sha1, cert_subject, name) 154 155 156ALL_CERTS = CertDB() 157 158 159def CertFromPKCS7(data, filename): 160 """Read the cert out of a PKCS#7-format file (which is what is 161 stored in a signed .apk).""" 162 Push(filename + ":") 163 try: 164 p = common.Run(["openssl", "pkcs7", 165 "-inform", "DER", 166 "-outform", "PEM", 167 "-print_certs"], 168 stdin=subprocess.PIPE, 169 stdout=subprocess.PIPE, 170 universal_newlines=False) 171 out, err = p.communicate(data) 172 if err and not err.strip(): 173 AddProblem("error reading cert:\n" + err.decode()) 174 return None 175 176 cert = common.ParseCertificate(out.decode()) 177 if not cert: 178 AddProblem("error parsing cert output") 179 return None 180 return cert 181 finally: 182 Pop() 183 184 185class APK(object): 186 187 def __init__(self, full_filename, filename): 188 self.filename = filename 189 self.cert_digests = frozenset() 190 self.shared_uid = None 191 self.package = None 192 193 Push(filename+":") 194 try: 195 self.RecordCerts(full_filename) 196 self.ReadManifest(full_filename) 197 finally: 198 Pop() 199 200 def ReadCertsDeprecated(self, full_filename): 201 print("reading certs in deprecated way for {}".format(full_filename)) 202 cert_digests = set() 203 with zipfile.ZipFile(full_filename) as apk: 204 for info in apk.infolist(): 205 filename = info.filename 206 if (filename.startswith("META-INF/") and 207 info.filename.endswith((".DSA", ".RSA"))): 208 pkcs7 = apk.read(filename) 209 cert = CertFromPKCS7(pkcs7, filename) 210 if not cert: 211 continue 212 cert_sha1 = common.sha1(cert).hexdigest() 213 cert_subject = GetCertSubject(cert) 214 ALL_CERTS.Add(cert_sha1, cert_subject) 215 cert_digests.add(cert_sha1) 216 if not cert_digests: 217 AddProblem("No signature found") 218 return 219 self.cert_digests = frozenset(cert_digests) 220 221 def RecordCerts(self, full_filename): 222 """Parse and save the signature of an apk file.""" 223 224 # Dump the cert info with apksigner 225 cmd = ["apksigner", "verify", "--print-certs", full_filename] 226 p = common.Run(cmd, stdout=subprocess.PIPE) 227 output, _ = p.communicate() 228 if p.returncode != 0: 229 self.ReadCertsDeprecated(full_filename) 230 return 231 232 # Sample output: 233 # Signer #1 certificate DN: ... 234 # Signer #1 certificate SHA-256 digest: ... 235 # Signer #1 certificate SHA-1 digest: ... 236 # ... 237 certs_info = {} 238 certificate_regex = re.compile(r"(Signer #[0-9]+) (certificate .*):(.*)") 239 for line in output.splitlines(): 240 m = certificate_regex.match(line) 241 if not m: 242 continue 243 signer, key, val = m.group(1), m.group(2), m.group(3) 244 if certs_info.get(signer): 245 certs_info[signer].update({key.strip(): val.strip()}) 246 else: 247 certs_info.update({signer: {key.strip(): val.strip()}}) 248 if not certs_info: 249 AddProblem("Failed to parse cert info") 250 return 251 252 cert_digests = set() 253 for signer, props in certs_info.items(): 254 subject = props.get("certificate DN") 255 digest = props.get("certificate SHA-1 digest") 256 if not subject or not digest: 257 AddProblem("Failed to parse cert subject or digest") 258 return 259 ALL_CERTS.Add(digest, subject) 260 cert_digests.add(digest) 261 self.cert_digests = frozenset(cert_digests) 262 263 def ReadManifest(self, full_filename): 264 p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file", 265 "AndroidManifest.xml"], 266 stdout=subprocess.PIPE) 267 manifest, err = p.communicate() 268 if err: 269 AddProblem("failed to read manifest") 270 return 271 272 self.shared_uid = None 273 self.package = None 274 275 for line in manifest.split("\n"): 276 line = line.strip() 277 m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line) 278 if m: 279 name = m.group(1) 280 if name == "android:sharedUserId": 281 if self.shared_uid is not None: 282 AddProblem("multiple sharedUserId declarations") 283 self.shared_uid = m.group(2) 284 elif name == "package": 285 if self.package is not None: 286 AddProblem("multiple package declarations") 287 self.package = m.group(2) 288 289 if self.package is None: 290 AddProblem("no package declaration") 291 292 293class TargetFiles(object): 294 def __init__(self): 295 self.max_pkg_len = 30 296 self.max_fn_len = 20 297 self.apks = None 298 self.apks_by_basename = None 299 self.certmap = None 300 301 def LoadZipFile(self, filename): 302 # First read the APK certs file to figure out whether there are compressed 303 # APKs in the archive. If we do have compressed APKs in the archive, then we 304 # must decompress them individually before we perform any analysis. 305 306 # This is the list of wildcards of files we extract from |filename|. 307 apk_extensions = ['*.apk', '*.apex'] 308 309 with zipfile.ZipFile(filename) as input_zip: 310 self.certmap, compressed_extension = common.ReadApkCerts(input_zip) 311 if compressed_extension: 312 apk_extensions.append('*.apk' + compressed_extension) 313 314 d = common.UnzipTemp(filename, apk_extensions) 315 self.apks = {} 316 self.apks_by_basename = {} 317 for dirpath, _, filenames in os.walk(d): 318 for fn in filenames: 319 # Decompress compressed APKs before we begin processing them. 320 if compressed_extension and fn.endswith(compressed_extension): 321 # First strip the compressed extension from the file. 322 uncompressed_fn = fn[:-len(compressed_extension)] 323 324 # Decompress the compressed file to the output file. 325 common.Gunzip(os.path.join(dirpath, fn), 326 os.path.join(dirpath, uncompressed_fn)) 327 328 # Finally, delete the compressed file and use the uncompressed file 329 # for further processing. Note that the deletion is not strictly 330 # required, but is done here to ensure that we're not using too much 331 # space in the temporary directory. 332 os.remove(os.path.join(dirpath, fn)) 333 fn = uncompressed_fn 334 335 if fn.endswith(('.apk', '.apex')): 336 fullname = os.path.join(dirpath, fn) 337 displayname = fullname[len(d)+1:] 338 apk = APK(fullname, displayname) 339 self.apks[apk.filename] = apk 340 self.apks_by_basename[os.path.basename(apk.filename)] = apk 341 342 self.max_pkg_len = max(self.max_pkg_len, len(apk.package)) 343 self.max_fn_len = max(self.max_fn_len, len(apk.filename)) 344 345 def CheckSharedUids(self): 346 """Look for any instances where packages signed with different 347 certs request the same sharedUserId.""" 348 apks_by_uid = {} 349 for apk in self.apks.values(): 350 if apk.shared_uid: 351 apks_by_uid.setdefault(apk.shared_uid, []).append(apk) 352 353 for uid in sorted(apks_by_uid): 354 apks = apks_by_uid[uid] 355 for apk in apks[1:]: 356 if apk.certs != apks[0].certs: 357 break 358 else: 359 # all packages have the same set of certs; this uid is fine. 360 continue 361 362 AddProblem("different cert sets for packages with uid %s" % (uid,)) 363 364 print("uid %s is shared by packages with different cert sets:" % (uid,)) 365 for apk in apks: 366 print("%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename)) 367 for digest in apk.cert_digests: 368 print(" ", ALL_CERTS.Get(digest)) 369 print() 370 371 def CheckExternalSignatures(self): 372 for apk_filename, certname in self.certmap.items(): 373 if certname == "EXTERNAL": 374 # Apps marked EXTERNAL should be signed with the test key 375 # during development, then manually re-signed after 376 # predexopting. Consider it an error if this app is now 377 # signed with any key that is present in our tree. 378 apk = self.apks_by_basename[apk_filename] 379 signed_with_external = False 380 for digest in apk.cert_digests: 381 name = ALL_CERTS.Get(digest) 382 if name and name.startswith("unknown "): 383 signed_with_external = True 384 385 if not signed_with_external: 386 Push(apk.filename) 387 AddProblem("hasn't been signed with EXTERNAL cert") 388 Pop() 389 390 def PrintCerts(self): 391 """Display a table of packages grouped by cert.""" 392 by_digest = {} 393 for apk in self.apks.values(): 394 for digest in apk.cert_digests: 395 by_digest.setdefault(digest, []).append((apk.package, apk)) 396 397 order = [(-len(v), k) for (k, v) in by_digest.items()] 398 order.sort() 399 400 for _, digest in order: 401 print("%s:" % (ALL_CERTS.Get(digest),)) 402 apks = by_digest[digest] 403 apks.sort() 404 for _, apk in apks: 405 if apk.shared_uid: 406 print(" %-*s %-*s [%s]" % (self.max_fn_len, apk.filename, 407 self.max_pkg_len, apk.package, 408 apk.shared_uid)) 409 else: 410 print(" %-*s %s" % (self.max_fn_len, apk.filename, apk.package)) 411 print() 412 413 def CompareWith(self, other): 414 """Look for instances where a given package that exists in both 415 self and other have different certs.""" 416 417 all_apks = set(self.apks.keys()) 418 all_apks.update(other.apks.keys()) 419 420 max_pkg_len = max(self.max_pkg_len, other.max_pkg_len) 421 422 by_digestpair = {} 423 424 for i in all_apks: 425 if i in self.apks: 426 if i in other.apks: 427 # in both; should have same set of certs 428 if self.apks[i].cert_digests != other.apks[i].cert_digests: 429 by_digestpair.setdefault((other.apks[i].cert_digests, 430 self.apks[i].cert_digests), []).append(i) 431 else: 432 print("%s [%s]: new APK (not in comparison target_files)" % ( 433 i, self.apks[i].filename)) 434 else: 435 if i in other.apks: 436 print("%s [%s]: removed APK (only in comparison target_files)" % ( 437 i, other.apks[i].filename)) 438 439 if by_digestpair: 440 AddProblem("some APKs changed certs") 441 Banner("APK signing differences") 442 for (old, new), packages in sorted(by_digestpair.items()): 443 for i, o in enumerate(old): 444 if i == 0: 445 print("was", ALL_CERTS.Get(o)) 446 else: 447 print(" ", ALL_CERTS.Get(o)) 448 for i, n in enumerate(new): 449 if i == 0: 450 print("now", ALL_CERTS.Get(n)) 451 else: 452 print(" ", ALL_CERTS.Get(n)) 453 for i in sorted(packages): 454 old_fn = other.apks[i].filename 455 new_fn = self.apks[i].filename 456 if old_fn == new_fn: 457 print(" %-*s [%s]" % (max_pkg_len, i, old_fn)) 458 else: 459 print(" %-*s [was: %s; now: %s]" % (max_pkg_len, i, 460 old_fn, new_fn)) 461 print() 462 463 464def main(argv): 465 def option_handler(o, a): 466 if o in ("-c", "--compare_with"): 467 OPTIONS.compare_with = a 468 elif o in ("-l", "--local_cert_dirs"): 469 OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")] 470 elif o in ("-t", "--text"): 471 OPTIONS.text = True 472 else: 473 return False 474 return True 475 476 args = common.ParseOptions(argv, __doc__, 477 extra_opts="c:l:t", 478 extra_long_opts=["compare_with=", 479 "local_cert_dirs="], 480 extra_option_handler=option_handler) 481 482 if len(args) != 1: 483 common.Usage(__doc__) 484 sys.exit(1) 485 486 common.InitLogging() 487 488 ALL_CERTS.FindLocalCerts() 489 490 Push("input target_files:") 491 try: 492 target_files = TargetFiles() 493 target_files.LoadZipFile(args[0]) 494 finally: 495 Pop() 496 497 compare_files = None 498 if OPTIONS.compare_with: 499 Push("comparison target_files:") 500 try: 501 compare_files = TargetFiles() 502 compare_files.LoadZipFile(OPTIONS.compare_with) 503 finally: 504 Pop() 505 506 if OPTIONS.text or not compare_files: 507 Banner("target files") 508 target_files.PrintCerts() 509 target_files.CheckSharedUids() 510 target_files.CheckExternalSignatures() 511 if compare_files: 512 if OPTIONS.text: 513 Banner("comparison files") 514 compare_files.PrintCerts() 515 target_files.CompareWith(compare_files) 516 517 if PROBLEMS: 518 print("%d problem(s) found:\n" % (len(PROBLEMS),)) 519 for p in PROBLEMS: 520 print(p) 521 return 1 522 523 return 0 524 525 526if __name__ == '__main__': 527 try: 528 r = main(sys.argv[1:]) 529 sys.exit(r) 530 except common.ExternalError as e: 531 print("\n ERROR: %s\n" % (e,)) 532 sys.exit(1) 533 finally: 534 common.Cleanup() 535