1# Copyright 2014 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import copy 16import math 17import os 18import os.path 19import re 20import subprocess 21import sys 22import tempfile 23import threading 24import time 25 26import its.caps 27import its.cv2image 28import its.device 29from its.device import ItsSession 30import its.image 31 32import numpy as np 33 34# For checking the installed APK's target SDK version 35MIN_SUPPORTED_SDK_VERSION = 28 # P 36 37CHART_DELAY = 1 # seconds 38CHART_DISTANCE = 30.0 # cm 39CHART_HEIGHT = 13.5 # cm 40CHART_LEVEL = 96 41CHART_SCALE_START = 0.65 42CHART_SCALE_STOP = 1.35 43CHART_SCALE_STEP = 0.025 44FACING_EXTERNAL = 2 45NUM_TRYS = 2 46PROC_TIMEOUT_CODE = -101 # terminated process return -process_id 47PROC_TIMEOUT_TIME = 900 # timeout in seconds for a process (15 minutes) 48SCENE3_FILE = os.path.join(os.environ['CAMERA_ITS_TOP'], 'pymodules', 'its', 49 'test_images', 'ISO12233.png') 50SKIP_RET_CODE = 101 # note this must be same as tests/scene*/test_* 51VGA_HEIGHT = 480 52VGA_WIDTH = 640 53 54# Not yet mandated tests 55NOT_YET_MANDATED = { 56 'scene0': [ 57 'test_test_patterns', 58 'test_tonemap_curve' 59 ], 60 'scene1': [ 61 'test_ae_precapture_trigger', 62 'test_channel_saturation' 63 ], 64 'scene2': [ 65 'test_auto_per_frame_control' 66 ], 67 'scene2b': [], 68 'scene2c': [], 69 'scene3': [], 70 'scene4': [], 71 'scene5': [], 72 'sensor_fusion': [] 73} 74 75# Must match mHiddenPhysicalCameraSceneIds in ItsTestActivity.java 76HIDDEN_PHYSICAL_CAMERA_TESTS = { 77 'scene0': [ 78 'test_burst_capture', 79 'test_metadata', 80 'test_read_write', 81 'test_sensor_events' 82 ], 83 'scene1': [ 84 'test_exposure', 85 'test_dng_noise_model', 86 'test_linearity', 87 'test_raw_exposure', 88 'test_raw_sensitivity' 89 ], 90 'scene2': [ 91 'test_faces', 92 'test_num_faces' 93 ], 94 'scene2b': [], 95 'scene2c': [], 96 'scene3': [], 97 'scene4': [ 98 'test_aspect_ratio_and_crop' 99 ], 100 'scene5': [], 101 'sensor_fusion': [ 102 'test_sensor_fusion' 103 ] 104} 105 106def run_subprocess_with_timeout(cmd, fout, ferr, outdir): 107 """Run subprocess with a timeout. 108 109 Args: 110 cmd: list containing python command 111 fout: stdout file for the test 112 ferr: stderr file for the test 113 outdir: dir location for fout/ferr 114 115 Returns: 116 process status or PROC_TIMEOUT_CODE if timer maxes 117 """ 118 119 proc = subprocess.Popen( 120 cmd, stdout=fout, stderr=ferr, cwd=outdir) 121 timer = threading.Timer(PROC_TIMEOUT_TIME, proc.kill) 122 123 try: 124 timer.start() 125 proc.communicate() 126 test_code = proc.returncode 127 finally: 128 timer.cancel() 129 130 if test_code < 0: 131 return PROC_TIMEOUT_CODE 132 else: 133 return test_code 134 135 136def calc_camera_fov(camera_id, hidden_physical_id): 137 """Determine the camera field of view from internal params.""" 138 with ItsSession(camera_id, hidden_physical_id) as cam: 139 props = cam.get_camera_properties() 140 props = cam.override_with_hidden_physical_camera_props(props) 141 focal_ls = props['android.lens.info.availableFocalLengths'] 142 if len(focal_ls) > 1: 143 print 'Doing capture to determine logical camera focal length' 144 cap = cam.do_capture(its.objects.auto_capture_request()) 145 focal_l = cap['metadata']['android.lens.focalLength'] 146 else: 147 focal_l = focal_ls[0] 148 sensor_size = props['android.sensor.info.physicalSize'] 149 diag = math.sqrt(sensor_size['height'] ** 2 + 150 sensor_size['width'] ** 2) 151 try: 152 fov = str(round(2 * math.degrees(math.atan(diag / (2 * focal_l))), 2)) 153 except ValueError: 154 fov = str(0) 155 print 'Calculated FoV: %s' % fov 156 return fov 157 158 159def evaluate_socket_failure(err_file_path): 160 """Determine if test fails due to socket FAIL.""" 161 socket_fail = False 162 with open(err_file_path, 'r') as ferr: 163 for line in ferr: 164 if (line.find('socket.error') != -1 or 165 line.find('socket.timeout') != -1 or 166 line.find('Problem with socket') != -1): 167 socket_fail = True 168 return socket_fail 169 170 171def skip_sensor_fusion(camera_id): 172 """Determine if sensor fusion test is skipped for this camera.""" 173 174 skip_code = SKIP_RET_CODE 175 with ItsSession(camera_id) as cam: 176 props = cam.get_camera_properties() 177 if (its.caps.sensor_fusion(props) and its.caps.manual_sensor(props) and 178 props['android.lens.facing'] is not FACING_EXTERNAL): 179 skip_code = None 180 return skip_code 181 182 183def main(): 184 """Run all the automated tests, saving intermediate files, and producing 185 a summary/report of the results. 186 187 Script should be run from the top-level CameraITS directory. 188 189 Command line arguments: 190 camera: the camera(s) to be tested. Use comma to separate multiple 191 camera Ids. Ex: "camera=0,1" or "camera=1" 192 device: device id for adb 193 scenes: the test scene(s) to be executed. Use comma to separate 194 multiple scenes. Ex: "scenes=scene0,scene1" or 195 "scenes=0,1,sensor_fusion" (sceneX can be abbreviated by X 196 where X is a integer) 197 chart: [Experimental] another android device served as test chart 198 display. When this argument presents, change of test scene 199 will be handled automatically. Note that this argument 200 requires special physical/hardware setup to work and may not 201 work on all android devices. 202 result: Device ID to forward results to (in addition to the device 203 that the tests are running on). 204 rot_rig: [Experimental] ID of the rotation rig being used (formatted as 205 "<vendor ID>:<product ID>:<channel #>" or "default") 206 tmp_dir: location of temp directory for output files 207 skip_scene_validation: force skip scene validation. Used when test scene 208 is setup up front and don't require tester validation. 209 dist: [Experimental] chart distance in cm. 210 """ 211 212 all_scenes = ["scene0", "scene1", "scene2", "scene2b", "scene2c", "scene3", "scene4", "scene5", 213 "sensor_fusion"] 214 215 auto_scenes = ["scene0", "scene1", "scene2", "scene2b", "scene2c", "scene3", "scene4"] 216 217 scene_req = { 218 "scene0": None, 219 "scene1": "A grey card covering at least the middle 30% of the scene", 220 "scene2": "A picture containing human faces", 221 "scene2b": "A picture containing human faces", 222 "scene2c": "A picture containing human faces", 223 "scene3": "The ISO 12233 chart", 224 "scene4": "A specific test page of a circle covering at least the " 225 "middle 50% of the scene. See CameraITS.pdf section 2.3.4 " 226 "for more details", 227 "scene5": "Capture images with a diffuser attached to the camera. See " 228 "CameraITS.pdf section 2.3.4 for more details", 229 "sensor_fusion": "Rotating checkboard pattern. See " 230 "sensor_fusion/SensorFusion.pdf for detailed " 231 "instructions.\nNote that this test will be skipped " 232 "on devices not supporting REALTIME camera timestamp." 233 } 234 scene_extra_args = { 235 "scene5": ["doAF=False"] 236 } 237 238 camera_id_combos = [] 239 scenes = [] 240 chart_host_id = None 241 result_device_id = None 242 rot_rig_id = None 243 tmp_dir = None 244 skip_scene_validation = False 245 chart_distance = CHART_DISTANCE 246 chart_level = CHART_LEVEL 247 one_camera_argv = sys.argv[1:] 248 249 for s in list(sys.argv[1:]): 250 if s[:7] == "camera=" and len(s) > 7: 251 camera_ids = s[7:].split(',') 252 camera_id_combos = its.device.parse_camera_ids(camera_ids) 253 one_camera_argv.remove(s) 254 elif s[:7] == "scenes=" and len(s) > 7: 255 scenes = s[7:].split(',') 256 elif s[:6] == 'chart=' and len(s) > 6: 257 chart_host_id = s[6:] 258 elif s[:7] == 'result=' and len(s) > 7: 259 result_device_id = s[7:] 260 elif s[:8] == 'rot_rig=' and len(s) > 8: 261 rot_rig_id = s[8:] # valid values: 'default' or '$VID:$PID:$CH' 262 # The default '$VID:$PID:$CH' is '04d8:fc73:1' 263 elif s[:8] == 'tmp_dir=' and len(s) > 8: 264 tmp_dir = s[8:] 265 elif s == 'skip_scene_validation': 266 skip_scene_validation = True 267 elif s[:5] == 'dist=' and len(s) > 5: 268 chart_distance = float(re.sub('cm', '', s[5:])) 269 elif s[:11] == 'brightness=' and len(s) > 11: 270 chart_level = s[11:] 271 272 chart_dist_arg = 'dist= ' + str(chart_distance) 273 chart_level_arg = 'brightness=' + str(chart_level) 274 auto_scene_switch = chart_host_id is not None 275 merge_result_switch = result_device_id is not None 276 277 # Run through all scenes if user does not supply one 278 possible_scenes = auto_scenes if auto_scene_switch else all_scenes 279 if not scenes: 280 scenes = possible_scenes 281 else: 282 # Validate user input scene names 283 valid_scenes = True 284 temp_scenes = [] 285 for s in scenes: 286 if s in possible_scenes: 287 temp_scenes.append(s) 288 else: 289 try: 290 # Try replace "X" to "sceneX" 291 scene_str = "scene" + s 292 if scene_str not in possible_scenes: 293 valid_scenes = False 294 break 295 temp_scenes.append(scene_str) 296 except ValueError: 297 valid_scenes = False 298 break 299 300 if not valid_scenes: 301 print 'Unknown scene specified:', s 302 assert False 303 scenes = temp_scenes 304 305 # Make output directories to hold the generated files. 306 topdir = tempfile.mkdtemp(dir=tmp_dir) 307 subprocess.call(['chmod', 'g+rx', topdir]) 308 print "Saving output files to:", topdir, "\n" 309 310 device_id = its.device.get_device_id() 311 device_id_arg = "device=" + device_id 312 print "Testing device " + device_id 313 314 # Check CtsVerifier SDK level 315 # Here we only do warning as there is no guarantee on pm dump output formt not changed 316 # Also sometimes it's intentional to run mismatched versions 317 cmd = "adb -s %s shell pm dump com.android.cts.verifier" % (device_id) 318 dump_path = os.path.join(topdir, 'CtsVerifier.txt') 319 with open(dump_path, 'w') as fout: 320 fout.write('ITS minimum supported SDK version is %d\n--\n' % (MIN_SUPPORTED_SDK_VERSION)) 321 fout.flush() 322 ret_code = subprocess.call(cmd.split(), stdout=fout) 323 324 if ret_code != 0: 325 print "Warning: cannot get CtsVerifier SDK version. Is CtsVerifier installed?" 326 327 ctsv_version = None 328 ctsv_version_name = None 329 with open(dump_path, 'r') as f: 330 target_sdk_found = False 331 version_name_found = False 332 for line in f: 333 match = re.search('targetSdk=([0-9]+)', line) 334 if match: 335 ctsv_version = int(match.group(1)) 336 target_sdk_found = True 337 match = re.search('versionName=([\S]+)$', line) 338 if match: 339 ctsv_version_name = match.group(1) 340 version_name_found = True 341 if target_sdk_found and version_name_found: 342 break 343 344 if ctsv_version is None: 345 print "Warning: cannot get CtsVerifier SDK version. Is CtsVerifier installed?" 346 elif ctsv_version < MIN_SUPPORTED_SDK_VERSION: 347 print "Warning: CtsVerifier version (%d) < ITS version (%d), is this intentional?" % ( 348 ctsv_version, MIN_SUPPORTED_SDK_VERSION) 349 else: 350 print "CtsVerifier targetSdk is", ctsv_version 351 if ctsv_version_name: 352 print "CtsVerifier version name is", ctsv_version_name 353 354 # Hard check on ItsService/host script version that should catch incompatible APK/script 355 with ItsSession() as cam: 356 cam.check_its_version_compatible() 357 358 # Correctness check for devices 359 device_bfp = its.device.get_device_fingerprint(device_id) 360 assert device_bfp is not None 361 362 if auto_scene_switch: 363 chart_host_bfp = its.device.get_device_fingerprint(chart_host_id) 364 assert chart_host_bfp is not None 365 366 if merge_result_switch: 367 result_device_bfp = its.device.get_device_fingerprint(result_device_id) 368 assert_err_msg = ('Cannot merge result to a different build, from ' 369 '%s to %s' % (device_bfp, result_device_bfp)) 370 assert device_bfp == result_device_bfp, assert_err_msg 371 372 # user doesn't specify camera id, run through all cameras 373 if not camera_id_combos: 374 with its.device.ItsSession() as cam: 375 camera_ids = cam.get_camera_ids() 376 camera_id_combos = its.device.parse_camera_ids(camera_ids); 377 378 print "Running ITS on camera: %s, scene %s" % (camera_id_combos, scenes) 379 380 if auto_scene_switch: 381 # merge_result only supports run_parallel_tests 382 if merge_result_switch and camera_ids[0] == "1": 383 print "Skip chart screen" 384 time.sleep(1) 385 else: 386 print "Waking up chart screen: ", chart_host_id 387 screen_id_arg = ("screen=%s" % chart_host_id) 388 cmd = ["python", os.path.join(os.environ["CAMERA_ITS_TOP"], "tools", 389 "wake_up_screen.py"), screen_id_arg, 390 chart_level_arg] 391 wake_code = subprocess.call(cmd) 392 assert wake_code == 0 393 394 for id_combo in camera_id_combos: 395 # Initialize test results 396 results = {} 397 result_key = ItsSession.RESULT_KEY 398 for s in all_scenes: 399 results[s] = {result_key: ItsSession.RESULT_NOT_EXECUTED} 400 401 camera_fov = calc_camera_fov(id_combo.id, id_combo.sub_id) 402 id_combo_string = id_combo.id; 403 has_hidden_sub_camera = id_combo.sub_id is not None 404 if has_hidden_sub_camera: 405 id_combo_string += ":" + id_combo.sub_id 406 scenes = [scene for scene in scenes if HIDDEN_PHYSICAL_CAMERA_TESTS[scene]] 407 # Loop capturing images until user confirm test scene is correct 408 camera_id_arg = "camera=" + id_combo.id 409 print "Preparing to run ITS on camera", id_combo_string, "for scenes ", scenes 410 411 os.mkdir(os.path.join(topdir, id_combo_string)) 412 for d in scenes: 413 os.mkdir(os.path.join(topdir, id_combo_string, d)) 414 415 tot_tests = [] 416 tot_pass = 0 417 for scene in scenes: 418 # unit is millisecond for execution time record in CtsVerifier 419 scene_start_time = int(round(time.time() * 1000)) 420 skip_code = None 421 tests = [(s[:-3], os.path.join("tests", scene, s)) 422 for s in os.listdir(os.path.join("tests", scene)) 423 if s[-3:] == ".py" and s[:4] == "test"] 424 tests.sort() 425 tot_tests.extend(tests) 426 427 summary = "Cam" + id_combo_string + " " + scene + "\n" 428 numpass = 0 429 numskip = 0 430 num_not_mandated_fail = 0 431 numfail = 0 432 validate_switch = True 433 if scene_req[scene] is not None: 434 out_path = os.path.join(topdir, id_combo_string, scene+".jpg") 435 out_arg = "out=" + out_path 436 if scene == 'sensor_fusion': 437 skip_code = skip_sensor_fusion(id_combo.id) 438 if rot_rig_id or skip_code == SKIP_RET_CODE: 439 validate_switch = False 440 if skip_scene_validation: 441 validate_switch = False 442 cmd = None 443 if auto_scene_switch: 444 if (not merge_result_switch or 445 (merge_result_switch and id_combo_string == '0')): 446 scene_arg = 'scene=' + scene 447 fov_arg = 'fov=' + camera_fov 448 cmd = ['python', 449 os.path.join(os.getcwd(), 'tools/load_scene.py'), 450 scene_arg, chart_dist_arg, fov_arg, screen_id_arg] 451 else: 452 time.sleep(CHART_DELAY) 453 else: 454 # Skip scene validation under certain conditions 455 if validate_switch and not merge_result_switch: 456 scene_arg = 'scene=' + scene_req[scene] 457 extra_args = scene_extra_args.get(scene, []) 458 cmd = ['python', 459 os.path.join(os.getcwd(), 460 'tools/validate_scene.py'), 461 camera_id_arg, out_arg, 462 scene_arg, device_id_arg] + extra_args 463 if cmd is not None: 464 valid_scene_code = subprocess.call(cmd, cwd=topdir) 465 assert valid_scene_code == 0 466 print 'Start running ITS on camera %s, %s' % (id_combo_string, scene) 467 # Extract chart from scene for scene3 once up front 468 chart_loc_arg = '' 469 chart_height = CHART_HEIGHT 470 if scene == 'scene3': 471 chart_height *= its.cv2image.calc_chart_scaling( 472 chart_distance, camera_fov) 473 chart = its.cv2image.Chart(SCENE3_FILE, chart_height, 474 chart_distance, CHART_SCALE_START, 475 CHART_SCALE_STOP, CHART_SCALE_STEP, 476 id_combo.id) 477 chart_loc_arg = 'chart_loc=%.2f,%.2f,%.2f,%.2f,%.3f' % ( 478 chart.xnorm, chart.ynorm, chart.wnorm, chart.hnorm, 479 chart.scale) 480 # Run each test, capturing stdout and stderr. 481 for (testname, testpath) in tests: 482 # Only pick predefined tests for hidden physical camera 483 if has_hidden_sub_camera and \ 484 testname not in HIDDEN_PHYSICAL_CAMERA_TESTS[scene]: 485 numskip += 1 486 continue 487 if auto_scene_switch: 488 if merge_result_switch and id_combo_string == '0': 489 # Send an input event to keep the screen not dimmed. 490 # Since we are not using camera of chart screen, FOCUS event 491 # should do nothing but keep the screen from dimming. 492 # The "sleep after x minutes of inactivity" display setting 493 # determines how long this command can keep screen bright. 494 # Setting it to something like 30 minutes should be enough. 495 cmd = ('adb -s %s shell input keyevent FOCUS' 496 % chart_host_id) 497 subprocess.call(cmd.split()) 498 t0 = time.time() 499 for num_try in range(NUM_TRYS): 500 outdir = os.path.join(topdir, id_combo_string, scene) 501 outpath = os.path.join(outdir, testname+'_stdout.txt') 502 errpath = os.path.join(outdir, testname+'_stderr.txt') 503 if scene == 'sensor_fusion': 504 if skip_code is not SKIP_RET_CODE: 505 if rot_rig_id: 506 print 'Rotating phone w/ rig %s' % rot_rig_id 507 rig = ('python tools/rotation_rig.py rotator=%s' % 508 rot_rig_id) 509 subprocess.Popen(rig.split()) 510 else: 511 print 'Rotate phone 15s as shown in SensorFusion.pdf' 512 else: 513 test_code = skip_code 514 if skip_code is not SKIP_RET_CODE: 515 cmd = ['python', os.path.join(os.getcwd(), testpath)] 516 cmd += one_camera_argv + ["camera="+id_combo_string] + [chart_loc_arg] 517 cmd += [chart_dist_arg] 518 with open(outpath, 'w') as fout, open(errpath, 'w') as ferr: 519 test_code = run_subprocess_with_timeout( 520 cmd, fout, ferr, outdir) 521 if test_code == 0 or test_code == SKIP_RET_CODE: 522 break 523 else: 524 socket_fail = evaluate_socket_failure(errpath) 525 if socket_fail or test_code == PROC_TIMEOUT_CODE: 526 if num_try != NUM_TRYS-1: 527 print ' Retry %s/%s' % (scene, testname) 528 else: 529 break 530 else: 531 break 532 t1 = time.time() 533 534 test_failed = False 535 if test_code == 0: 536 retstr = "PASS " 537 numpass += 1 538 elif test_code == SKIP_RET_CODE: 539 retstr = "SKIP " 540 numskip += 1 541 elif test_code != 0 and testname in NOT_YET_MANDATED[scene]: 542 retstr = "FAIL*" 543 num_not_mandated_fail += 1 544 else: 545 retstr = "FAIL " 546 numfail += 1 547 test_failed = True 548 549 msg = "%s %s/%s [%.1fs]" % (retstr, scene, testname, t1-t0) 550 print msg 551 its.device.adb_log(device_id, msg) 552 msg_short = "%s %s [%.1fs]" % (retstr, testname, t1-t0) 553 if test_failed: 554 summary += msg_short + "\n" 555 556 # unit is millisecond for execution time record in CtsVerifier 557 scene_end_time = int(round(time.time() * 1000)) 558 559 if numskip > 0: 560 skipstr = ", %d test%s skipped" % ( 561 numskip, "s" if numskip > 1 else "") 562 else: 563 skipstr = "" 564 565 test_result = "\n%d / %d tests passed (%.1f%%)%s" % ( 566 numpass + num_not_mandated_fail, len(tests) - numskip, 567 100.0 * float(numpass + num_not_mandated_fail) / 568 (len(tests) - numskip) 569 if len(tests) != numskip else 100.0, skipstr) 570 print test_result 571 572 if num_not_mandated_fail > 0: 573 msg = "(*) tests are not yet mandated" 574 print msg 575 576 tot_pass += numpass 577 print "%s compatibility score: %.f/100\n" % ( 578 scene, 100.0 * numpass / len(tests)) 579 580 summary_path = os.path.join(topdir, id_combo_string, scene, "summary.txt") 581 with open(summary_path, "w") as f: 582 f.write(summary) 583 584 passed = numfail == 0 585 results[scene][result_key] = (ItsSession.RESULT_PASS if passed 586 else ItsSession.RESULT_FAIL) 587 results[scene][ItsSession.SUMMARY_KEY] = summary_path 588 results[scene][ItsSession.START_TIME_KEY] = scene_start_time 589 results[scene][ItsSession.END_TIME_KEY] = scene_end_time 590 591 if tot_tests: 592 print "Compatibility Score: %.f/100" % (100.0 * tot_pass / len(tot_tests)) 593 else: 594 print "Compatibility Score: 0/100" 595 596 msg = "Reporting ITS result to CtsVerifier" 597 print msg 598 its.device.adb_log(device_id, msg) 599 if merge_result_switch: 600 # results are modified by report_result 601 results_backup = copy.deepcopy(results) 602 its.device.report_result(result_device_id, id_combo_string, results_backup) 603 604 # Report hidden_physical_id results as well. 605 its.device.report_result(device_id, id_combo_string, results) 606 607 if auto_scene_switch: 608 if merge_result_switch: 609 print 'Skip shutting down chart screen' 610 else: 611 print 'Shutting down chart screen: ', chart_host_id 612 screen_id_arg = ('screen=%s' % chart_host_id) 613 cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools', 614 'turn_off_screen.py'), screen_id_arg] 615 screen_off_code = subprocess.call(cmd) 616 assert screen_off_code == 0 617 618 print 'Shutting down DUT screen: ', device_id 619 screen_id_arg = ('screen=%s' % device_id) 620 cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools', 621 'turn_off_screen.py'), screen_id_arg] 622 screen_off_code = subprocess.call(cmd) 623 assert screen_off_code == 0 624 625 print "ITS tests finished. Please go back to CtsVerifier and proceed" 626 627if __name__ == '__main__': 628 main() 629