1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18import os 19import random 20import re 21import string 22import threading 23import time 24from queue import Empty 25from subprocess import call 26 27from acts import asserts 28from acts.test_utils.bt.bt_constants import adv_fail 29from acts.test_utils.bt.bt_constants import adv_succ 30from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list 31from acts.test_utils.bt.bt_constants import batch_scan_result 32from acts.test_utils.bt.bt_constants import bits_per_samples 33from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes 34from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers 35from acts.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed 36from acts.test_utils.bt.bt_constants import bluetooth_off 37from acts.test_utils.bt.bt_constants import bluetooth_on 38from acts.test_utils.bt.bt_constants import \ 39 bluetooth_profile_connection_state_changed 40from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid 41from acts.test_utils.bt.bt_constants import bt_default_timeout 42from acts.test_utils.bt.bt_constants import bt_profile_constants 43from acts.test_utils.bt.bt_constants import bt_profile_states 44from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids 45from acts.test_utils.bt.bt_constants import bt_scan_mode_types 46from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device 47from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device 48from acts.test_utils.bt.bt_constants import channel_modes 49from acts.test_utils.bt.bt_constants import codec_types 50from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms 51from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms 52from acts.test_utils.bt.bt_constants import hid_id_keyboard 53from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation 54from acts.test_utils.bt.bt_constants import pan_connect_timeout 55from acts.test_utils.bt.bt_constants import sample_rates 56from acts.test_utils.bt.bt_constants import scan_result 57from acts.test_utils.bt.bt_constants import sig_uuid_constants 58from acts.test_utils.bt.bt_constants import small_timeout 59from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb 60from acts.test_utils.tel.tel_test_utils import verify_http_connection 61from acts.utils import exe_cmd 62 63from acts import utils 64 65log = logging 66 67advertisements_to_devices = {} 68 69 70class BtTestUtilsError(Exception): 71 pass 72 73 74def _add_android_device_to_dictionary(android_device, profile_list, 75 selector_dict): 76 """Adds the AndroidDevice and supported features to the selector dictionary 77 78 Args: 79 android_device: The Android device. 80 profile_list: The list of profiles the Android device supports. 81 """ 82 for profile in profile_list: 83 if profile in selector_dict and android_device not in selector_dict[ 84 profile]: 85 selector_dict[profile].append(android_device) 86 else: 87 selector_dict[profile] = [android_device] 88 89 90def bluetooth_enabled_check(ad, timeout_sec=5): 91 """Checks if the Bluetooth state is enabled, if not it will attempt to 92 enable it. 93 94 Args: 95 ad: The Android device list to enable Bluetooth on. 96 timeout_sec: number of seconds to wait for toggle to take effect. 97 98 Returns: 99 True if successful, false if unsuccessful. 100 """ 101 if not ad.droid.bluetoothCheckState(): 102 ad.droid.bluetoothToggleState(True) 103 expected_bluetooth_on_event_name = bluetooth_on 104 try: 105 ad.ed.pop_event(expected_bluetooth_on_event_name, 106 bt_default_timeout) 107 except Empty: 108 ad.log.info( 109 "Failed to toggle Bluetooth on(no broadcast received).") 110 # Try one more time to poke at the actual state. 111 if ad.droid.bluetoothCheckState(): 112 ad.log.info(".. actual state is ON") 113 return True 114 ad.log.error(".. actual state is OFF") 115 return False 116 end_time = time.time() + timeout_sec 117 while not ad.droid.bluetoothCheckState() and time.time() < end_time: 118 time.sleep(1) 119 return ad.droid.bluetoothCheckState() 120 121 122def check_device_supported_profiles(droid): 123 """Checks for Android device supported profiles. 124 125 Args: 126 droid: The droid object to query. 127 128 Returns: 129 A dictionary of supported profiles. 130 """ 131 profile_dict = {} 132 profile_dict['hid'] = droid.bluetoothHidIsReady() 133 profile_dict['hsp'] = droid.bluetoothHspIsReady() 134 profile_dict['a2dp'] = droid.bluetoothA2dpIsReady() 135 profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady() 136 profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady() 137 profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady() 138 profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady() 139 return profile_dict 140 141 142def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list, 143 adv_android_device, adv_callback_list): 144 """Try to gracefully stop all scanning and advertising instances. 145 146 Args: 147 scn_android_device: The Android device that is actively scanning. 148 scn_callback_list: The scan callback id list that needs to be stopped. 149 adv_android_device: The Android device that is actively advertising. 150 adv_callback_list: The advertise callback id list that needs to be 151 stopped. 152 """ 153 scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed 154 adv_droid = adv_android_device.droid 155 try: 156 for scan_callback in scn_callback_list: 157 scan_droid.bleStopBleScan(scan_callback) 158 except Exception as err: 159 scn_android_device.log.debug( 160 "Failed to stop LE scan... reseting Bluetooth. Error {}".format( 161 err)) 162 reset_bluetooth([scn_android_device]) 163 try: 164 for adv_callback in adv_callback_list: 165 adv_droid.bleStopBleAdvertising(adv_callback) 166 except Exception as err: 167 adv_android_device.log.debug( 168 "Failed to stop LE advertisement... reseting Bluetooth. Error {}". 169 format(err)) 170 reset_bluetooth([adv_android_device]) 171 172 173def clear_bonded_devices(ad): 174 """Clear bonded devices from the input Android device. 175 176 Args: 177 ad: the Android device performing the connection. 178 Returns: 179 True if clearing bonded devices was successful, false if unsuccessful. 180 """ 181 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 182 while bonded_device_list: 183 device_address = bonded_device_list[0]['address'] 184 if not ad.droid.bluetoothUnbond(device_address): 185 log.error("Failed to unbond {} from {}".format( 186 device_address, ad.serial)) 187 return False 188 log.info("Successfully unbonded {} from {}".format( 189 device_address, ad.serial)) 190 #TODO: wait for BOND_STATE_CHANGED intent instead of waiting 191 time.sleep(1) 192 193 # If device was first connected using LE transport, after bonding it is 194 # accessible through it's LE address, and through it classic address. 195 # Unbonding it will unbond two devices representing different 196 # "addresses". Attempt to unbond such already unbonded devices will 197 # result in bluetoothUnbond returning false. 198 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 199 return True 200 201 202def connect_phone_to_headset(android, 203 headset, 204 timeout=bt_default_timeout, 205 connection_check_period=10): 206 """Connects android phone to bluetooth headset. 207 Headset object must have methods power_on and enter_pairing_mode, 208 and attribute mac_address. 209 210 Args: 211 android: AndroidDevice object with SL4A installed. 212 headset: Object with attribute mac_address and methods power_on and 213 enter_pairing_mode. 214 timeout: Seconds to wait for devices to connect. 215 connection_check_period: how often to check for connection once the 216 SL4A connect RPC has been sent. 217 Returns: 218 connected (bool): True if devices are paired and connected by end of 219 method. False otherwise. 220 """ 221 headset_mac_address = headset.mac_address 222 connected = is_a2dp_src_device_connected(android, headset_mac_address) 223 log.info('Devices connected before pair attempt: %s' % connected) 224 if not connected: 225 # Turn on headset and initiate pairing mode. 226 headset.enter_pairing_mode() 227 android.droid.bluetoothStartPairingHelper() 228 start_time = time.time() 229 # If already connected, skip pair and connect attempt. 230 while not connected and (time.time() - start_time < timeout): 231 bonded_info = android.droid.bluetoothGetBondedDevices() 232 if headset.mac_address not in [ 233 info["address"] for info in bonded_info 234 ]: 235 # Use SL4A to pair and connect with headset. 236 headset.enter_pairing_mode() 237 android.droid.bluetoothDiscoverAndBond(headset_mac_address) 238 else: # Device is bonded but not connected 239 android.droid.bluetoothConnectBonded(headset_mac_address) 240 log.info('Waiting for connection...') 241 time.sleep(connection_check_period) 242 # Check for connection. 243 connected = is_a2dp_src_device_connected(android, headset_mac_address) 244 log.info('Devices connected after pair attempt: %s' % connected) 245 return connected 246 247 248def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2): 249 """Connects pri droid to secondary droid. 250 251 Args: 252 pri_ad: AndroidDroid initiating connection 253 sec_ad: AndroidDroid accepting connection 254 profiles_set: Set of profiles to be connected 255 attempts: Number of attempts to try until failure. 256 257 Returns: 258 Pass if True 259 Fail if False 260 """ 261 device_addr = sec_ad.droid.bluetoothGetLocalAddress() 262 # Allows extra time for the SDP records to be updated. 263 time.sleep(2) 264 curr_attempts = 0 265 while curr_attempts < attempts: 266 log.info("connect_pri_to_sec curr attempt {} total {}".format( 267 curr_attempts, attempts)) 268 if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 269 return True 270 curr_attempts += 1 271 log.error("connect_pri_to_sec failed to connect after {} attempts".format( 272 attempts)) 273 return False 274 275 276def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 277 """Connects pri droid to secondary droid. 278 279 Args: 280 pri_ad: AndroidDroid initiating connection. 281 sec_ad: AndroidDroid accepting connection. 282 profiles_set: Set of profiles to be connected. 283 284 Returns: 285 True of connection is successful, false if unsuccessful. 286 """ 287 # Check if we support all profiles. 288 supported_profiles = bt_profile_constants.values() 289 for profile in profiles_set: 290 if profile not in supported_profiles: 291 pri_ad.log.info("Profile {} is not supported list {}".format( 292 profile, supported_profiles)) 293 return False 294 295 # First check that devices are bonded. 296 paired = False 297 for paired_device in pri_ad.droid.bluetoothGetBondedDevices(): 298 if paired_device['address'] == \ 299 sec_ad.droid.bluetoothGetLocalAddress(): 300 paired = True 301 break 302 303 if not paired: 304 pri_ad.log.error("Not paired to {}".format(sec_ad.serial)) 305 return False 306 307 # Now try to connect them, the following call will try to initiate all 308 # connections. 309 pri_ad.droid.bluetoothConnectBonded( 310 sec_ad.droid.bluetoothGetLocalAddress()) 311 312 end_time = time.time() + 10 313 profile_connected = set() 314 sec_addr = sec_ad.droid.bluetoothGetLocalAddress() 315 pri_ad.log.info("Profiles to be connected {}".format(profiles_set)) 316 # First use APIs to check profile connection state 317 while (time.time() < end_time 318 and not profile_connected.issuperset(profiles_set)): 319 if (bt_profile_constants['headset_client'] not in profile_connected 320 and bt_profile_constants['headset_client'] in profiles_set): 321 if is_hfp_client_device_connected(pri_ad, sec_addr): 322 profile_connected.add(bt_profile_constants['headset_client']) 323 if (bt_profile_constants['a2dp'] not in profile_connected 324 and bt_profile_constants['a2dp'] in profiles_set): 325 if is_a2dp_src_device_connected(pri_ad, sec_addr): 326 profile_connected.add(bt_profile_constants['a2dp']) 327 if (bt_profile_constants['a2dp_sink'] not in profile_connected 328 and bt_profile_constants['a2dp_sink'] in profiles_set): 329 if is_a2dp_snk_device_connected(pri_ad, sec_addr): 330 profile_connected.add(bt_profile_constants['a2dp_sink']) 331 if (bt_profile_constants['map_mce'] not in profile_connected 332 and bt_profile_constants['map_mce'] in profiles_set): 333 if is_map_mce_device_connected(pri_ad, sec_addr): 334 profile_connected.add(bt_profile_constants['map_mce']) 335 if (bt_profile_constants['map'] not in profile_connected 336 and bt_profile_constants['map'] in profiles_set): 337 if is_map_mse_device_connected(pri_ad, sec_addr): 338 profile_connected.add(bt_profile_constants['map']) 339 time.sleep(0.1) 340 # If APIs fail, try to find the connection broadcast receiver. 341 while not profile_connected.issuperset(profiles_set): 342 try: 343 profile_event = pri_ad.ed.pop_event( 344 bluetooth_profile_connection_state_changed, 345 bt_default_timeout + 10) 346 pri_ad.log.info("Got event {}".format(profile_event)) 347 except Exception: 348 pri_ad.log.error("Did not get {} profiles left {}".format( 349 bluetooth_profile_connection_state_changed, profile_connected)) 350 return False 351 352 profile = profile_event['data']['profile'] 353 state = profile_event['data']['state'] 354 device_addr = profile_event['data']['addr'] 355 if state == bt_profile_states['connected'] and \ 356 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 357 profile_connected.add(profile) 358 pri_ad.log.info( 359 "Profiles connected until now {}".format(profile_connected)) 360 # Failure happens inside the while loop. If we came here then we already 361 # connected. 362 return True 363 364 365def determine_max_advertisements(android_device): 366 """Determines programatically how many advertisements the Android device 367 supports. 368 369 Args: 370 android_device: The Android device to determine max advertisements of. 371 372 Returns: 373 The maximum advertisement count. 374 """ 375 android_device.log.info( 376 "Determining number of maximum concurrent advertisements...") 377 advertisement_count = 0 378 bt_enabled = False 379 expected_bluetooth_on_event_name = bluetooth_on 380 if not android_device.droid.bluetoothCheckState(): 381 android_device.droid.bluetoothToggleState(True) 382 try: 383 android_device.ed.pop_event(expected_bluetooth_on_event_name, 384 bt_default_timeout) 385 except Exception: 386 android_device.log.info( 387 "Failed to toggle Bluetooth on(no broadcast received).") 388 # Try one more time to poke at the actual state. 389 if android_device.droid.bluetoothCheckState() is True: 390 android_device.log.info(".. actual state is ON") 391 else: 392 android_device.log.error( 393 "Failed to turn Bluetooth on. Setting default advertisements to 1" 394 ) 395 advertisement_count = -1 396 return advertisement_count 397 advertise_callback_list = [] 398 advertise_data = android_device.droid.bleBuildAdvertiseData() 399 advertise_settings = android_device.droid.bleBuildAdvertiseSettings() 400 while (True): 401 advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() 402 advertise_callback_list.append(advertise_callback) 403 404 android_device.droid.bleStartBleAdvertising(advertise_callback, 405 advertise_data, 406 advertise_settings) 407 408 regex = "(" + adv_succ.format( 409 advertise_callback) + "|" + adv_fail.format( 410 advertise_callback) + ")" 411 # wait for either success or failure event 412 evt = android_device.ed.pop_events(regex, bt_default_timeout, 413 small_timeout) 414 if evt[0]["name"] == adv_succ.format(advertise_callback): 415 advertisement_count += 1 416 android_device.log.info( 417 "Advertisement {} started.".format(advertisement_count)) 418 else: 419 error = evt[0]["data"]["Error"] 420 if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS": 421 android_device.log.info( 422 "Advertisement failed to start. Reached max " + 423 "advertisements at {}".format(advertisement_count)) 424 break 425 else: 426 raise BtTestUtilsError( 427 "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," + 428 " but received bad error code {}".format(error)) 429 try: 430 for adv in advertise_callback_list: 431 android_device.droid.bleStopBleAdvertising(adv) 432 except Exception: 433 android_device.log.error( 434 "Failed to stop advertisingment, resetting Bluetooth.") 435 reset_bluetooth([android_device]) 436 return advertisement_count 437 438 439def disable_bluetooth(droid): 440 """Disable Bluetooth on input Droid object. 441 442 Args: 443 droid: The droid object to disable Bluetooth on. 444 445 Returns: 446 True if successful, false if unsuccessful. 447 """ 448 if droid.bluetoothCheckState() is True: 449 droid.bluetoothToggleState(False) 450 if droid.bluetoothCheckState() is True: 451 log.error("Failed to toggle Bluetooth off.") 452 return False 453 return True 454 455 456def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): 457 """ 458 Disconnect primary from secondary on a specific set of profiles 459 Args: 460 pri_ad - Primary android_device initiating disconnection 461 sec_ad - Secondary android droid (sl4a interface to keep the 462 method signature the same connect_pri_to_sec above) 463 profiles_list - List of profiles we want to disconnect from 464 465 Returns: 466 True on Success 467 False on Failure 468 """ 469 # Sanity check to see if all the profiles in the given set is supported 470 supported_profiles = bt_profile_constants.values() 471 for profile in profiles_list: 472 if profile not in supported_profiles: 473 pri_ad.log.info("Profile {} is not in supported list {}".format( 474 profile, supported_profiles)) 475 return False 476 477 pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices()) 478 # Disconnecting on a already disconnected profile is a nop, 479 # so not checking for the connection state 480 try: 481 pri_ad.droid.bluetoothDisconnectConnectedProfile( 482 sec_ad.droid.bluetoothGetLocalAddress(), profiles_list) 483 except Exception as err: 484 pri_ad.log.error( 485 "Exception while trying to disconnect profile(s) {}: {}".format( 486 profiles_list, err)) 487 return False 488 489 profile_disconnected = set() 490 pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list)) 491 492 while not profile_disconnected.issuperset(profiles_list): 493 try: 494 profile_event = pri_ad.ed.pop_event( 495 bluetooth_profile_connection_state_changed, bt_default_timeout) 496 pri_ad.log.info("Got event {}".format(profile_event)) 497 except Exception as e: 498 pri_ad.log.error( 499 "Did not disconnect from Profiles. Reason {}".format(e)) 500 return False 501 502 profile = profile_event['data']['profile'] 503 state = profile_event['data']['state'] 504 device_addr = profile_event['data']['addr'] 505 506 if state == bt_profile_states['disconnected'] and \ 507 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 508 profile_disconnected.add(profile) 509 pri_ad.log.info( 510 "Profiles disconnected so far {}".format(profile_disconnected)) 511 512 return True 513 514 515def enable_bluetooth(droid, ed): 516 if droid.bluetoothCheckState() is True: 517 return True 518 519 droid.bluetoothToggleState(True) 520 expected_bluetooth_on_event_name = bluetooth_on 521 try: 522 ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout) 523 except Exception: 524 log.info("Failed to toggle Bluetooth on (no broadcast received)") 525 if droid.bluetoothCheckState() is True: 526 log.info(".. actual state is ON") 527 return True 528 log.info(".. actual state is OFF") 529 return False 530 531 return True 532 533 534def factory_reset_bluetooth(android_devices): 535 """Clears Bluetooth stack of input Android device list. 536 537 Args: 538 android_devices: The Android device list to reset Bluetooth 539 540 Returns: 541 True if successful, false if unsuccessful. 542 """ 543 for a in android_devices: 544 droid, ed = a.droid, a.ed 545 a.log.info("Reset state of bluetooth on device.") 546 if not bluetooth_enabled_check(a): 547 return False 548 # TODO: remove device unbond b/79418045 549 # Temporary solution to ensure all devices are unbonded 550 bonded_devices = droid.bluetoothGetBondedDevices() 551 for b in bonded_devices: 552 a.log.info("Removing bond for device {}".format(b['address'])) 553 droid.bluetoothUnbond(b['address']) 554 555 droid.bluetoothFactoryReset() 556 wait_for_bluetooth_manager_state(droid) 557 if not enable_bluetooth(droid, ed): 558 return False 559 return True 560 561 562def generate_ble_advertise_objects(droid): 563 """Generate generic LE advertise objects. 564 565 Args: 566 droid: The droid object to generate advertise LE objects from. 567 568 Returns: 569 advertise_callback: The generated advertise callback id. 570 advertise_data: The generated advertise data id. 571 advertise_settings: The generated advertise settings id. 572 """ 573 advertise_callback = droid.bleGenBleAdvertiseCallback() 574 advertise_data = droid.bleBuildAdvertiseData() 575 advertise_settings = droid.bleBuildAdvertiseSettings() 576 return advertise_callback, advertise_data, advertise_settings 577 578 579def generate_ble_scan_objects(droid): 580 """Generate generic LE scan objects. 581 582 Args: 583 droid: The droid object to generate LE scan objects from. 584 585 Returns: 586 filter_list: The generated scan filter list id. 587 scan_settings: The generated scan settings id. 588 scan_callback: The generated scan callback id. 589 """ 590 filter_list = droid.bleGenFilterList() 591 scan_settings = droid.bleBuildScanSetting() 592 scan_callback = droid.bleGenScanCallback() 593 return filter_list, scan_settings, scan_callback 594 595 596def generate_id_by_size(size, 597 chars=(string.ascii_lowercase + 598 string.ascii_uppercase + string.digits)): 599 """Generate random ascii characters of input size and input char types 600 601 Args: 602 size: Input size of string. 603 chars: (Optional) Chars to use in generating a random string. 604 605 Returns: 606 String of random input chars at the input size. 607 """ 608 return ''.join(random.choice(chars) for _ in range(size)) 609 610 611def get_advanced_droid_list(android_devices): 612 """Add max_advertisement and batch_scan_supported attributes to input 613 Android devices 614 615 This will programatically determine maximum LE advertisements of each 616 input Android device. 617 618 Args: 619 android_devices: The Android devices to setup. 620 621 Returns: 622 List of Android devices with new attribtues. 623 """ 624 droid_list = [] 625 for a in android_devices: 626 d, e = a.droid, a.ed 627 model = d.getBuildModel() 628 max_advertisements = 1 629 batch_scan_supported = True 630 if model in advertisements_to_devices.keys(): 631 max_advertisements = advertisements_to_devices[model] 632 else: 633 max_advertisements = determine_max_advertisements(a) 634 max_tries = 3 635 # Retry to calculate max advertisements 636 while max_advertisements == -1 and max_tries > 0: 637 a.log.info( 638 "Attempts left to determine max advertisements: {}".format( 639 max_tries)) 640 max_advertisements = determine_max_advertisements(a) 641 max_tries -= 1 642 advertisements_to_devices[model] = max_advertisements 643 if model in batch_scan_not_supported_list: 644 batch_scan_supported = False 645 role = { 646 'droid': d, 647 'ed': e, 648 'max_advertisements': max_advertisements, 649 'batch_scan_supported': batch_scan_supported 650 } 651 droid_list.append(role) 652 return droid_list 653 654 655def get_bluetooth_crash_count(android_device): 656 out = android_device.adb.shell("dumpsys bluetooth_manager") 657 return int(re.search("crashed(.*\d)", out).group(1)) 658 659 660def read_otp(ad): 661 """Reads and parses the OTP output to return TX power backoff 662 663 Reads the OTP registers from the phone, parses them to return a 664 dict of TX power backoffs for different power levels 665 666 Args: 667 ad : android device object 668 669 Returns : 670 otp_dict : power backoff dict 671 """ 672 673 ad.adb.shell('svc bluetooth disable') 674 time.sleep(2) 675 otp_output = ad.adb.shell('bluetooth_sar_test -r') 676 ad.adb.shell('svc bluetooth enable') 677 time.sleep(2) 678 otp_dict = { 679 "BR": { 680 "10": 0, 681 "9": 0, 682 "8": 0 683 }, 684 "EDR": { 685 "10": 0, 686 "9": 0, 687 "8": 0 688 }, 689 "BLE": { 690 "10": 0, 691 "9": 0, 692 "8": 0 693 } 694 } 695 696 otp_regex = '\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]' 697 698 for key in otp_dict: 699 bank_list = re.findall("{}{}".format(key, otp_regex), otp_output) 700 for bank_tuple in bank_list: 701 if ('0', '0', '0') != bank_tuple: 702 [otp_dict[key]["10"], otp_dict[key]["9"], 703 otp_dict[key]["8"]] = bank_tuple 704 return otp_dict 705 706 707def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True): 708 """ Function to get the bt metric from logcat. 709 710 Captures logcat for the specified duration and returns the bqr results. 711 Takes list of android objects as input. If a single android object is given, 712 converts it into a list. 713 714 Args: 715 ad_list: list of android_device objects 716 duration: time duration (seconds) for which the logcat is parsed. 717 tag: tag to be appended to the logcat dump. 718 processed: flag to process bqr output. 719 720 Returns: 721 metrics_dict: dict of metrics for each android device. 722 """ 723 724 # Defining bqr quantitites and their regex to extract 725 regex_dict = { 726 "vsp_txpl": "VSP_TxPL:\s(\S+)", 727 "pwlv": "PwLv:\s(\S+)", 728 "rssi": "RSSI:\s[-](\d+)" 729 } 730 metrics_dict = {"rssi": {}, "pwlv": {}, "vsp_txpl": {}} 731 732 # Converting a single android device object to list 733 if not isinstance(ad_list, list): 734 ad_list = [ad_list] 735 736 #Time sync with the test machine 737 for ad in ad_list: 738 ad.droid.setTime(int(round(time.time() * 1000))) 739 time.sleep(0.5) 740 741 begin_time = utils.get_current_epoch_time() 742 time.sleep(duration) 743 end_time = utils.get_current_epoch_time() 744 745 for ad in ad_list: 746 bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time) 747 bqr_tag = "Handle:" 748 749 # Extracting supporting bqr quantities 750 for metric, regex in regex_dict.items(): 751 bqr_metric = [] 752 file_bt_log = open(bt_rssi_log, "r") 753 for line in file_bt_log: 754 if bqr_tag in line: 755 if re.findall(regex, line): 756 m = re.findall(regex, line)[0].strip(",") 757 bqr_metric.append(m) 758 metrics_dict[metric][ad.serial] = bqr_metric 759 760 # Ensures back-compatibility for vsp_txpl enabled DUTs 761 if metrics_dict["vsp_txpl"][ad.serial]: 762 metrics_dict["pwlv"][ad.serial] = metrics_dict["vsp_txpl"][ 763 ad.serial] 764 765 # Formatting the raw data 766 metrics_dict["rssi"][ad.serial] = [ 767 (-1) * int(x) for x in metrics_dict["rssi"][ad.serial] 768 ] 769 metrics_dict["pwlv"][ad.serial] = [ 770 int(x, 16) for x in metrics_dict["pwlv"][ad.serial] 771 ] 772 773 # Processing formatted data if processing is required 774 if processed: 775 # Computes the average RSSI 776 metrics_dict["rssi"][ad.serial] = round( 777 sum(metrics_dict["rssi"][ad.serial]) / 778 len(metrics_dict["rssi"][ad.serial]), 2) 779 # Returns last noted value for power level 780 metrics_dict["pwlv"][ad.serial] = float( 781 sum(metrics_dict["pwlv"][ad.serial]) / 782 len(metrics_dict["pwlv"][ad.serial])) 783 784 return metrics_dict 785 786 787def get_bt_rssi(ad, duration=1, processed=True): 788 """Function to get average bt rssi from logcat. 789 790 This function returns the average RSSI for the given duration. RSSI values are 791 extracted from BQR. 792 793 Args: 794 ad: (list of) android_device object. 795 duration: time duration(seconds) for which logcat is parsed. 796 797 Returns: 798 avg_rssi: average RSSI on each android device for the given duration. 799 """ 800 function_tag = "get_bt_rssi" 801 bqr_results = get_bt_metric(ad, 802 duration, 803 tag=function_tag, 804 processed=processed) 805 return bqr_results["rssi"] 806 807 808def enable_bqr( 809 ad_list, 810 bqr_interval=10, 811 bqr_event_mask=15, 812): 813 """Sets up BQR reporting. 814 815 Sets up BQR to report BT metrics at the requested frequency and toggles 816 airplane mode for the bqr settings to take effect. 817 818 Args: 819 ad_list: an android_device or list of android devices. 820 """ 821 # Converting a single android device object to list 822 if not isinstance(ad_list, list): 823 ad_list = [ad_list] 824 825 for ad in ad_list: 826 #Setting BQR parameters 827 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 828 bqr_event_mask)) 829 ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format( 830 bqr_interval)) 831 832 ## Toggle airplane mode 833 ad.droid.connectivityToggleAirplaneMode(True) 834 ad.droid.connectivityToggleAirplaneMode(False) 835 836 837def disable_bqr(ad_list): 838 """Disables BQR reporting. 839 840 Args: 841 ad_list: an android_device or list of android devices. 842 """ 843 # Converting a single android device object to list 844 if not isinstance(ad_list, list): 845 ad_list = [ad_list] 846 847 DISABLE_BQR_MASK = 0 848 849 for ad in ad_list: 850 #Disabling BQR 851 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 852 DISABLE_BQR_MASK)) 853 854 ## Toggle airplane mode 855 ad.droid.connectivityToggleAirplaneMode(True) 856 ad.droid.connectivityToggleAirplaneMode(False) 857 858 859def get_device_selector_dictionary(android_device_list): 860 """Create a dictionary of Bluetooth features vs Android devices. 861 862 Args: 863 android_device_list: The list of Android devices. 864 Returns: 865 A dictionary of profiles/features to Android devices. 866 """ 867 selector_dict = {} 868 for ad in android_device_list: 869 uuids = ad.droid.bluetoothGetLocalUuids() 870 871 for profile, uuid_const in sig_uuid_constants.items(): 872 uuid_check = sig_uuid_constants['BASE_UUID'].format( 873 uuid_const).lower() 874 if uuids and uuid_check in uuids: 875 if profile in selector_dict: 876 selector_dict[profile].append(ad) 877 else: 878 selector_dict[profile] = [ad] 879 880 # Various services may not be active during BT startup. 881 # If the device can be identified through adb shell pm list features 882 # then try to add them to the appropriate profiles / features. 883 884 # Android TV. 885 if "feature:com.google.android.tv.installed" in ad.features: 886 ad.log.info("Android TV device found.") 887 supported_profiles = ['AudioSink'] 888 _add_android_device_to_dictionary(ad, supported_profiles, 889 selector_dict) 890 891 # Android Auto 892 elif "feature:android.hardware.type.automotive" in ad.features: 893 ad.log.info("Android Auto device found.") 894 # Add: AudioSink , A/V_RemoteControl, 895 supported_profiles = [ 896 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' 897 ] 898 _add_android_device_to_dictionary(ad, supported_profiles, 899 selector_dict) 900 # Android Wear 901 elif "feature:android.hardware.type.watch" in ad.features: 902 ad.log.info("Android Wear device found.") 903 supported_profiles = [] 904 _add_android_device_to_dictionary(ad, supported_profiles, 905 selector_dict) 906 # Android Phone 907 elif "feature:android.hardware.telephony" in ad.features: 908 ad.log.info("Android Phone device found.") 909 # Add: AudioSink 910 supported_profiles = [ 911 'AudioSource', 'A/V_RemoteControlTarget', 912 'Message Access Server' 913 ] 914 _add_android_device_to_dictionary(ad, supported_profiles, 915 selector_dict) 916 return selector_dict 917 918 919def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): 920 """Start generic advertisement and get it's mac address by LE scanning. 921 922 Args: 923 scan_ad: The Android device to use as the scanner. 924 adv_ad: The Android device to use as the advertiser. 925 926 Returns: 927 mac_address: The mac address of the advertisement. 928 advertise_callback: The advertise callback id of the active 929 advertisement. 930 """ 931 adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 932 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 933 ble_advertise_settings_modes['low_latency']) 934 adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True) 935 adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel( 936 ble_advertise_settings_tx_powers['high']) 937 advertise_callback, advertise_data, advertise_settings = ( 938 generate_ble_advertise_objects(adv_ad.droid)) 939 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 940 advertise_settings) 941 try: 942 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 943 bt_default_timeout) 944 except Empty as err: 945 raise BtTestUtilsError( 946 "Advertiser did not start successfully {}".format(err)) 947 filter_list = scan_ad.droid.bleGenFilterList() 948 scan_settings = scan_ad.droid.bleBuildScanSetting() 949 scan_callback = scan_ad.droid.bleGenScanCallback() 950 scan_ad.droid.bleSetScanFilterDeviceName( 951 adv_ad.droid.bluetoothGetLocalName()) 952 scan_ad.droid.bleBuildScanFilter(filter_list) 953 scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 954 try: 955 event = scan_ad.ed.pop_event( 956 "BleScan{}onScanResults".format(scan_callback), bt_default_timeout) 957 except Empty as err: 958 raise BtTestUtilsError( 959 "Scanner did not find advertisement {}".format(err)) 960 mac_address = event['data']['Result']['deviceInfo']['address'] 961 return mac_address, advertise_callback, scan_callback 962 963 964def hid_device_send_key_data_report(host_id, device_ad, key, interval=1): 965 """Send a HID report simulating a 1-second keyboard press from host_ad to 966 device_ad 967 968 Args: 969 host_id: the Bluetooth MAC address or name of the HID host 970 device_ad: HID device 971 key: the key we want to send 972 interval: the interval between key press and key release 973 """ 974 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 975 hid_keyboard_report(key)) 976 time.sleep(interval) 977 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 978 hid_keyboard_report("00")) 979 980 981def hid_keyboard_report(key, modifier="00"): 982 """Get the HID keyboard report for the given key 983 984 Args: 985 key: the key we want 986 modifier: HID keyboard modifier bytes 987 Returns: 988 The byte array for the HID report. 989 """ 990 return str( 991 bytearray.fromhex(" ".join( 992 [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8") 993 994 995def is_a2dp_connected(sink, source): 996 """ 997 Convenience Function to see if the 2 devices are connected on 998 A2dp. 999 Args: 1000 sink: Audio Sink 1001 source: Audio Source 1002 Returns: 1003 True if Connected 1004 False if Not connected 1005 """ 1006 1007 devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices() 1008 for device in devices: 1009 sink.log.info("A2dp Connected device {}".format(device["name"])) 1010 if (device["address"] == source.droid.bluetoothGetLocalAddress()): 1011 return True 1012 return False 1013 1014 1015def is_a2dp_snk_device_connected(ad, addr): 1016 """Determines if an AndroidDevice has A2DP snk connectivity to input address 1017 1018 Args: 1019 ad: the Android device 1020 addr: the address that's expected 1021 Returns: 1022 True if connection was successful, false if unsuccessful. 1023 """ 1024 devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices() 1025 ad.log.info("Connected A2DP Sink devices: {}".format(devices)) 1026 if addr in {d['address'] for d in devices}: 1027 return True 1028 return False 1029 1030 1031def is_a2dp_src_device_connected(ad, addr): 1032 """Determines if an AndroidDevice has A2DP connectivity to input address 1033 1034 Args: 1035 ad: the Android device 1036 addr: the address that's expected 1037 Returns: 1038 True if connection was successful, false if unsuccessful. 1039 """ 1040 devices = ad.droid.bluetoothA2dpGetConnectedDevices() 1041 ad.log.info("Connected A2DP Source devices: {}".format(devices)) 1042 if addr in {d['address'] for d in devices}: 1043 return True 1044 return False 1045 1046 1047def is_hfp_client_device_connected(ad, addr): 1048 """Determines if an AndroidDevice has HFP connectivity to input address 1049 1050 Args: 1051 ad: the Android device 1052 addr: the address that's expected 1053 Returns: 1054 True if connection was successful, false if unsuccessful. 1055 """ 1056 devices = ad.droid.bluetoothHfpClientGetConnectedDevices() 1057 ad.log.info("Connected HFP Client devices: {}".format(devices)) 1058 if addr in {d['address'] for d in devices}: 1059 return True 1060 return False 1061 1062 1063def is_map_mce_device_connected(ad, addr): 1064 """Determines if an AndroidDevice has MAP MCE connectivity to input address 1065 1066 Args: 1067 ad: the Android device 1068 addr: the address that's expected 1069 Returns: 1070 True if connection was successful, false if unsuccessful. 1071 """ 1072 devices = ad.droid.bluetoothMapClientGetConnectedDevices() 1073 ad.log.info("Connected MAP MCE devices: {}".format(devices)) 1074 if addr in {d['address'] for d in devices}: 1075 return True 1076 return False 1077 1078 1079def is_map_mse_device_connected(ad, addr): 1080 """Determines if an AndroidDevice has MAP MSE connectivity to input address 1081 1082 Args: 1083 ad: the Android device 1084 addr: the address that's expected 1085 Returns: 1086 True if connection was successful, false if unsuccessful. 1087 """ 1088 devices = ad.droid.bluetoothMapGetConnectedDevices() 1089 ad.log.info("Connected MAP MSE devices: {}".format(devices)) 1090 if addr in {d['address'] for d in devices}: 1091 return True 1092 return False 1093 1094 1095def kill_bluetooth_process(ad): 1096 """Kill Bluetooth process on Android device. 1097 1098 Args: 1099 ad: Android device to kill BT process on. 1100 """ 1101 ad.log.info("Killing Bluetooth process.") 1102 pid = ad.adb.shell( 1103 "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii') 1104 call(["adb -s " + ad.serial + " shell kill " + pid], shell=True) 1105 1106 1107def log_energy_info(android_devices, state): 1108 """Logs energy info of input Android devices. 1109 1110 Args: 1111 android_devices: input Android device list to log energy info from. 1112 state: the input state to log. Usually 'Start' or 'Stop' for logging. 1113 1114 Returns: 1115 A logging string of the Bluetooth energy info reported. 1116 """ 1117 return_string = "{} Energy info collection:\n".format(state) 1118 # Bug: b/31966929 1119 return return_string 1120 1121 1122def orchestrate_and_verify_pan_connection(pan_dut, panu_dut): 1123 """Setups up a PAN conenction between two android devices. 1124 1125 Args: 1126 pan_dut: the Android device providing tethering services 1127 panu_dut: the Android device using the internet connection from the 1128 pan_dut 1129 Returns: 1130 True if PAN connection and verification is successful, 1131 false if unsuccessful. 1132 """ 1133 if not toggle_airplane_mode_by_adb(log, panu_dut, True): 1134 panu_dut.log.error("Failed to toggle airplane mode on") 1135 return False 1136 if not toggle_airplane_mode_by_adb(log, panu_dut, False): 1137 pan_dut.log.error("Failed to toggle airplane mode off") 1138 return False 1139 pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1140 panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1141 if not bluetooth_enabled_check(panu_dut): 1142 return False 1143 if not bluetooth_enabled_check(pan_dut): 1144 return False 1145 pan_dut.droid.bluetoothPanSetBluetoothTethering(True) 1146 if not (pair_pri_to_sec(pan_dut, panu_dut)): 1147 return False 1148 if not pan_dut.droid.bluetoothPanIsTetheringOn(): 1149 pan_dut.log.error("Failed to enable Bluetooth tethering.") 1150 return False 1151 # Magic sleep needed to give the stack time in between bonding and 1152 # connecting the PAN profile. 1153 time.sleep(pan_connect_timeout) 1154 panu_dut.droid.bluetoothConnectBonded( 1155 pan_dut.droid.bluetoothGetLocalAddress()) 1156 if not verify_http_connection(log, panu_dut): 1157 panu_dut.log.error("Can't verify http connection on PANU device.") 1158 if not verify_http_connection(log, pan_dut): 1159 pan_dut.log.info( 1160 "Can't verify http connection on PAN service device") 1161 return False 1162 return True 1163 1164 1165def orchestrate_bluetooth_socket_connection( 1166 client_ad, 1167 server_ad, 1168 accept_timeout_ms=default_bluetooth_socket_timeout_ms, 1169 uuid=None): 1170 """Sets up the Bluetooth Socket connection between two Android devices. 1171 1172 Args: 1173 client_ad: the Android device performing the connection. 1174 server_ad: the Android device accepting the connection. 1175 Returns: 1176 True if connection was successful, false if unsuccessful. 1177 """ 1178 server_ad.droid.bluetoothStartPairingHelper() 1179 client_ad.droid.bluetoothStartPairingHelper() 1180 1181 server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid( 1182 (bluetooth_socket_conn_test_uuid if uuid is None else uuid), 1183 accept_timeout_ms) 1184 client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid( 1185 server_ad.droid.bluetoothGetLocalAddress(), 1186 (bluetooth_socket_conn_test_uuid if uuid is None else uuid)) 1187 1188 end_time = time.time() + bt_default_timeout 1189 result = False 1190 test_result = True 1191 while time.time() < end_time: 1192 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0: 1193 test_result = True 1194 client_ad.log.info("Bluetooth socket Client Connection Active") 1195 break 1196 else: 1197 test_result = False 1198 time.sleep(1) 1199 if not test_result: 1200 client_ad.log.error( 1201 "Failed to establish a Bluetooth socket connection") 1202 return False 1203 return True 1204 1205 1206def orchestrate_rfcomm_connection(client_ad, 1207 server_ad, 1208 accept_timeout_ms=default_rfcomm_timeout_ms, 1209 uuid=None): 1210 """Sets up the RFCOMM connection between two Android devices. 1211 1212 Args: 1213 client_ad: the Android device performing the connection. 1214 server_ad: the Android device accepting the connection. 1215 Returns: 1216 True if connection was successful, false if unsuccessful. 1217 """ 1218 result = orchestrate_bluetooth_socket_connection( 1219 client_ad, server_ad, accept_timeout_ms, 1220 (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid)) 1221 1222 return result 1223 1224 1225def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True): 1226 """Pairs pri droid to secondary droid. 1227 1228 Args: 1229 pri_ad: Android device initiating connection 1230 sec_ad: Android device accepting connection 1231 attempts: Number of attempts to try until failure. 1232 auto_confirm: Auto confirm passkey match for both devices 1233 1234 Returns: 1235 Pass if True 1236 Fail if False 1237 """ 1238 pri_ad.droid.bluetoothStartConnectionStateChangeMonitor( 1239 sec_ad.droid.bluetoothGetLocalAddress()) 1240 curr_attempts = 0 1241 while curr_attempts < attempts: 1242 if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1243 return True 1244 # Wait 2 seconds before unbound 1245 time.sleep(2) 1246 if not clear_bonded_devices(pri_ad): 1247 log.error( 1248 "Failed to clear bond for primary device at attempt {}".format( 1249 str(curr_attempts))) 1250 return False 1251 if not clear_bonded_devices(sec_ad): 1252 log.error( 1253 "Failed to clear bond for secondary device at attempt {}". 1254 format(str(curr_attempts))) 1255 return False 1256 # Wait 2 seconds after unbound 1257 time.sleep(2) 1258 curr_attempts += 1 1259 log.error("pair_pri_to_sec failed to connect after {} attempts".format( 1260 str(attempts))) 1261 return False 1262 1263 1264def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1265 # Enable discovery on sec_ad so that pri_ad can find it. 1266 # The timeout here is based on how much time it would take for two devices 1267 # to pair with each other once pri_ad starts seeing devices. 1268 pri_droid = pri_ad.droid 1269 sec_droid = sec_ad.droid 1270 pri_ad.ed.clear_all_events() 1271 sec_ad.ed.clear_all_events() 1272 log.info("Bonding device {} to {}".format( 1273 pri_droid.bluetoothGetLocalAddress(), 1274 sec_droid.bluetoothGetLocalAddress())) 1275 sec_droid.bluetoothMakeDiscoverable(bt_default_timeout) 1276 target_address = sec_droid.bluetoothGetLocalAddress() 1277 log.debug("Starting paring helper on each device") 1278 pri_droid.bluetoothStartPairingHelper(auto_confirm) 1279 sec_droid.bluetoothStartPairingHelper(auto_confirm) 1280 pri_ad.log.info("Primary device starting discovery and executing bond") 1281 result = pri_droid.bluetoothDiscoverAndBond(target_address) 1282 if not auto_confirm: 1283 if not _wait_for_passkey_match(pri_ad, sec_ad): 1284 return False 1285 # Loop until we have bonded successfully or timeout. 1286 end_time = time.time() + bt_default_timeout 1287 pri_ad.log.info("Verifying devices are bonded") 1288 while time.time() < end_time: 1289 bonded_devices = pri_droid.bluetoothGetBondedDevices() 1290 bonded = False 1291 for d in bonded_devices: 1292 if d['address'] == target_address: 1293 pri_ad.log.info("Successfully bonded to device") 1294 return True 1295 time.sleep(0.1) 1296 # Timed out trying to bond. 1297 pri_ad.log.info("Failed to bond devices.") 1298 return False 1299 1300 1301def reset_bluetooth(android_devices): 1302 """Resets Bluetooth state of input Android device list. 1303 1304 Args: 1305 android_devices: The Android device list to reset Bluetooth state on. 1306 1307 Returns: 1308 True if successful, false if unsuccessful. 1309 """ 1310 for a in android_devices: 1311 droid, ed = a.droid, a.ed 1312 a.log.info("Reset state of bluetooth on device.") 1313 if droid.bluetoothCheckState() is True: 1314 droid.bluetoothToggleState(False) 1315 expected_bluetooth_off_event_name = bluetooth_off 1316 try: 1317 ed.pop_event(expected_bluetooth_off_event_name, 1318 bt_default_timeout) 1319 except Exception: 1320 a.log.error("Failed to toggle Bluetooth off.") 1321 return False 1322 # temp sleep for b/17723234 1323 time.sleep(3) 1324 if not bluetooth_enabled_check(a): 1325 return False 1326 return True 1327 1328 1329def scan_and_verify_n_advertisements(scn_ad, max_advertisements): 1330 """Verify that input number of advertisements can be found from the scanning 1331 Android device. 1332 1333 Args: 1334 scn_ad: The Android device to start LE scanning on. 1335 max_advertisements: The number of advertisements the scanner expects to 1336 find. 1337 1338 Returns: 1339 True if successful, false if unsuccessful. 1340 """ 1341 test_result = False 1342 address_list = [] 1343 filter_list = scn_ad.droid.bleGenFilterList() 1344 scn_ad.droid.bleBuildScanFilter(filter_list) 1345 scan_settings = scn_ad.droid.bleBuildScanSetting() 1346 scan_callback = scn_ad.droid.bleGenScanCallback() 1347 scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 1348 start_time = time.time() 1349 while (start_time + bt_default_timeout) > time.time(): 1350 event = None 1351 try: 1352 event = scn_ad.ed.pop_event(scan_result.format(scan_callback), 1353 bt_default_timeout) 1354 except Empty as error: 1355 raise BtTestUtilsError( 1356 "Failed to find scan event: {}".format(error)) 1357 address = event['data']['Result']['deviceInfo']['address'] 1358 if address not in address_list: 1359 address_list.append(address) 1360 if len(address_list) == max_advertisements: 1361 test_result = True 1362 break 1363 scn_ad.droid.bleStopBleScan(scan_callback) 1364 return test_result 1365 1366 1367def set_bluetooth_codec(android_device, 1368 codec_type, 1369 sample_rate, 1370 bits_per_sample, 1371 channel_mode, 1372 codec_specific_1=0): 1373 """Sets the A2DP codec configuration on the AndroidDevice. 1374 1375 Args: 1376 android_device (acts.controllers.android_device.AndroidDevice): the 1377 android device for which to switch the codec. 1378 codec_type (str): the desired codec type. Must be a key in 1379 bt_constants.codec_types. 1380 sample_rate (str): the desired sample rate. Must be a key in 1381 bt_constants.sample_rates. 1382 bits_per_sample (str): the desired bits per sample. Must be a key in 1383 bt_constants.bits_per_samples. 1384 channel_mode (str): the desired channel mode. Must be a key in 1385 bt_constants.channel_modes. 1386 codec_specific_1 (int): the desired bit rate (quality) for LDAC codec. 1387 Returns: 1388 bool: True if the codec config was successfully changed to the desired 1389 values. Else False. 1390 """ 1391 message = ("Set Android Device A2DP Bluetooth codec configuration:\n" 1392 "\tCodec: {codec_type}\n" 1393 "\tSample Rate: {sample_rate}\n" 1394 "\tBits per Sample: {bits_per_sample}\n" 1395 "\tChannel Mode: {channel_mode}".format( 1396 codec_type=codec_type, 1397 sample_rate=sample_rate, 1398 bits_per_sample=bits_per_sample, 1399 channel_mode=channel_mode)) 1400 android_device.log.info(message) 1401 1402 # Send SL4A command 1403 droid, ed = android_device.droid, android_device.ed 1404 if not droid.bluetoothA2dpSetCodecConfigPreference( 1405 codec_types[codec_type], sample_rates[str(sample_rate)], 1406 bits_per_samples[str(bits_per_sample)], 1407 channel_modes[channel_mode], codec_specific_1): 1408 android_device.log.warning( 1409 "SL4A command returned False. Codec was not " 1410 "changed.") 1411 else: 1412 try: 1413 ed.pop_event(bluetooth_a2dp_codec_config_changed, 1414 bt_default_timeout) 1415 except Exception: 1416 android_device.log.warning("SL4A event not registered. Codec " 1417 "may not have been changed.") 1418 1419 # Validate codec value through ADB 1420 # TODO (aidanhb): validate codec more robustly using SL4A 1421 command = "dumpsys bluetooth_manager | grep -i 'current codec'" 1422 out = android_device.adb.shell(command) 1423 split_out = out.split(": ") 1424 if len(split_out) != 2: 1425 android_device.log.warning("Could not verify codec config change " 1426 "through ADB.") 1427 elif split_out[1].strip().upper() != codec_type: 1428 android_device.log.error("Codec config was not changed.\n" 1429 "\tExpected codec: {exp}\n" 1430 "\tActual codec: {act}".format( 1431 exp=codec_type, act=split_out[1].strip())) 1432 return False 1433 android_device.log.info("Bluetooth codec successfully changed.") 1434 return True 1435 1436 1437def set_bt_scan_mode(ad, scan_mode_value): 1438 """Set Android device's Bluetooth scan mode. 1439 1440 Args: 1441 ad: The Android device to set the scan mode on. 1442 scan_mode_value: The value to set the scan mode to. 1443 1444 Returns: 1445 True if successful, false if unsuccessful. 1446 """ 1447 droid, ed = ad.droid, ad.ed 1448 if scan_mode_value == bt_scan_mode_types['state_off']: 1449 disable_bluetooth(droid) 1450 scan_mode = droid.bluetoothGetScanMode() 1451 reset_bluetooth([ad]) 1452 if scan_mode != scan_mode_value: 1453 return False 1454 elif scan_mode_value == bt_scan_mode_types['none']: 1455 droid.bluetoothMakeUndiscoverable() 1456 scan_mode = droid.bluetoothGetScanMode() 1457 if scan_mode != scan_mode_value: 1458 return False 1459 elif scan_mode_value == bt_scan_mode_types['connectable']: 1460 droid.bluetoothMakeUndiscoverable() 1461 droid.bluetoothMakeConnectable() 1462 scan_mode = droid.bluetoothGetScanMode() 1463 if scan_mode != scan_mode_value: 1464 return False 1465 elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']): 1466 droid.bluetoothMakeDiscoverable() 1467 scan_mode = droid.bluetoothGetScanMode() 1468 if scan_mode != scan_mode_value: 1469 return False 1470 else: 1471 # invalid scan mode 1472 return False 1473 return True 1474 1475 1476def set_device_name(droid, name): 1477 """Set and check Bluetooth local name on input droid object. 1478 1479 Args: 1480 droid: Droid object to set local name on. 1481 name: the Bluetooth local name to set. 1482 1483 Returns: 1484 True if successful, false if unsuccessful. 1485 """ 1486 droid.bluetoothSetLocalName(name) 1487 time.sleep(2) 1488 droid_name = droid.bluetoothGetLocalName() 1489 if droid_name != name: 1490 return False 1491 return True 1492 1493 1494def set_profile_priority(host_ad, client_ad, profiles, priority): 1495 """Sets the priority of said profile(s) on host_ad for client_ad""" 1496 for profile in profiles: 1497 host_ad.log.info("Profile {} on {} for {} set to priority {}".format( 1498 profile, host_ad.droid.bluetoothGetLocalName(), 1499 client_ad.droid.bluetoothGetLocalAddress(), priority.value)) 1500 if bt_profile_constants['a2dp_sink'] == profile: 1501 host_ad.droid.bluetoothA2dpSinkSetPriority( 1502 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1503 elif bt_profile_constants['headset_client'] == profile: 1504 host_ad.droid.bluetoothHfpClientSetPriority( 1505 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1506 elif bt_profile_constants['pbap_client'] == profile: 1507 host_ad.droid.bluetoothPbapClientSetPriority( 1508 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1509 else: 1510 host_ad.log.error( 1511 "Profile {} not yet supported for priority settings".format( 1512 profile)) 1513 1514 1515def setup_multiple_devices_for_bt_test(android_devices): 1516 """A common setup routine for Bluetooth on input Android device list. 1517 1518 Things this function sets up: 1519 1. Resets Bluetooth 1520 2. Set Bluetooth local name to random string of size 4 1521 3. Disable BLE background scanning. 1522 4. Enable Bluetooth snoop logging. 1523 1524 Args: 1525 android_devices: Android device list to setup Bluetooth on. 1526 1527 Returns: 1528 True if successful, false if unsuccessful. 1529 """ 1530 log.info("Setting up Android Devices") 1531 # TODO: Temp fix for an selinux error. 1532 for ad in android_devices: 1533 ad.adb.shell("setenforce 0") 1534 threads = [] 1535 try: 1536 for a in android_devices: 1537 thread = threading.Thread(target=factory_reset_bluetooth, 1538 args=([[a]])) 1539 threads.append(thread) 1540 thread.start() 1541 for t in threads: 1542 t.join() 1543 1544 for a in android_devices: 1545 d = a.droid 1546 # TODO: Create specific RPC command to instantiate 1547 # BluetoothConnectionFacade. This is just a workaround. 1548 d.bluetoothStartConnectionStateChangeMonitor("") 1549 setup_result = d.bluetoothSetLocalName(generate_id_by_size(4)) 1550 if not setup_result: 1551 a.log.error("Failed to set device name.") 1552 return setup_result 1553 d.bluetoothDisableBLE() 1554 utils.set_location_service(a, True) 1555 bonded_devices = d.bluetoothGetBondedDevices() 1556 for b in bonded_devices: 1557 a.log.info("Removing bond for device {}".format(b['address'])) 1558 d.bluetoothUnbond(b['address']) 1559 for a in android_devices: 1560 a.adb.shell("setprop persist.bluetooth.btsnooplogmode full") 1561 getprop_result = a.adb.shell( 1562 "getprop persist.bluetooth.btsnooplogmode") == "full" 1563 if not getprop_result: 1564 a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.") 1565 except Exception as err: 1566 log.error("Something went wrong in multi device setup: {}".format(err)) 1567 return False 1568 return setup_result 1569 1570 1571def setup_n_advertisements(adv_ad, num_advertisements): 1572 """Setup input number of advertisements on input Android device. 1573 1574 Args: 1575 adv_ad: The Android device to start LE advertisements on. 1576 num_advertisements: The number of advertisements to start. 1577 1578 Returns: 1579 advertise_callback_list: List of advertisement callback ids. 1580 """ 1581 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 1582 ble_advertise_settings_modes['low_latency']) 1583 advertise_data = adv_ad.droid.bleBuildAdvertiseData() 1584 advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings() 1585 advertise_callback_list = [] 1586 for i in range(num_advertisements): 1587 advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback() 1588 advertise_callback_list.append(advertise_callback) 1589 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 1590 advertise_settings) 1591 try: 1592 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 1593 bt_default_timeout) 1594 adv_ad.log.info("Advertisement {} started.".format(i + 1)) 1595 except Empty as error: 1596 adv_ad.log.error("Advertisement {} failed to start.".format(i + 1)) 1597 raise BtTestUtilsError( 1598 "Test failed with Empty error: {}".format(error)) 1599 return advertise_callback_list 1600 1601 1602def take_btsnoop_log(ad, testcase, testname): 1603 """Grabs the btsnoop_hci log on a device and stores it in the log directory 1604 of the test class. 1605 1606 If you want grab the btsnoop_hci log, call this function with android_device 1607 objects in on_fail. Bug report takes a relative long time to take, so use 1608 this cautiously. 1609 1610 Args: 1611 ad: The android_device instance to take bugreport on. 1612 testcase: Name of the test calss that triggered this snoop log. 1613 testname: Name of the test case that triggered this bug report. 1614 """ 1615 testname = "".join(x for x in testname if x.isalnum()) 1616 serial = ad.serial 1617 device_model = ad.droid.getBuildModel() 1618 device_model = device_model.replace(" ", "") 1619 out_name = ','.join((testname, device_model, serial)) 1620 snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs') 1621 os.makedirs(snoop_path, exist_ok=True) 1622 cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, 1623 " ", snoop_path + '/' + out_name, ".btsnoop_hci.log")) 1624 exe_cmd(cmd) 1625 try: 1626 cmd = ''.join( 1627 ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ", 1628 snoop_path + '/' + out_name, ".btsnoop_hci.log.last")) 1629 exe_cmd(cmd) 1630 except Exception as err: 1631 testcase.log.info( 1632 "File does not exist {}".format(btsnoop_last_log_path_on_device)) 1633 1634 1635def take_btsnoop_logs(android_devices, testcase, testname): 1636 """Pull btsnoop logs from an input list of android devices. 1637 1638 Args: 1639 android_devices: the list of Android devices to pull btsnoop logs from. 1640 testcase: Name of the test calss that triggered this snoop log. 1641 testname: Name of the test case that triggered this bug report. 1642 """ 1643 for a in android_devices: 1644 take_btsnoop_log(a, testcase, testname) 1645 1646 1647def teardown_n_advertisements(adv_ad, num_advertisements, 1648 advertise_callback_list): 1649 """Stop input number of advertisements on input Android device. 1650 1651 Args: 1652 adv_ad: The Android device to stop LE advertisements on. 1653 num_advertisements: The number of advertisements to stop. 1654 advertise_callback_list: The list of advertisement callbacks to stop. 1655 1656 Returns: 1657 True if successful, false if unsuccessful. 1658 """ 1659 for n in range(num_advertisements): 1660 adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n]) 1661 return True 1662 1663 1664def verify_server_and_client_connected(client_ad, server_ad, log=True): 1665 """Verify that input server and client Android devices are connected. 1666 1667 This code is under the assumption that there will only be 1668 a single connection. 1669 1670 Args: 1671 client_ad: the Android device to check number of active connections. 1672 server_ad: the Android device to check number of active connections. 1673 1674 Returns: 1675 True both server and client have at least 1 active connection, 1676 false if unsuccessful. 1677 """ 1678 test_result = True 1679 if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1680 if log: 1681 server_ad.log.error("No socket connections found on server.") 1682 test_result = False 1683 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1684 if log: 1685 client_ad.log.error("No socket connections found on client.") 1686 test_result = False 1687 return test_result 1688 1689 1690def wait_for_bluetooth_manager_state(droid, 1691 state=None, 1692 timeout=10, 1693 threshold=5): 1694 """ Waits for BlueTooth normalized state or normalized explicit state 1695 args: 1696 droid: droid device object 1697 state: expected BlueTooth state 1698 timeout: max timeout threshold 1699 threshold: list len of bt state 1700 Returns: 1701 True if successful, false if unsuccessful. 1702 """ 1703 all_states = [] 1704 get_state = lambda: droid.bluetoothGetLeState() 1705 start_time = time.time() 1706 while time.time() < start_time + timeout: 1707 all_states.append(get_state()) 1708 if len(all_states) >= threshold: 1709 # for any normalized state 1710 if state is None: 1711 if len(set(all_states[-threshold:])) == 1: 1712 log.info("State normalized {}".format( 1713 set(all_states[-threshold:]))) 1714 return True 1715 else: 1716 # explicit check against normalized state 1717 if set([state]).issubset(all_states[-threshold:]): 1718 return True 1719 time.sleep(0.5) 1720 log.error( 1721 "Bluetooth state fails to normalize" if state is None else 1722 "Failed to match bluetooth state, current state {} expected state {}". 1723 format(get_state(), state)) 1724 return False 1725 1726 1727def _wait_for_passkey_match(pri_ad, sec_ad): 1728 pri_pin, sec_pin = -1, 1 1729 pri_variant, sec_variant = -1, 1 1730 pri_pairing_req, sec_pairing_req = None, None 1731 try: 1732 pri_pairing_req = pri_ad.ed.pop_event( 1733 event_name="BluetoothActionPairingRequest", 1734 timeout=bt_default_timeout) 1735 pri_variant = pri_pairing_req["data"]["PairingVariant"] 1736 pri_pin = pri_pairing_req["data"]["Pin"] 1737 pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format( 1738 pri_pin, pri_variant)) 1739 sec_pairing_req = sec_ad.ed.pop_event( 1740 event_name="BluetoothActionPairingRequest", 1741 timeout=bt_default_timeout) 1742 sec_variant = sec_pairing_req["data"]["PairingVariant"] 1743 sec_pin = sec_pairing_req["data"]["Pin"] 1744 sec_ad.log.info( 1745 "Secondary device received Pin: {}, Variant: {}".format( 1746 sec_pin, sec_variant)) 1747 except Empty as err: 1748 log.error("Wait for pin error: {}".format(err)) 1749 log.error("Pairing request state, Primary: {}, Secondary: {}".format( 1750 pri_pairing_req, sec_pairing_req)) 1751 return False 1752 if pri_variant == sec_variant == pairing_variant_passkey_confirmation: 1753 confirmation = pri_pin == sec_pin 1754 if confirmation: 1755 log.info("Pairing code matched, accepting connection") 1756 else: 1757 log.info("Pairing code mismatched, rejecting connection") 1758 pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1759 str(confirmation)) 1760 sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1761 str(confirmation)) 1762 if not confirmation: 1763 return False 1764 elif pri_variant != sec_variant: 1765 log.error("Pairing variant mismatched, abort connection") 1766 return False 1767 return True 1768 1769 1770def write_read_verify_data(client_ad, server_ad, msg, binary=False): 1771 """Verify that the client wrote data to the server Android device correctly. 1772 1773 Args: 1774 client_ad: the Android device to perform the write. 1775 server_ad: the Android device to read the data written. 1776 msg: the message to write. 1777 binary: if the msg arg is binary or not. 1778 1779 Returns: 1780 True if the data written matches the data read, false if not. 1781 """ 1782 client_ad.log.info("Write message.") 1783 try: 1784 if binary: 1785 client_ad.droid.bluetoothSocketConnWriteBinary(msg) 1786 else: 1787 client_ad.droid.bluetoothSocketConnWrite(msg) 1788 except Exception as err: 1789 client_ad.log.error("Failed to write data: {}".format(err)) 1790 return False 1791 server_ad.log.info("Read message.") 1792 try: 1793 if binary: 1794 read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip( 1795 "\r\n") 1796 else: 1797 read_msg = server_ad.droid.bluetoothSocketConnRead() 1798 except Exception as err: 1799 server_ad.log.error("Failed to read data: {}".format(err)) 1800 return False 1801 log.info("Verify message.") 1802 if msg != read_msg: 1803 log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg)) 1804 return False 1805 return True 1806 1807 1808class MediaControlOverSl4a(object): 1809 """Media control using sl4a facade for general purpose. 1810 1811 """ 1812 def __init__(self, android_device, music_file): 1813 """Initialize the media_control class. 1814 1815 Args: 1816 android_dut: android_device object 1817 music_file: location of the music file 1818 """ 1819 self.android_device = android_device 1820 self.music_file = music_file 1821 1822 def play(self): 1823 """Play media. 1824 1825 """ 1826 self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file, 1827 'default', True) 1828 playing = self.android_device.droid.mediaIsPlaying() 1829 asserts.assert_true(playing, 1830 'Failed to play music %s' % self.music_file) 1831 1832 def pause(self): 1833 """Pause media. 1834 1835 """ 1836 self.android_device.droid.mediaPlayPause('default') 1837 paused = not self.android_device.droid.mediaIsPlaying() 1838 asserts.assert_true(paused, 1839 'Failed to pause music %s' % self.music_file) 1840 1841 def resume(self): 1842 """Resume media. 1843 1844 """ 1845 self.android_device.droid.mediaPlayStart('default') 1846 playing = self.android_device.droid.mediaIsPlaying() 1847 asserts.assert_true(playing, 1848 'Failed to play music %s' % self.music_file) 1849 1850 def stop(self): 1851 """Stop media. 1852 1853 """ 1854 self.android_device.droid.mediaPlayStop('default') 1855 stopped = not self.android_device.droid.mediaIsPlaying() 1856 asserts.assert_true(stopped, 1857 'Failed to stop music %s' % self.music_file) 1858