1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import os
19import random
20import re
21import string
22import threading
23import time
24from queue import Empty
25from subprocess import call
26
27from acts import asserts
28from acts.test_utils.bt.bt_constants import adv_fail
29from acts.test_utils.bt.bt_constants import adv_succ
30from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list
31from acts.test_utils.bt.bt_constants import batch_scan_result
32from acts.test_utils.bt.bt_constants import bits_per_samples
33from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
34from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
35from acts.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed
36from acts.test_utils.bt.bt_constants import bluetooth_off
37from acts.test_utils.bt.bt_constants import bluetooth_on
38from acts.test_utils.bt.bt_constants import \
39    bluetooth_profile_connection_state_changed
40from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
41from acts.test_utils.bt.bt_constants import bt_default_timeout
42from acts.test_utils.bt.bt_constants import bt_profile_constants
43from acts.test_utils.bt.bt_constants import bt_profile_states
44from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids
45from acts.test_utils.bt.bt_constants import bt_scan_mode_types
46from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
47from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device
48from acts.test_utils.bt.bt_constants import channel_modes
49from acts.test_utils.bt.bt_constants import codec_types
50from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
51from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
52from acts.test_utils.bt.bt_constants import hid_id_keyboard
53from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
54from acts.test_utils.bt.bt_constants import pan_connect_timeout
55from acts.test_utils.bt.bt_constants import sample_rates
56from acts.test_utils.bt.bt_constants import scan_result
57from acts.test_utils.bt.bt_constants import sig_uuid_constants
58from acts.test_utils.bt.bt_constants import small_timeout
59from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
60from acts.test_utils.tel.tel_test_utils import verify_http_connection
61from acts.utils import exe_cmd
62
63from acts import utils
64
# Module-level alias for the logging module itself: calls such as
# log.info()/log.error() in this file go through logging's module-level
# functions.
log = logging

# Maps a device model name (as returned by getBuildModel) to the maximum
# advertisement count determined for it; filled in by
# get_advanced_droid_list() so each model is only probed once.
advertisements_to_devices = {}
68
69
class BtTestUtilsError(Exception):
    """Error raised by the Bluetooth test utilities in this module."""
72
73
def _add_android_device_to_dictionary(android_device, profile_list,
                                      selector_dict):
    """Adds the AndroidDevice and supported features to the selector dictionary

    Every profile maps to the list of devices registered for it. A device is
    appended at most once per profile, and devices that were registered
    earlier for the same profile are kept.

    Args:
        android_device: The Android device.
        profile_list: The list of profiles the Android device supports.
        selector_dict: Dictionary mapping a profile to the list of Android
            devices supporting it. Updated in place.
    """
    for profile in profile_list:
        # setdefault creates the list on first use; re-adding a known device
        # must not reset the list of the other devices.
        devices = selector_dict.setdefault(profile, [])
        if android_device not in devices:
            devices.append(android_device)
88
89
def bluetooth_enabled_check(ad, timeout_sec=5):
    """Makes sure Bluetooth is enabled on a device, turning it on if needed.

    If Bluetooth is off it is toggled on and the Bluetooth-on event is
    awaited. When that event does not arrive, the adapter state is queried
    once more and returned as the result. Otherwise the state is polled once
    per second until it reports enabled or timeout_sec has elapsed.

    Args:
        ad: The Android device to enable Bluetooth on.
        timeout_sec: number of seconds to wait for toggle to take effect.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid = ad.droid
    if not droid.bluetoothCheckState():
        droid.bluetoothToggleState(True)
        try:
            ad.ed.pop_event(bluetooth_on, bt_default_timeout)
        except Empty:
            ad.log.info(
                "Failed to toggle Bluetooth on(no broadcast received).")
            # No broadcast arrived, so ask the adapter for its state directly.
            turned_on = bool(droid.bluetoothCheckState())
            if turned_on:
                ad.log.info(".. actual state is ON")
            else:
                ad.log.error(".. actual state is OFF")
            return turned_on

    deadline = time.time() + timeout_sec
    while not droid.bluetoothCheckState():
        if time.time() >= deadline:
            break
        time.sleep(1)
    return droid.bluetoothCheckState()
120
121
def check_device_supported_profiles(droid):
    """Queries the readiness of each Bluetooth profile on a device.

    Args:
        droid: The droid object to query.

    Returns:
        A dictionary keyed by profile name ('hid', 'hsp', 'a2dp', 'avrcp',
        'a2dp_sink', 'hfp_client', 'pbap_client') whose values are the
        results of the matching bluetooth*IsReady call.
    """
    # Dict displays are evaluated top to bottom, so the calls are issued in
    # the listed order.
    return {
        'hid': droid.bluetoothHidIsReady(),
        'hsp': droid.bluetoothHspIsReady(),
        'a2dp': droid.bluetoothA2dpIsReady(),
        'avrcp': droid.bluetoothAvrcpIsReady(),
        'a2dp_sink': droid.bluetoothA2dpSinkIsReady(),
        'hfp_client': droid.bluetoothHfpClientIsReady(),
        'pbap_client': droid.bluetoothPbapClientIsReady(),
    }
140
141
def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
                                     adv_android_device, adv_callback_list):
    """Stops the given LE scans and advertisements on a best-effort basis.

    If stopping any scan (or any advertisement) raises, the error is logged
    at debug level and Bluetooth is reset on the affected device instead.

    Args:
        scn_android_device: The Android device that is actively scanning.
        scn_callback_list: The scan callback id list that needs to be stopped.
        adv_android_device: The Android device that is actively advertising.
        adv_callback_list: The advertise callback id list that needs to be
            stopped.
    """
    scanner, _ = scn_android_device.droid, scn_android_device.ed
    advertiser = adv_android_device.droid

    try:
        for callback_id in scn_callback_list:
            scanner.bleStopBleScan(callback_id)
    except Exception as err:
        message = "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
            err)
        scn_android_device.log.debug(message)
        reset_bluetooth([scn_android_device])

    try:
        for callback_id in adv_callback_list:
            advertiser.bleStopBleAdvertising(callback_id)
    except Exception as err:
        message = (
            "Failed to stop LE advertisement... reseting Bluetooth. Error {}".
            format(err))
        adv_android_device.log.debug(message)
        reset_bluetooth([adv_android_device])
171
172
def clear_bonded_devices(ad):
    """Unbonds every device currently bonded to the input Android device.

    Args:
        ad: the Android device whose bonded devices are removed.
    Returns:
        True if clearing bonded devices was successful, false if unsuccessful.
    """
    droid = ad.droid
    while True:
        # The bonded list is fetched again on every pass: if a device was
        # first connected using LE transport, after bonding it is accessible
        # through its LE address and through its classic address. Unbonding
        # it unbonds both entries, and trying to unbond the already removed
        # one would make bluetoothUnbond return false.
        remaining = droid.bluetoothGetBondedDevices()
        if not remaining:
            return True

        address = remaining[0]['address']
        if not droid.bluetoothUnbond(address):
            log.error("Failed to unbond {} from {}".format(
                address, ad.serial))
            return False
        log.info("Successfully unbonded {} from {}".format(
            address, ad.serial))
        #TODO: wait for BOND_STATE_CHANGED intent instead of waiting
        time.sleep(1)
200
201
def connect_phone_to_headset(android,
                             headset,
                             timeout=bt_default_timeout,
                             connection_check_period=10):
    """Pairs (if needed) and connects an android phone to a bluetooth headset.

    The headset object must expose a mac_address attribute and an
    enter_pairing_mode method; those are the only members used here.

    Args:
        android: AndroidDevice object with SL4A installed.
        headset: Object with attribute mac_address and method
            enter_pairing_mode.
        timeout: Seconds to wait for devices to connect.
        connection_check_period: how often to check for connection once the
            SL4A connect RPC has been sent.
    Returns:
        connected (bool): True if devices are paired and connected by end of
        method. False otherwise.
    """
    droid = android.droid
    headset_mac_address = headset.mac_address
    connected = is_a2dp_src_device_connected(android, headset_mac_address)
    log.info('Devices connected before pair attempt: %s' % connected)
    if not connected:
        # Put the headset in pairing mode and start the pairing helper.
        headset.enter_pairing_mode()
        droid.bluetoothStartPairingHelper()

    start_time = time.time()
    # The loop body is skipped entirely when the devices are already connected.
    while not connected and (time.time() - start_time < timeout):
        bonded_info = droid.bluetoothGetBondedDevices()
        current_mac = headset.mac_address
        bonded_addresses = [entry["address"] for entry in bonded_info]
        if current_mac in bonded_addresses:
            # Device is bonded but not connected
            droid.bluetoothConnectBonded(headset_mac_address)
        else:
            # Use SL4A to pair and connect with headset.
            headset.enter_pairing_mode()
            droid.bluetoothDiscoverAndBond(headset_mac_address)
        log.info('Waiting for connection...')
        time.sleep(connection_check_period)
        # Check for connection.
        connected = is_a2dp_src_device_connected(android, headset_mac_address)

    log.info('Devices connected after pair attempt: %s' % connected)
    return connected
246
247
def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
    """Connects pri droid to secondary droid, retrying on failure.

    Args:
        pri_ad: AndroidDroid initiating connection
        sec_ad: AndroidDroid accepting connection
        profiles_set: Set of profiles to be connected
        attempts: Number of attempts to try until failure.

    Returns:
        True if one of the attempts connected all requested profiles.
        False if every attempt failed.
    """
    # The returned address is not used in this function.
    sec_ad.droid.bluetoothGetLocalAddress()
    # Allows extra time for the SDP records to be updated.
    time.sleep(2)

    attempts_made = 0
    while attempts_made < attempts:
        log.info("connect_pri_to_sec curr attempt {} total {}".format(
            attempts_made, attempts))
        connected = _connect_pri_to_sec(pri_ad, sec_ad, profiles_set)
        if connected:
            return True
        attempts_made += 1

    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
        attempts))
    return False
274
275
def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
    """Connects pri droid to secondary droid.

    The devices must already be bonded. After requesting the connection, the
    profile state APIs are polled for up to 10 seconds. Any profile still
    missing afterwards is awaited through profile connection state change
    events.

    Args:
        pri_ad: AndroidDroid initiating connection.
        sec_ad: AndroidDroid accepting connection.
        profiles_set: Set of profiles to be connected.

    Returns:
        True if connection is successful, false if unsuccessful.
    """
    # Check if we support all profiles.
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_set:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not supported list {}".format(
                profile, supported_profiles))
            return False

    # Every comparison below needs the secondary's address; query it once
    # instead of issuing a new RPC each time.
    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()

    # First check that devices are bonded.
    paired = any(paired_device['address'] == sec_addr
                 for paired_device in pri_ad.droid.bluetoothGetBondedDevices())
    if not paired:
        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
        return False

    # Now try to connect them, the following call will try to initiate all
    # connections.
    pri_ad.droid.bluetoothConnectBonded(sec_addr)

    end_time = time.time() + 10
    profile_connected = set()
    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))

    # Profiles that can be verified through an API call, in the order they
    # are checked: (key in bt_profile_constants, connection check function).
    profile_checkers = (
        ('headset_client', is_hfp_client_device_connected),
        ('a2dp', is_a2dp_src_device_connected),
        ('a2dp_sink', is_a2dp_snk_device_connected),
        ('map_mce', is_map_mce_device_connected),
        ('map', is_map_mse_device_connected),
    )
    # First use APIs to check profile connection state
    while (time.time() < end_time
           and not profile_connected.issuperset(profiles_set)):
        for profile_key, is_connected in profile_checkers:
            profile = bt_profile_constants[profile_key]
            if (profile not in profile_connected and profile in profiles_set
                    and is_connected(pri_ad, sec_addr)):
                profile_connected.add(profile)
        time.sleep(0.1)

    # If APIs fail, try to find the connection broadcast receiver.
    while not profile_connected.issuperset(profiles_set):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed,
                bt_default_timeout + 10)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception:
            # Report the profiles that are still missing, not the ones that
            # already connected.
            pri_ad.log.error("Did not get {} profiles left {}".format(
                bluetooth_profile_connection_state_changed,
                set(profiles_set) - profile_connected))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']
        if state == bt_profile_states['connected'] and \
                device_addr == sec_addr:
            profile_connected.add(profile)
        pri_ad.log.info(
            "Profiles connected until now {}".format(profile_connected))
    # Failure happens inside the while loop. If we came here then we already
    # connected.
    return True
363
364
def determine_max_advertisements(android_device):
    """Determines programatically how many advertisements the Android device
    supports.

    Advertisements are started one after another until the stack reports
    ADVERTISE_FAILED_TOO_MANY_ADVERTISERS. Every advertisement started here
    is stopped again before returning, also when an error is raised.

    Args:
        android_device: The Android device to determine max advertisements of.

    Returns:
        The maximum advertisement count, or -1 if Bluetooth could not be
        turned on.

    Raises:
        BtTestUtilsError: if starting an advertisement fails with an error
            other than ADVERTISE_FAILED_TOO_MANY_ADVERTISERS.
    """
    android_device.log.info(
        "Determining number of maximum concurrent advertisements...")
    droid = android_device.droid

    # Only wait for the Bluetooth-on broadcast when a toggle was actually
    # requested; otherwise no such event is expected.
    if not droid.bluetoothCheckState():
        droid.bluetoothToggleState(True)
        try:
            android_device.ed.pop_event(bluetooth_on, bt_default_timeout)
        except Exception:
            android_device.log.info(
                "Failed to toggle Bluetooth on(no broadcast received).")
            # Try one more time to poke at the actual state.
            if droid.bluetoothCheckState() is True:
                android_device.log.info(".. actual state is ON")
            else:
                android_device.log.error(
                    "Failed to turn Bluetooth on. Unable to determine max "
                    "advertisements.")
                return -1

    advertisement_count = 0
    advertise_callback_list = []
    advertise_data = droid.bleBuildAdvertiseData()
    advertise_settings = droid.bleBuildAdvertiseSettings()
    try:
        while True:
            advertise_callback = droid.bleGenBleAdvertiseCallback()
            advertise_callback_list.append(advertise_callback)

            droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                         advertise_settings)

            success_event = adv_succ.format(advertise_callback)
            failure_event = adv_fail.format(advertise_callback)
            regex = "(" + success_event + "|" + failure_event + ")"
            # wait for either success or failure event
            evt = android_device.ed.pop_events(regex, bt_default_timeout,
                                               small_timeout)
            if evt[0]["name"] == success_event:
                advertisement_count += 1
                android_device.log.info(
                    "Advertisement {} started.".format(advertisement_count))
                continue

            error = evt[0]["data"]["Error"]
            if error != "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
                raise BtTestUtilsError(
                    "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
                    " but received bad error code {}".format(error))
            android_device.log.info(
                "Advertisement failed to start. Reached max " +
                "advertisements at {}".format(advertisement_count))
            break
    finally:
        # Stop everything that was started, even if the loop above raised.
        try:
            for adv in advertise_callback_list:
                droid.bleStopBleAdvertising(adv)
        except Exception:
            android_device.log.error(
                "Failed to stop advertisingment, resetting Bluetooth.")
            reset_bluetooth([android_device])
    return advertisement_count
437
438
def disable_bluetooth(droid):
    """Turns Bluetooth off on the input Droid object if it is on.

    Args:
        droid: The droid object to disable Bluetooth on.

    Returns:
        True if Bluetooth was already off or is off after the toggle, false
        if the state still reads on after the toggle.
    """
    if droid.bluetoothCheckState() is not True:
        return True

    droid.bluetoothToggleState(False)
    still_on = droid.bluetoothCheckState() is True
    if still_on:
        log.error("Failed to toggle Bluetooth off.")
        return False
    return True
454
455
def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
    """Disconnects the primary device from the secondary on given profiles.

    After requesting the disconnect, profile connection state change events
    are consumed until a 'disconnected' event from the secondary's address
    has been seen for every requested profile.

    Args:
        pri_ad: Primary android_device initiating disconnection
        sec_ad: Secondary android droid (sl4a interface to keep the
            method signature the same connect_pri_to_sec above)
        profiles_list: List of profiles we want to disconnect from

    Returns:
        True on Success
        False on Failure
    """
    # Sanity check to see if all the profiles in the given set is supported
    supported_profiles = bt_profile_constants.values()
    for requested in profiles_list:
        if requested not in supported_profiles:
            pri_ad.log.info("Profile {} is not in supported list {}".format(
                requested, supported_profiles))
            return False

    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
    # Disconnecting on a already disconnected profile is a nop,
    # so not checking for the connection state
    try:
        target_addr = sec_ad.droid.bluetoothGetLocalAddress()
        pri_ad.droid.bluetoothDisconnectConnectedProfile(
            target_addr, profiles_list)
    except Exception as err:
        pri_ad.log.error(
            "Exception while trying to disconnect profile(s) {}: {}".format(
                profiles_list, err))
        return False

    done = set()
    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))

    while True:
        if done.issuperset(profiles_list):
            return True
        try:
            event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed, bt_default_timeout)
            pri_ad.log.info("Got event {}".format(event))
        except Exception as e:
            pri_ad.log.error(
                "Did not disconnect from Profiles. Reason {}".format(e))
            return False

        event_profile = event['data']['profile']
        event_state = event['data']['state']
        event_addr = event['data']['addr']

        if event_state == bt_profile_states['disconnected']:
            # The address is only queried for 'disconnected' events.
            if event_addr == sec_ad.droid.bluetoothGetLocalAddress():
                done.add(event_profile)
        pri_ad.log.info("Profiles disconnected so far {}".format(done))
513
514
def enable_bluetooth(droid, ed):
    """Turns Bluetooth on if it is not already on.

    Args:
        droid: The droid object to enable Bluetooth on.
        ed: The event dispatcher used to wait for the Bluetooth-on event.

    Returns:
        True if Bluetooth was already on, the Bluetooth-on event was
        received, or the state reads on after the wait failed; False
        otherwise.
    """
    if droid.bluetoothCheckState() is not True:
        droid.bluetoothToggleState(True)
        try:
            ed.pop_event(bluetooth_on, bt_default_timeout)
        except Exception:
            log.info("Failed to toggle Bluetooth on (no broadcast received)")
            # Fall back to reading the state directly.
            if droid.bluetoothCheckState() is True:
                log.info(".. actual state is ON")
                return True
            log.info(".. actual state is OFF")
            return False
    return True
532
533
def factory_reset_bluetooth(android_devices):
    """Factory resets Bluetooth on every device of the input list.

    For each device: Bluetooth is ensured to be on, all bonded devices are
    unbonded, the Bluetooth factory reset is issued, and Bluetooth is enabled
    again. Processing stops at the first device that fails.

    Args:
        android_devices: The Android device list to reset Bluetooth

    Returns:
        True if successful, false if unsuccessful.
    """
    for device in android_devices:
        sl4a, dispatcher = device.droid, device.ed
        device.log.info("Reset state of bluetooth on device.")
        if not bluetooth_enabled_check(device):
            return False

        # TODO: remove device unbond b/79418045
        # Temporary solution to ensure all devices are unbonded
        for bond in sl4a.bluetoothGetBondedDevices():
            device.log.info("Removing bond for device {}".format(
                bond['address']))
            sl4a.bluetoothUnbond(bond['address'])

        sl4a.bluetoothFactoryReset()
        wait_for_bluetooth_manager_state(sl4a)
        if not enable_bluetooth(sl4a, dispatcher):
            return False
    return True
560
561
def generate_ble_advertise_objects(droid):
    """Creates a default advertise callback, data and settings object.

    Args:
        droid: The droid object to generate advertise LE objects from.

    Returns:
        A tuple (advertise_callback, advertise_data, advertise_settings)
        holding the values returned by bleGenBleAdvertiseCallback,
        bleBuildAdvertiseData and bleBuildAdvertiseSettings.
    """
    # Tuple elements are evaluated left to right: callback, data, settings.
    return (
        droid.bleGenBleAdvertiseCallback(),
        droid.bleBuildAdvertiseData(),
        droid.bleBuildAdvertiseSettings(),
    )
577
578
def generate_ble_scan_objects(droid):
    """Creates a default scan filter list, scan settings and scan callback.

    Args:
        droid: The droid object to generate LE scan objects from.

    Returns:
        A tuple (filter_list, scan_settings, scan_callback) holding the
        values returned by bleGenFilterList, bleBuildScanSetting and
        bleGenScanCallback.
    """
    # Tuple elements are evaluated left to right: filters, settings, callback.
    return (
        droid.bleGenFilterList(),
        droid.bleBuildScanSetting(),
        droid.bleGenScanCallback(),
    )
594
595
def generate_id_by_size(size, chars=string.ascii_letters + string.digits):
    """Builds a random string of the requested length.

    Args:
        size: Input size of string.
        chars: (Optional) Chars to use in generating a random string.
            Defaults to the ASCII letters (lower and upper case) and digits.

    Returns:
        String of random input chars at the input size.
    """
    picked = [random.choice(chars) for _ in range(size)]
    return ''.join(picked)
609
610
def get_advanced_droid_list(android_devices):
    """Builds a role dictionary with LE capabilities for each Android device.

    The maximum number of LE advertisements is determined programatically
    once per device model and cached in advertisements_to_devices. When the
    determination returns -1 it is retried up to three more times.

    Args:
        android_devices: The Android devices to setup.

    Returns:
        List of dictionaries, one per device, with the keys 'droid', 'ed',
        'max_advertisements' and 'batch_scan_supported'.
    """
    roles = []
    for device in android_devices:
        sl4a, dispatcher = device.droid, device.ed
        model = sl4a.getBuildModel()

        if model in advertisements_to_devices:
            max_advertisements = advertisements_to_devices[model]
        else:
            max_advertisements = determine_max_advertisements(device)
            # Retry to calculate max advertisements
            for tries_left in (3, 2, 1):
                if max_advertisements != -1:
                    break
                device.log.info(
                    "Attempts left to determine max advertisements: {}".format(
                        tries_left))
                max_advertisements = determine_max_advertisements(device)
            advertisements_to_devices[model] = max_advertisements

        roles.append({
            'droid': sl4a,
            'ed': dispatcher,
            'max_advertisements': max_advertisements,
            'batch_scan_supported': model not in batch_scan_not_supported_list,
        })
    return roles
653
654
def get_bluetooth_crash_count(android_device):
    """Reads the Bluetooth crash count from `dumpsys bluetooth_manager`.

    Args:
        android_device: The Android device to query through adb.

    Returns:
        The integer parsed from the text between "crashed" and the last
        digit on that line of the dumpsys output.

    Raises:
        AttributeError: if the output contains no "crashed ... <digit>" text.
        ValueError: if the captured text is not a valid integer.
    """
    out = android_device.adb.shell("dumpsys bluetooth_manager")
    # Raw string so that \d is passed to the regex engine unchanged instead
    # of being an invalid string escape.
    return int(re.search(r"crashed(.*\d)", out).group(1))
658
659
def read_otp(ad):
    """Reads and parses the OTP output to return TX power backoff

    Bluetooth is disabled through adb while `bluetooth_sar_test -r` runs and
    is enabled again afterwards. The command output is then parsed into a
    dict of TX power backoffs for the BR, EDR and BLE banks.

    Args:
        ad : android device object

    Returns :
        otp_dict : power backoff dict keyed by bank ("BR", "EDR", "BLE"),
            each holding the keys "10", "9" and "8". Values default to the
            integer 0; values taken from the output are the strings captured
            by the regex.
    """

    ad.adb.shell('svc bluetooth disable')
    time.sleep(2)
    otp_output = ad.adb.shell('bluetooth_sar_test -r')
    ad.adb.shell('svc bluetooth enable')
    time.sleep(2)
    otp_dict = {
        bank: {
            "10": 0,
            "9": 0,
            "8": 0
        }
        for bank in ("BR", "EDR", "BLE")
    }

    # Raw string: the backslash sequences are regex syntax, not string
    # escapes. The pattern is prefixed with the bank name below.
    otp_regex = r'\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]'

    for key, backoffs in otp_dict.items():
        bank_list = re.findall("{}{}".format(key, otp_regex), otp_output)
        for bank_tuple in bank_list:
            # All-zero banks are skipped; the last non-zero bank wins.
            if ('0', '0', '0') != bank_tuple:
                backoffs["10"], backoffs["9"], backoffs["8"] = bank_tuple
    return otp_dict
705
706
def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True):
    """ Function to get the bt metric from logcat.

    Captures logcat for the specified duration and returns the bqr results.
    Takes list of android objects as input. If a single android object is given,
    converts it into a list.

    Args:
        ad_list: list of android_device objects
        duration: time duration (seconds) for which the logcat is parsed.
        tag: tag to be appended to the logcat dump.
        processed: flag to process bqr output. When set, the per-device RSSI
            and power level lists are replaced by their averages.

    Returns:
        metrics_dict: dict of metrics ("rssi", "pwlv", "vsp_txpl"), each
            mapping a device serial to its values.

    Raises:
        ZeroDivisionError: if processed is set and no RSSI or power level
            sample was found for a device.
    """

    # Defining bqr quantities and the regex used to extract each of them
    regex_dict = {
        "vsp_txpl": re.compile(r"VSP_TxPL:\s(\S+)"),
        "pwlv": re.compile(r"PwLv:\s(\S+)"),
        "rssi": re.compile(r"RSSI:\s[-](\d+)")
    }
    metrics_dict = {"rssi": {}, "pwlv": {}, "vsp_txpl": {}}

    # Converting a single android device object to list
    if not isinstance(ad_list, list):
        ad_list = [ad_list]

    #Time sync with the test machine
    for ad in ad_list:
        ad.droid.setTime(int(round(time.time() * 1000)))
        time.sleep(0.5)

    begin_time = utils.get_current_epoch_time()
    time.sleep(duration)
    end_time = utils.get_current_epoch_time()

    bqr_tag = "Handle:"
    for ad in ad_list:
        bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time)

        # Read the log once and close it again; only lines carrying the bqr
        # tag are relevant for any of the metrics.
        with open(bt_rssi_log, "r") as file_bt_log:
            bqr_lines = [line for line in file_bt_log if bqr_tag in line]

        # Extracting supporting bqr quantities
        for metric, pattern in regex_dict.items():
            bqr_metric = []
            for line in bqr_lines:
                match = pattern.search(line)
                if match:
                    bqr_metric.append(match.group(1).strip(","))
            metrics_dict[metric][ad.serial] = bqr_metric

        # Ensures back-compatibility for vsp_txpl enabled DUTs
        if metrics_dict["vsp_txpl"][ad.serial]:
            metrics_dict["pwlv"][ad.serial] = metrics_dict["vsp_txpl"][
                ad.serial]

        # Formatting the raw data: RSSI is logged without its sign, power
        # levels are parsed as hexadecimal.
        metrics_dict["rssi"][ad.serial] = [
            (-1) * int(x) for x in metrics_dict["rssi"][ad.serial]
        ]
        metrics_dict["pwlv"][ad.serial] = [
            int(x, 16) for x in metrics_dict["pwlv"][ad.serial]
        ]

        # Processing formatted data if processing is required
        if processed:
            # Computes the average RSSI
            metrics_dict["rssi"][ad.serial] = round(
                sum(metrics_dict["rssi"][ad.serial]) /
                len(metrics_dict["rssi"][ad.serial]), 2)
            # Computes the average power level
            metrics_dict["pwlv"][ad.serial] = float(
                sum(metrics_dict["pwlv"][ad.serial]) /
                len(metrics_dict["pwlv"][ad.serial]))

    return metrics_dict
785
786
def get_bt_rssi(ad, duration=1, processed=True):
    """Return the BT RSSI readings that get_bt_metric extracts from logcat.

    Thin wrapper around get_bt_metric: it runs the metric collection under
    the "get_bt_rssi" tag and keeps only the "rssi" entry of the result.

    Args:
        ad: (list of) android_device object.
        duration: time duration (seconds) for which logcat is parsed.
        processed: forwarded unchanged to get_bt_metric as its `processed`
            argument.

    Returns:
        The "rssi" entry of the dictionary returned by get_bt_metric.
    """
    metrics = get_bt_metric(ad,
                            duration,
                            tag="get_bt_rssi",
                            processed=processed)
    return metrics["rssi"]
806
807
def enable_bqr(
    ad_list,
    bqr_interval=10,
    bqr_event_mask=15,
):
    """Configure BQR reporting on the given device(s).

    Writes the requested event mask and minimum interval to the
    persist.bluetooth.bqr.* properties, then switches airplane mode on and
    off again so that the new BQR settings take effect.

    Args:
        ad_list: an android_device or list of android devices.
        bqr_interval: value written to
            persist.bluetooth.bqr.min_interval_ms.
        bqr_event_mask: value written to persist.bluetooth.bqr.event_mask.
    """
    # Accept a single device as well as a list of devices.
    devices = ad_list if isinstance(ad_list, list) else [ad_list]

    mask_cmd = "setprop persist.bluetooth.bqr.event_mask {}".format(
        bqr_event_mask)
    interval_cmd = "setprop persist.bluetooth.bqr.min_interval_ms {}".format(
        bqr_interval)

    for device in devices:
        device.adb.shell(mask_cmd)
        device.adb.shell(interval_cmd)

        # Airplane mode on, then off again.
        for airplane_on in (True, False):
            device.droid.connectivityToggleAirplaneMode(airplane_on)
835
836
def disable_bqr(ad_list):
    """Turn off BQR reporting on the given device(s).

    Writes an event mask of 0 to persist.bluetooth.bqr.event_mask and then
    switches airplane mode on and off again on every device.

    Args:
        ad_list: an android_device or list of android devices.
    """
    # Accept a single device as well as a list of devices.
    devices = ad_list if isinstance(ad_list, list) else [ad_list]

    disabled_mask = 0
    disable_cmd = "setprop persist.bluetooth.bqr.event_mask {}".format(
        disabled_mask)

    for device in devices:
        device.adb.shell(disable_cmd)

        # Airplane mode on, then off again.
        for airplane_on in (True, False):
            device.droid.connectivityToggleAirplaneMode(airplane_on)
857
858
def get_device_selector_dictionary(android_device_list):
    """Build a mapping from Bluetooth profile/feature name to devices.

    A device is listed under a profile when the profile's UUID (built from
    sig_uuid_constants) is among the device's local UUIDs. In addition, the
    first matching device-type feature flag in ad.features (TV, automotive,
    watch, telephony - checked in that order) adds the device to a fixed
    set of profiles via _add_android_device_to_dictionary.

    Args:
        android_device_list: The list of Android devices.
    Returns:
        A dictionary of profiles/features to Android devices.
    """
    # (feature flag, log message, profiles), in priority order. Various
    # services may not be active during BT startup, so devices that can be
    # identified through their feature list are added to the appropriate
    # profiles / features explicitly.
    device_types = (
        ("feature:com.google.android.tv.installed",
         "Android TV device found.",
         ['AudioSink']),
        ("feature:android.hardware.type.automotive",
         "Android Auto device found.",
         ['AudioSink', 'A/V_RemoteControl', 'Message Notification Server']),
        ("feature:android.hardware.type.watch",
         "Android Wear device found.",
         []),
        ("feature:android.hardware.telephony",
         "Android Phone device found.",
         ['AudioSource', 'A/V_RemoteControlTarget', 'Message Access Server']),
    )

    selector_dict = {}
    for ad in android_device_list:
        local_uuids = ad.droid.bluetoothGetLocalUuids()

        for profile, uuid_const in sig_uuid_constants.items():
            expected_uuid = sig_uuid_constants['BASE_UUID'].format(
                uuid_const).lower()
            if local_uuids and expected_uuid in local_uuids:
                selector_dict.setdefault(profile, []).append(ad)

        # Only the first matching device type is applied.
        for feature, message, profiles in device_types:
            if feature in ad.features:
                ad.log.info(message)
                # Hand over a fresh list for every device.
                _add_android_device_to_dictionary(ad, list(profiles),
                                                  selector_dict)
                break
    return selector_dict
917
918
def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
    """Start a generic advertisement and get its mac address by LE scanning.

    The advertiser is configured as connectable, low latency, high TX power
    and to include its device name. The scanner filters on the advertiser's
    Bluetooth local name. Neither the advertisement nor the scan is stopped
    by this function; the returned callback ids identify them.

    Args:
        scan_ad: The Android device to use as the scanner.
        adv_ad: The Android device to use as the advertiser.

    Returns:
        A 3-tuple (mac_address, advertise_callback, scan_callback):
        mac_address: The mac address of the advertisement.
        advertise_callback: The advertise callback id of the active
            advertisement.
        scan_callback: The scan callback id of the LE scan that was started.

    Raises:
        BtTestUtilsError: if the advertise-success event or the scan result
            event is not received within bt_default_timeout.
    """
    adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True)
    adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel(
        ble_advertise_settings_tx_powers['high'])
    advertise_callback, advertise_data, advertise_settings = (
        generate_ble_advertise_objects(adv_ad.droid))
    adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                        advertise_settings)
    try:
        adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
                            bt_default_timeout)
    except Empty as err:
        # Chain the original timeout so the traceback shows the root cause.
        raise BtTestUtilsError(
            "Advertiser did not start successfully {}".format(err)) from err
    filter_list = scan_ad.droid.bleGenFilterList()
    scan_settings = scan_ad.droid.bleBuildScanSetting()
    scan_callback = scan_ad.droid.bleGenScanCallback()
    # Only look for the advertiser's own device name.
    scan_ad.droid.bleSetScanFilterDeviceName(
        adv_ad.droid.bluetoothGetLocalName())
    scan_ad.droid.bleBuildScanFilter(filter_list)
    scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    try:
        event = scan_ad.ed.pop_event(
            "BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Scanner did not find advertisement {}".format(err)) from err
    mac_address = event['data']['Result']['deviceInfo']['address']
    return mac_address, advertise_callback, scan_callback
962
963
def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
    """Simulate a keyboard key press and release towards a HID host.

    Sends a keyboard report for `key` through device_ad, waits `interval`
    seconds, then sends a keyboard report for key "00".

    Args:
        host_id: the Bluetooth MAC address or name of the HID host
        device_ad: HID device
        key: the key we want to send
        interval: the interval (seconds) between key press and key release
    """
    press_report = hid_keyboard_report(key)
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 press_report)
    time.sleep(interval)
    release_report = hid_keyboard_report("00")
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 release_report)
979
980
def hid_keyboard_report(key, modifier="00"):
    """Build the 8-byte HID keyboard report for the given key.

    The report is laid out as: modifier, "00", key, followed by five "00"
    bytes. Each field is a two-digit hex string.

    Args:
        key: the key we want, as a hex string
        modifier: HID keyboard modifier byte, as a hex string
    Returns:
        The report bytes decoded as a UTF-8 string.
    """
    hex_fields = [modifier, "00", key] + ["00"] * 5
    raw_report = bytes.fromhex(" ".join(hex_fields))
    return raw_report.decode("utf-8")
993
994
def is_a2dp_connected(sink, source):
    """Check whether the source is among the sink's A2DP connections.

    Walks the devices the sink reports as A2DP-sink connected, logging each
    device name, and stops at the first one whose address equals the
    source's local Bluetooth address.

    Args:
        sink:       Audio Sink
        source:     Audio Source
    Returns:
        True if Connected
        False if Not connected
    """
    connected = sink.droid.bluetoothA2dpSinkGetConnectedDevices()
    for entry in connected:
        sink.log.info("A2dp Connected device {}".format(entry["name"]))
        source_address = source.droid.bluetoothGetLocalAddress()
        if entry["address"] == source_address:
            return True
    return False
1013
1014
def is_a2dp_snk_device_connected(ad, addr):
    """Report whether addr is among ad's connected A2DP Sink devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
    ad.log.info("Connected A2DP Sink devices: {}".format(connected))
    known_addresses = {entry['address'] for entry in connected}
    return addr in known_addresses
1029
1030
def is_a2dp_src_device_connected(ad, addr):
    """Report whether addr is among ad's connected A2DP (source) devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothA2dpGetConnectedDevices()
    ad.log.info("Connected A2DP Source devices: {}".format(connected))
    known_addresses = {entry['address'] for entry in connected}
    return addr in known_addresses
1045
1046
def is_hfp_client_device_connected(ad, addr):
    """Report whether addr is among ad's connected HFP Client devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothHfpClientGetConnectedDevices()
    ad.log.info("Connected HFP Client devices: {}".format(connected))
    known_addresses = {entry['address'] for entry in connected}
    return addr in known_addresses
1061
1062
def is_map_mce_device_connected(ad, addr):
    """Report whether addr is among ad's connected MAP MCE devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothMapClientGetConnectedDevices()
    ad.log.info("Connected MAP MCE devices: {}".format(connected))
    known_addresses = {entry['address'] for entry in connected}
    return addr in known_addresses
1077
1078
def is_map_mse_device_connected(ad, addr):
    """Report whether addr is among ad's connected MAP MSE devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothMapGetConnectedDevices()
    ad.log.info("Connected MAP MSE devices: {}".format(connected))
    known_addresses = {entry['address'] for entry in connected}
    return addr in known_addresses
1093
1094
def kill_bluetooth_process(ad):
    """Kill the Bluetooth process on an Android device.

    Looks up the PID(s) of com.android.bluetooth with `ps` on the device
    and passes them to `kill` through `adb -s <serial> shell`. If no PID is
    found, a warning is logged and nothing is killed.

    Args:
        ad: Android device to kill BT process on.
    """
    ad.log.info("Killing Bluetooth process.")
    output = ad.adb.shell(
        "ps | grep com.android.bluetooth | awk '{print $2}'")
    # Other helpers in this module treat the adb.shell result as str;
    # still accept bytes so either return type works.
    if isinstance(output, bytes):
        output = output.decode('ascii')
    # grep may print zero, one or several lines; split on any whitespace.
    pids = output.split()
    if not pids:
        ad.log.warning("No Bluetooth process found to kill.")
        return
    # Argument list without a shell: no quoting problems with the PID text.
    call(["adb", "-s", ad.serial, "shell", "kill"] + pids)
1105
1106
def log_energy_info(android_devices, state):
    """Build the header line for a Bluetooth energy info log entry.

    Only the header is produced; android_devices is currently not read
    (see the bug reference in the body).

    Args:
        android_devices: input Android device list to log energy info from.
        state: the input state to log. Usually 'Start' or 'Stop' for logging.

    Returns:
        The string "<state> Energy info collection:" followed by a newline.
    """
    # Bug: b/31966929
    header = "{} Energy info collection:\n".format(state)
    return header
1120
1121
def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
    """Sets up a PAN connection between two android devices.

    Steps: toggle airplane mode on and off on panu_dut, make sure Bluetooth
    is enabled on both devices, enable Bluetooth tethering on pan_dut, pair
    the devices, connect panu_dut to pan_dut and finally verify that
    panu_dut has a working http connection.

    Args:
        pan_dut: the Android device providing tethering services
        panu_dut: the Android device using the internet connection from the
            pan_dut
    Returns:
        True if PAN connection and verification is successful,
        false if unsuccessful.
    """
    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
        panu_dut.log.error("Failed to toggle airplane mode on")
        return False
    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
        # The toggle ran on panu_dut, so report it on that device's logger.
        panu_dut.log.error("Failed to toggle airplane mode off")
        return False
    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    if not bluetooth_enabled_check(panu_dut):
        return False
    if not bluetooth_enabled_check(pan_dut):
        return False
    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
    if not (pair_pri_to_sec(pan_dut, panu_dut)):
        return False
    if not pan_dut.droid.bluetoothPanIsTetheringOn():
        pan_dut.log.error("Failed to enable Bluetooth tethering.")
        return False
    # Magic sleep needed to give the stack time in between bonding and
    # connecting the PAN profile.
    time.sleep(pan_connect_timeout)
    panu_dut.droid.bluetoothConnectBonded(
        pan_dut.droid.bluetoothGetLocalAddress())
    if not verify_http_connection(log, panu_dut):
        panu_dut.log.error("Can't verify http connection on PANU device.")
        # Extra diagnostic: check whether the tethering device itself has
        # connectivity. The overall result is a failure either way.
        if not verify_http_connection(log, pan_dut):
            pan_dut.log.info(
                "Can't verify http connection on PAN service device")
        return False
    return True
1163
1164
def orchestrate_bluetooth_socket_connection(
        client_ad,
        server_ad,
        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
        uuid=None):
    """Sets up the Bluetooth Socket connection between two Android devices.

    Starts the pairing helper on both devices, starts an accept thread on
    the server and a connect thread on the client, then polls the client
    once per second (for up to bt_default_timeout seconds) until it reports
    an active socket connection.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: timeout (ms) passed to the server accept thread.
        uuid: UUID to use for the socket; bluetooth_socket_conn_test_uuid is
            used when None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    server_ad.droid.bluetoothStartPairingHelper()
    client_ad.droid.bluetoothStartPairingHelper()

    conn_uuid = bluetooth_socket_conn_test_uuid if uuid is None else uuid
    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
        conn_uuid, accept_timeout_ms)
    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
        server_ad.droid.bluetoothGetLocalAddress(), conn_uuid)

    end_time = time.time() + bt_default_timeout
    while time.time() < end_time:
        if client_ad.droid.bluetoothSocketConnActiveConnections():
            client_ad.log.info("Bluetooth socket Client Connection Active")
            return True
        time.sleep(1)
    # Reached only when no active connection was ever observed, including
    # the case where the polling loop did not run at all.
    client_ad.log.error("Failed to establish a Bluetooth socket connection")
    return False
1204
1205
def orchestrate_rfcomm_connection(client_ad,
                                  server_ad,
                                  accept_timeout_ms=default_rfcomm_timeout_ms,
                                  uuid=None):
    """Sets up the RFCOMM connection between two Android devices.

    Delegates to orchestrate_bluetooth_socket_connection, substituting the
    default RFCOMM UUID when no uuid is given.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: timeout (ms) forwarded to the socket helper.
        uuid: UUID to connect with; bt_rfcomm_uuids['default_uuid'] when
            None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    rfcomm_uuid = uuid if uuid is not None else bt_rfcomm_uuids['default_uuid']
    return orchestrate_bluetooth_socket_connection(client_ad, server_ad,
                                                   accept_timeout_ms,
                                                   rfcomm_uuid)
1223
1224
def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
    """Pairs pri droid to secondary droid, retrying on failure.

    After every failed pairing attempt the bonded devices are cleared on
    both devices before the next attempt; a failure to clear bonds aborts
    immediately.

    Args:
        pri_ad: Android device initiating connection
        sec_ad: Android device accepting connection
        attempts: Number of attempts to try until failure.
        auto_confirm: Auto confirm passkey match for both devices

    Returns:
        True if the devices were paired, False otherwise.
    """
    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
        sec_ad.droid.bluetoothGetLocalAddress())

    unbond_steps = (
        (pri_ad, "Failed to clear bond for primary device at attempt {}"),
        (sec_ad, "Failed to clear bond for secondary device at attempt {}"),
    )

    tries_done = 0
    while tries_done < attempts:
        if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
            return True
        # Give the stack 2 seconds before removing the bonds.
        time.sleep(2)
        for device, failure_msg in unbond_steps:
            if not clear_bonded_devices(device):
                log.error(failure_msg.format(str(tries_done)))
                return False
        # ...and another 2 seconds afterwards.
        time.sleep(2)
        tries_done += 1

    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
        str(attempts)))
    return False
1262
1263
def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
    """Run a single discover-and-bond attempt from pri_ad towards sec_ad.

    sec_ad is made discoverable so that pri_ad can find it. The timeout is
    based on how much time it would take for two devices to pair with each
    other once pri_ad starts seeing devices.

    Args:
        pri_ad: Android device initiating the bond
        sec_ad: Android device being bonded to
        auto_confirm: passed to the pairing helper on both devices; when
            false, the passkey match is awaited before checking the bond.

    Returns:
        True once sec_ad's address shows up in pri_ad's bonded devices,
        False on passkey mismatch or timeout.
    """
    initiator = pri_ad.droid
    acceptor = sec_ad.droid
    pri_ad.ed.clear_all_events()
    sec_ad.ed.clear_all_events()
    log.info("Bonding device {} to {}".format(
        initiator.bluetoothGetLocalAddress(),
        acceptor.bluetoothGetLocalAddress()))
    acceptor.bluetoothMakeDiscoverable(bt_default_timeout)
    target_address = acceptor.bluetoothGetLocalAddress()
    log.debug("Starting paring helper on each device")
    initiator.bluetoothStartPairingHelper(auto_confirm)
    acceptor.bluetoothStartPairingHelper(auto_confirm)
    pri_ad.log.info("Primary device starting discovery and executing bond")
    initiator.bluetoothDiscoverAndBond(target_address)
    if not auto_confirm and not _wait_for_passkey_match(pri_ad, sec_ad):
        return False

    # Poll the bonded device list until the target appears or we time out.
    deadline = time.time() + bt_default_timeout
    pri_ad.log.info("Verifying devices are bonded")
    while time.time() < deadline:
        bonded_devices = initiator.bluetoothGetBondedDevices()
        if any(dev['address'] == target_address for dev in bonded_devices):
            pri_ad.log.info("Successfully bonded to device")
            return True
        time.sleep(0.1)

    # Timed out trying to bond.
    pri_ad.log.info("Failed to bond devices.")
    return False
1299
1300
def reset_bluetooth(android_devices):
    """Resets Bluetooth state of input Android device list.

    For each device: if Bluetooth is currently on it is toggled off and the
    bluetooth_off event is awaited; then, after a short pause,
    bluetooth_enabled_check is run on the device.

    Args:
        android_devices: The Android device list to reset Bluetooth state on.

    Returns:
        True if successful, false if unsuccessful.
    """
    for device in android_devices:
        sl4a, dispatcher = device.droid, device.ed
        device.log.info("Reset state of bluetooth on device.")
        if sl4a.bluetoothCheckState() is True:
            sl4a.bluetoothToggleState(False)
            try:
                dispatcher.pop_event(bluetooth_off, bt_default_timeout)
            except Exception:
                device.log.error("Failed to toggle Bluetooth off.")
                return False
        # temp sleep for b/17723234
        time.sleep(3)
        if not bluetooth_enabled_check(device):
            return False
    return True
1327
1328
def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
    """Verify that input number of advertisements can be found from the scanning
    Android device.

    Starts an unfiltered LE scan and collects the distinct device addresses
    seen in scan results until max_advertisements distinct addresses were
    found or bt_default_timeout seconds elapsed. The scan is always stopped
    before returning or raising.

    Args:
        scn_ad: The Android device to start LE scanning on.
        max_advertisements: The number of advertisements the scanner expects to
        find.

    Returns:
        True if successful, false if unsuccessful.

    Raises:
        BtTestUtilsError: if no scan result event arrives within
            bt_default_timeout while waiting for the next result.
    """
    filter_list = scn_ad.droid.bleGenFilterList()
    scn_ad.droid.bleBuildScanFilter(filter_list)
    scan_settings = scn_ad.droid.bleBuildScanSetting()
    scan_callback = scn_ad.droid.bleGenScanCallback()
    scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)

    found_addresses = set()
    deadline = time.time() + bt_default_timeout
    try:
        while time.time() < deadline:
            try:
                event = scn_ad.ed.pop_event(scan_result.format(scan_callback),
                                            bt_default_timeout)
            except Empty as error:
                raise BtTestUtilsError(
                    "Failed to find scan event: {}".format(error)) from error
            found_addresses.add(
                event['data']['Result']['deviceInfo']['address'])
            if len(found_addresses) == max_advertisements:
                return True
        return False
    finally:
        # Stop the scan on every exit path, including the error path, so a
        # failed verification does not leave the scanner running.
        scn_ad.droid.bleStopBleScan(scan_callback)
1365
1366
def set_bluetooth_codec(android_device,
                        codec_type,
                        sample_rate,
                        bits_per_sample,
                        channel_mode,
                        codec_specific_1=0):
    """Sets the A2DP codec configuration on the AndroidDevice.

    The configuration is requested through SL4A and then checked against
    the 'current codec' line of `dumpsys bluetooth_manager`.

    Args:
        android_device (acts.controllers.android_device.AndroidDevice): the
            android device for which to switch the codec.
        codec_type (str): the desired codec type. Must be a key in
            bt_constants.codec_types.
        sample_rate (str): the desired sample rate. Must be a key in
            bt_constants.sample_rates.
        bits_per_sample (str): the desired bits per sample. Must be a key in
            bt_constants.bits_per_samples.
        channel_mode (str): the desired channel mode. Must be a key in
            bt_constants.channel_modes.
        codec_specific_1 (int): the desired bit rate (quality) for LDAC codec.
    Returns:
        bool: False if the dumpsys output reports a codec different from
            codec_type. True otherwise, which includes the case where the
            dumpsys output could not be parsed and the change is therefore
            unverified.
    """
    message = ("Set Android Device A2DP Bluetooth codec configuration:\n"
               "\tCodec: {codec_type}\n"
               "\tSample Rate: {sample_rate}\n"
               "\tBits per Sample: {bits_per_sample}\n"
               "\tChannel Mode: {channel_mode}".format(
                   codec_type=codec_type,
                   sample_rate=sample_rate,
                   bits_per_sample=bits_per_sample,
                   channel_mode=channel_mode))
    android_device.log.info(message)

    # Send SL4A command
    droid, ed = android_device.droid, android_device.ed
    if not droid.bluetoothA2dpSetCodecConfigPreference(
            codec_types[codec_type], sample_rates[str(sample_rate)],
            bits_per_samples[str(bits_per_sample)],
            channel_modes[channel_mode], codec_specific_1):
        android_device.log.warning(
            "SL4A command returned False. Codec was not "
            "changed.")
    else:
        try:
            ed.pop_event(bluetooth_a2dp_codec_config_changed,
                         bt_default_timeout)
        except Exception:
            android_device.log.warning("SL4A event not registered. Codec "
                                       "may not have been changed.")

    # Validate codec value through ADB
    # TODO (aidanhb): validate codec more robustly using SL4A
    command = "dumpsys bluetooth_manager | grep -i 'current codec'"
    out = android_device.adb.shell(command)
    split_out = out.split(": ")
    if len(split_out) != 2:
        android_device.log.warning("Could not verify codec config change "
                                   "through ADB.")
        # Unverified: keep the historical True result, but do not claim in
        # the log that the change succeeded.
        return True
    actual_codec = split_out[1].strip()
    if actual_codec.upper() != codec_type:
        android_device.log.error("Codec config was not changed.\n"
                                 "\tExpected codec: {exp}\n"
                                 "\tActual codec: {act}".format(
                                     exp=codec_type, act=actual_codec))
        return False
    android_device.log.info("Bluetooth codec successfully changed.")
    return True
1435
1436
def set_bt_scan_mode(ad, scan_mode_value):
    """Set Android device's Bluetooth scan mode and confirm it took effect.

    For the 'state_off' mode Bluetooth is disabled, the scan mode is read,
    and Bluetooth is reset again before the comparison is made.

    Args:
        ad: The Android device to set the scan mode on.
        scan_mode_value: The value to set the scan mode to.

    Returns:
        True if the scan mode read back equals scan_mode_value, False if it
        differs or scan_mode_value is not a known mode.
    """
    droid = ad.droid
    if scan_mode_value == bt_scan_mode_types['state_off']:
        disable_bluetooth(droid)
        observed_mode = droid.bluetoothGetScanMode()
        reset_bluetooth([ad])
    elif scan_mode_value == bt_scan_mode_types['none']:
        droid.bluetoothMakeUndiscoverable()
        observed_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable']:
        droid.bluetoothMakeUndiscoverable()
        droid.bluetoothMakeConnectable()
        observed_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable_discoverable']:
        droid.bluetoothMakeDiscoverable()
        observed_mode = droid.bluetoothGetScanMode()
    else:
        # invalid scan mode
        return False
    return observed_mode == scan_mode_value
1474
1475
def set_device_name(droid, name):
    """Set the Bluetooth local name on a droid object and read it back.

    Args:
        droid: Droid object to set local name on.
        name: the Bluetooth local name to set.

    Returns:
        True if the name read back after 2 seconds equals `name`, False
        otherwise.
    """
    droid.bluetoothSetLocalName(name)
    time.sleep(2)
    return droid.bluetoothGetLocalName() == name
1492
1493
def set_profile_priority(host_ad, client_ad, profiles, priority):
    """Sets the priority of said profile(s) on host_ad for client_ad.

    Supported profiles are a2dp_sink, headset_client and pbap_client (as
    defined in bt_profile_constants); any other profile is logged as an
    error and skipped.

    Args:
        host_ad: Android device on which the priority is set.
        client_ad: Android device whose local address the priority
            applies to.
        profiles: iterable of profile constants to update.
        priority: object whose `.value` is passed as the priority.
    """
    # (key in bt_profile_constants, SL4A setter on host_ad.droid)
    priority_setters = (
        ('a2dp_sink', 'bluetoothA2dpSinkSetPriority'),
        ('headset_client', 'bluetoothHfpClientSetPriority'),
        ('pbap_client', 'bluetoothPbapClientSetPriority'),
    )
    for profile in profiles:
        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
            profile, host_ad.droid.bluetoothGetLocalName(),
            client_ad.droid.bluetoothGetLocalAddress(), priority.value))
        for constant_key, setter_name in priority_setters:
            if bt_profile_constants[constant_key] == profile:
                setter = getattr(host_ad.droid, setter_name)
                setter(client_ad.droid.bluetoothGetLocalAddress(),
                       priority.value)
                break
        else:
            host_ad.log.error(
                "Profile {} not yet supported for priority settings".format(
                    profile))
1513
1514
def setup_multiple_devices_for_bt_test(android_devices):
    """A common setup routine for Bluetooth on input Android device list.

    Things this function sets up:
    1. Sets SELinux to permissive (`setenforce 0`).
    2. Factory-resets Bluetooth on all devices in parallel threads.
    3. Set Bluetooth local name to random string of size 4
    4. Disable BLE background scanning and turn the location service on.
    5. Removes all existing bonds.
    6. Enable Bluetooth snoop logging.

    Args:
        android_devices: Android device list to setup Bluetooth on.

    Returns:
        True if successful, false if unsuccessful. An empty device list is
        treated as a successful (no-op) setup.
    """
    log.info("Setting up Android Devices")
    # TODO: Temp fix for an selinux error.
    for ad in android_devices:
        ad.adb.shell("setenforce 0")
    threads = []
    # Defined up front so the final return is valid even when
    # android_devices is empty and the naming loop never runs.
    setup_result = True
    try:
        for a in android_devices:
            thread = threading.Thread(target=factory_reset_bluetooth,
                                      args=([a], ))
            threads.append(thread)
            thread.start()
        for t in threads:
            t.join()

        for a in android_devices:
            d = a.droid
            # TODO: Create specific RPC command to instantiate
            # BluetoothConnectionFacade. This is just a workaround.
            d.bluetoothStartConnectionStateChangeMonitor("")
            setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
            if not setup_result:
                a.log.error("Failed to set device name.")
                return setup_result
            d.bluetoothDisableBLE()
            utils.set_location_service(a, True)
            bonded_devices = d.bluetoothGetBondedDevices()
            for b in bonded_devices:
                a.log.info("Removing bond for device {}".format(b['address']))
                d.bluetoothUnbond(b['address'])
        for a in android_devices:
            a.adb.shell("setprop persist.bluetooth.btsnooplogmode full")
            getprop_result = a.adb.shell(
                "getprop persist.bluetooth.btsnooplogmode") == "full"
            if not getprop_result:
                # Snoop logging is best effort; setup still counts as
                # successful without it.
                a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
    except Exception as err:
        log.error("Something went wrong in multi device setup: {}".format(err))
        return False
    return setup_result
1569
1570
def setup_n_advertisements(adv_ad, num_advertisements):
    """Start the requested number of LE advertisements on a device.

    All advertisements share one advertise data object and one settings
    object, built after the advertise mode is set to low latency. Each
    advertisement must report success before the next one is started.

    Args:
        adv_ad: The Android device to start LE advertisements on.
        num_advertisements: The number of advertisements to start.

    Returns:
        List of advertisement callback ids, in the order they were started.

    Raises:
        BtTestUtilsError: if waiting for an advertisement's success event
            ends with an Empty error.
    """
    droid = adv_ad.droid
    droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    data = droid.bleBuildAdvertiseData()
    settings = droid.bleBuildAdvertiseSettings()
    callbacks = []
    # Count from 1 so the log messages can use the ordinal directly.
    for ordinal in range(1, num_advertisements + 1):
        callback = droid.bleGenBleAdvertiseCallback()
        callbacks.append(callback)
        droid.bleStartBleAdvertising(callback, data, settings)
        try:
            adv_ad.ed.pop_event(adv_succ.format(callback), bt_default_timeout)
        except Empty as error:
            adv_ad.log.error(
                "Advertisement {} failed to start.".format(ordinal))
            raise BtTestUtilsError(
                "Test failed with Empty error: {}".format(error))
        else:
            adv_ad.log.info("Advertisement {} started.".format(ordinal))
    return callbacks
1600
1601
def take_btsnoop_log(ad, testcase, testname):
    """Pull the btsnoop_hci logs off a device into its local log directory.

    Both the current snoop log and the previous ("last") snoop log are pulled
    with adb into a BluetoothSnoopLogs directory under ad.device_log_path.
    The file names are built from the test name (alphanumeric characters
    only), the device model (spaces removed) and the device serial.

    Call this with android_device objects in on_fail if the snoop log is
    wanted for failed tests.

    Args:
        ad: The android_device instance to pull the snoop logs from.
        testcase: The test class object that triggered this snoop log; its
            log attribute is used to report a failed pull of the last log.
        testname: Name of the test case that triggered this snoop log.
    """
    serial = ad.serial
    model = ad.droid.getBuildModel().replace(" ", "")
    base_name = ','.join(
        ("".join(filter(str.isalnum, testname)), model, serial))
    snoop_dir = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs')
    os.makedirs(snoop_dir, exist_ok=True)

    def pull(remote_path, suffix):
        # Runs "adb pull" of remote_path into the snoop log directory.
        exe_cmd("adb -s " + serial + " pull " + remote_path + " " +
                snoop_dir + '/' + base_name + suffix)

    pull(btsnoop_log_path_on_device, ".btsnoop_hci.log")
    # The previous log may be missing, so a failed pull is only logged.
    try:
        pull(btsnoop_last_log_path_on_device, ".btsnoop_hci.log.last")
    except Exception:
        testcase.log.info(
            "File does not exist {}".format(btsnoop_last_log_path_on_device))
1633
1634
def take_btsnoop_logs(android_devices, testcase, testname):
    """Collect the btsnoop logs of several devices, one device at a time.

    Args:
        android_devices: the list of Android devices to pull btsnoop logs from.
        testcase: forwarded unchanged to take_btsnoop_log for every device.
        testname: Name of the test case that triggered this snoop log.
    """
    for device in android_devices:
        take_btsnoop_log(device, testcase, testname)
1645
1646
def teardown_n_advertisements(adv_ad, num_advertisements,
                              advertise_callback_list):
    """Stop the first num_advertisements advertisements of a callback list.

    Args:
        adv_ad: The Android device to stop LE advertisements on.
        num_advertisements: The number of advertisements to stop.
        advertise_callback_list: The list of advertisement callbacks to stop.

    Returns:
        True once a stop request was issued for every selected callback.
    """
    for position in range(num_advertisements):
        callback = advertise_callback_list[position]
        adv_ad.droid.bleStopBleAdvertising(callback)
    return True
1662
1663
def verify_server_and_client_connected(client_ad, server_ad, log=True):
    """Check that both the server and the client hold a socket connection.

    This code is under the assumption that there will only be
    a single connection.

    Args:
        client_ad: the Android device to check number of active connections.
        server_ad: the Android device to check number of active connections.
        log: when true, log an error on every device that reports no active
            socket connection.

    Returns:
        True if both server and client have at least 1 active connection,
        False otherwise.
    """
    # The server is checked first, then the client; both are always checked.
    checks = (
        (server_ad, "No socket connections found on server."),
        (client_ad, "No socket connections found on client."),
    )
    connected = True
    for device, failure_message in checks:
        if len(device.droid.bluetoothSocketConnActiveConnections()) != 0:
            continue
        if log:
            device.log.error(failure_message)
        connected = False
    return connected
1688
1689
def wait_for_bluetooth_manager_state(droid,
                                     state=None,
                                     timeout=10,
                                     threshold=5):
    """Poll the Bluetooth LE state until it settles or matches a given state.

    The state is sampled with droid.bluetoothGetLeState() every 0.5 seconds.
    Nothing is evaluated until at least threshold samples were collected;
    after that only the most recent threshold samples are looked at.

    Args:
        droid: droid device object
        state: expected Bluetooth state. When None, the wait succeeds as soon
            as the recent samples are all identical; otherwise it succeeds as
            soon as state appears among the recent samples.
        timeout: maximum number of seconds to keep polling
        threshold: number of most recent samples to evaluate

    Returns:
        True if successful, False if the timeout expired first.
    """
    samples = []
    deadline = time.time() + timeout
    while time.time() < deadline:
        samples.append(droid.bluetoothGetLeState())
        if len(samples) >= threshold:
            recent = samples[-threshold:]
            if state is not None:
                # explicit check against the recent samples
                if state in recent:
                    return True
            elif len(set(recent)) == 1:
                # any state counts, as long as it stopped changing
                log.info("State normalized {}".format(set(recent)))
                return True
        time.sleep(0.5)
    if state is None:
        failure = "Bluetooth state fails to normalize"
    else:
        failure = (
            "Failed to match bluetooth state, current state {} expected state {}"
            .format(droid.bluetoothGetLeState(), state))
    log.error(failure)
    return False
1725
1726
def _wait_for_passkey_match(pri_ad, sec_ad):
    """Wait for the pairing requests of two devices and confirm the passkey.

    A "BluetoothActionPairingRequest" event is popped first from the primary
    and then from the secondary device. When both report the passkey
    confirmation variant, the two pins are compared and the outcome is posted
    to both devices as a "BluetoothActionPairingRequestUserConfirm" event.

    Args:
        pri_ad: the primary Android device.
        sec_ad: the secondary Android device.

    Returns:
        False if a pairing request did not arrive in time, if the pairing
        variants differ, or if the pins differ. True otherwise, including
        when both devices share a variant other than passkey confirmation,
        in which case no confirmation is posted.
    """
    pri_request = sec_request = None
    try:
        pri_request = pri_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        pri_variant = pri_request["data"]["PairingVariant"]
        pri_pin = pri_request["data"]["Pin"]
        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
            pri_pin, pri_variant))
        sec_request = sec_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        sec_variant = sec_request["data"]["PairingVariant"]
        sec_pin = sec_request["data"]["Pin"]
        sec_ad.log.info(
            "Secondary device received Pin: {}, Variant: {}".format(
                sec_pin, sec_variant))
    except Empty as err:
        log.error("Wait for pin error: {}".format(err))
        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
            pri_request, sec_request))
        return False

    if pri_variant != sec_variant:
        log.error("Pairing variant mismatched, abort connection")
        return False
    if pri_variant != pairing_variant_passkey_confirmation:
        # Same variant on both sides, but nothing to confirm here.
        return True

    pins_match = pri_pin == sec_pin
    log.info("Pairing code matched, accepting connection" if pins_match else
             "Pairing code mismatched, rejecting connection")
    # Both devices get the same answer, accepted or rejected.
    for device in (pri_ad, sec_ad):
        device.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                               str(pins_match))
    return pins_match
1768
1769
def write_read_verify_data(client_ad, server_ad, msg, binary=False):
    """Write a message on the client socket and check the server reads it back.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: if the msg arg is binary or not. Binary reads have trailing
            "\\r" and "\\n" characters stripped before the comparison.

    Returns:
        True if the data written matches the data read. False if the write
        or the read raised an exception, or if the data differs.
    """
    client_ad.log.info("Write message.")
    try:
        writer = (client_ad.droid.bluetoothSocketConnWriteBinary
                  if binary else client_ad.droid.bluetoothSocketConnWrite)
        writer(msg)
    except Exception as err:
        client_ad.log.error("Failed to write data: {}".format(err))
        return False

    server_ad.log.info("Read message.")
    try:
        if not binary:
            received = server_ad.droid.bluetoothSocketConnRead()
        else:
            raw = server_ad.droid.bluetoothSocketConnReadBinary()
            received = raw.rstrip("\r\n")
    except Exception as err:
        server_ad.log.error("Failed to read data: {}".format(err))
        return False

    log.info("Verify message.")
    mismatch = msg != received
    if mismatch:
        log.error("Mismatch! Read: {}, Expected: {}".format(received, msg))
    return not mismatch
1806
1807
class MediaControlOverSl4a:
    """General purpose media control driven through the sl4a facade.

    Every action is issued on the 'default' media player tag and is followed
    by an assertion on the playing state the device reports.
    """
    def __init__(self, android_device, music_file):
        """Store the device and the music file to control.

        Args:
            android_device: android_device object
            music_file: location of the music file
        """
        self.music_file = music_file
        self.android_device = android_device

    def play(self):
        """Open the music file and assert that it is playing."""
        droid = self.android_device.droid
        source = 'file://%s' % self.music_file
        droid.mediaPlayOpen(source, 'default', True)
        is_playing = droid.mediaIsPlaying()
        asserts.assert_true(is_playing,
                            'Failed to play music %s' % self.music_file)

    def pause(self):
        """Pause playback and assert that nothing is playing."""
        droid = self.android_device.droid
        droid.mediaPlayPause('default')
        still_playing = droid.mediaIsPlaying()
        asserts.assert_true(not still_playing,
                            'Failed to pause music %s' % self.music_file)

    def resume(self):
        """Start playback again and assert that it is playing."""
        droid = self.android_device.droid
        droid.mediaPlayStart('default')
        is_playing = droid.mediaIsPlaying()
        asserts.assert_true(is_playing,
                            'Failed to play music %s' % self.music_file)

    def stop(self):
        """Stop playback and assert that nothing is playing."""
        droid = self.android_device.droid
        droid.mediaPlayStop('default')
        still_playing = droid.mediaIsPlaying()
        asserts.assert_true(not still_playing,
                            'Failed to stop music %s' % self.music_file)
1858