1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Ble libraries
18"""
19
20from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
21from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
22from acts.test_utils.bt.bt_constants import ble_scan_settings_modes
23from acts.test_utils.bt.bt_constants import small_timeout
24from acts.test_utils.bt.bt_constants import adv_fail
25from acts.test_utils.bt.bt_constants import adv_succ
26from acts.test_utils.bt.bt_constants import advertising_set_on_own_address_read
27from acts.test_utils.bt.bt_constants import advertising_set_started
28from acts.test_utils.bt.bt_test_utils import generate_ble_advertise_objects
29
30import time
31import os
32
33
class BleLib():
    """Helpers for starting, configuring and stopping LE advertisements.

    All device interaction goes through ``dut.droid`` (RPC calls) and
    ``dut.ed`` (event polling).  Callback IDs of advertisements started by
    this object are tracked in ``advertisement_list``; callbacks of
    advertising sets are tracked in ``set_advertisement_list``.
    """

    # String inputs that must be interpreted as "false" by _to_bool().
    _FALSE_STRINGS = frozenset(("", "false", "0", "no", "off"))

    def __init__(self, log, dut):
        """Store the logger and device handle and reset tracking state.

        Args:
            log: logger object providing ``info`` and ``error``.
            dut: device under test; must expose ``droid``, ``ed`` and ``log``.
        """
        self.advertisement_list = []
        self.dut = dut
        self.log = log
        self.default_timeout = 5
        self.set_advertisement_list = []
        # Template used to expand a 4-character UUID into a full UUID string.
        self.generic_uuid = "0000{}-0000-1000-8000-00805f9b34fb"

    @staticmethod
    def _to_bool(value):
        """Convert a flag given as a bool or as text into a bool.

        Strings are compared case-insensitively after stripping whitespace;
        "", "false", "0", "no" and "off" are False, every other string is
        True.  Non-string values fall back to ``bool(value)``.
        """
        if isinstance(value, str):
            return value.strip().lower() not in BleLib._FALSE_STRINGS
        return bool(value)

    def _verify_ble_adv_started(self, advertise_callback):
        """Wait for the start result of an advertisement.

        Args:
            advertise_callback: callback ID the advertisement was started with.

        Returns:
            True if the success event was received, False if the failure
            event was received or no event arrived before the timeout.
        """
        # Imported here because the module does not import Empty at top level.
        from queue import Empty

        success_name = adv_succ.format(advertise_callback)
        failure_name = adv_fail.format(advertise_callback)
        regex = "({}|{})".format(success_name, failure_name)
        try:
            events = self.dut.ed.pop_events(regex, 5, small_timeout)
        except Empty:
            self.dut.log.error("Failed to get success or failed event.")
            return False
        if events[0]["name"] == success_name:
            self.dut.log.info("Advertisement started successfully.")
            return True
        self.dut.log.info("Advertisement failed to start.")
        return False

    def _start_advertisement(self, connectable, scan_response=False):
        """Start one low-latency advertisement and track it on success.

        Args:
            connectable: value passed to the connectable advertise setting.
            scan_response: when truthy, the advertise data is also supplied
                as the scan response.

        Returns:
            True if the advertisement was confirmed as started (its callback
            ID is then appended to ``advertisement_list``), False otherwise.
        """
        droid = self.dut.droid
        droid.bleSetAdvertiseSettingsAdvertiseMode(
            ble_advertise_settings_modes['low_latency'])
        droid.bleSetAdvertiseSettingsIsConnectable(connectable)
        advertise_callback, advertise_data, advertise_settings = (
            generate_ble_advertise_objects(droid))
        if scan_response:
            droid.bleStartBleAdvertisingWithScanResponse(
                advertise_callback, advertise_data, advertise_settings,
                advertise_data)
        else:
            droid.bleStartBleAdvertising(
                advertise_callback, advertise_data, advertise_settings)
        if not self._verify_ble_adv_started(advertise_callback):
            return False
        self.log.info("Tracking Callback ID: {}".format(advertise_callback))
        self.advertisement_list.append(advertise_callback)
        self.log.info(self.advertisement_list)
        return True

    def start_generic_connectable_advertisement(self, line):
        """Start a connectable LE advertisement

        Args:
            line: any truthy value (e.g. a non-empty string) makes the
                advertisement start with a scan response.
        """
        # NOTE: any non-empty argument enables the scan response, matching
        # the historical behaviour of this command.
        self._start_advertisement(True, bool(line))

    def start_connectable_advertisement_set(self, line):
        """Start Connectable Advertisement Set

        Starts a connectable, non-legacy advertising set, waits for the
        "set started" event and then for the "own address read" event, logs
        the advertiser address and tracks the set callback.

        Args:
            line: unused.

        Raises:
            queue.Empty: if either event is not received within
                ``default_timeout`` seconds.
        """
        # Imported here because the module does not import Empty at top level.
        from queue import Empty

        adv_callback = self.dut.droid.bleAdvSetGenCallback()
        adv_parameters = {
            "connectable": True,
            "legacyMode": False,
            "primaryPhy": "PHY_LE_1M",
            "secondaryPhy": "PHY_LE_1M",
            "interval": 320,
        }
        adv_data = {
            "includeDeviceName": True,
        }
        self.dut.droid.bleAdvSetStartAdvertisingSet(
            adv_parameters, adv_data, None, None, None, 0, 0, adv_callback)
        try:
            evt = self.dut.ed.pop_event(
                advertising_set_started.format(adv_callback),
                self.default_timeout)
        except Empty:
            # Only report the missing event when it is actually missing, then
            # let the caller see the same exception as before.
            self.log.error("did not receive the set started event!")
            raise
        set_id = evt['data']['setId']
        evt = self.dut.ed.pop_event(
            advertising_set_on_own_address_read.format(set_id),
            self.default_timeout)
        address = evt['data']['address']
        self.log.info("Advertiser address is: {}".format(str(address)))
        self.set_advertisement_list.append(adv_callback)

    def stop_all_advertisement_set(self, line):
        """Stop all Advertisement Sets

        Every tracked set is asked to stop; failures are logged and do not
        prevent the remaining sets from being stopped.  The tracking list is
        cleared afterwards so stale callbacks are not stopped again.

        Args:
            line: unused.
        """
        for adv in self.set_advertisement_list:
            try:
                self.dut.droid.bleAdvSetStopAdvertisingSet(adv)
            except Exception as err:
                self.log.error("Failed to stop advertisement: {}".format(err))
        self.set_advertisement_list = []

    def adv_add_service_uuid_list(self, line):
        """Add service UUID to the LE advertisement inputs:
         [uuid1 uuid2 ... uuidN]

        Each whitespace-separated entry that is exactly 4 characters long is
        expanded with ``generic_uuid``; all other entries are passed through
        unchanged.
        """
        uuid_list = [
            self.generic_uuid.format(uuid) if len(uuid) == 4 else uuid
            for uuid in line.split()
        ]
        self.dut.droid.bleSetAdvertiseDataSetServiceUuids(uuid_list)

    def adv_data_include_local_name(self, is_included):
        """Include local name in the advertisement. inputs: [true|false]"""
        self.dut.droid.bleSetAdvertiseDataIncludeDeviceName(
            self._to_bool(is_included))

    def adv_data_include_tx_power_level(self, is_included):
        """Include tx power level in the advertisement. inputs: [true|false]"""
        self.dut.droid.bleSetAdvertiseDataIncludeTxPowerLevel(
            self._to_bool(is_included))

    def adv_data_add_manufacturer_data(self, line):
        """Include manufacturer id and data to the advertisement:
        [id data1 data2 ... dataN]

        All entries are parsed with ``int()``; a non-numeric entry raises
        ValueError and an empty line raises IndexError.
        """
        info = line.split()
        manu_id = int(info[0])
        manu_data = [int(data) for data in info[1:]]
        self.dut.droid.bleAddAdvertiseDataManufacturerId(manu_id, manu_data)

    def start_generic_nonconnectable_advertisement(self, line):
        """Start a nonconnectable LE advertisement

        Args:
            line: unused.
        """
        self._start_advertisement(False)

    def stop_all_advertisements(self, line):
        """Stop all LE advertisements

        Stops every tracked advertisement, sleeping one second after each
        stop request, then clears the tracking list.

        Args:
            line: unused.
        """
        for callback_id in self.advertisement_list:
            self.log.info("Stopping Advertisement {}".format(callback_id))
            self.dut.droid.bleStopBleAdvertising(callback_id)
            time.sleep(1)
        self.advertisement_list = []

    def ble_stop_advertisement(self, callback_id):
        """Stop an LE advertisement

        Args:
            callback_id: ID of a tracked advertisement; converted with
                ``int()`` before the lookup.
        """
        if not callback_id:
            self.log.info("Need a callback ID")
            return
        callback_id = int(callback_id)
        if callback_id not in self.advertisement_list:
            self.log.info("Callback not in list of advertisements.")
            return
        self.dut.droid.bleStopBleAdvertising(callback_id)
        self.advertisement_list.remove(callback_id)

    def start_max_advertisements(self, line):
        """Start connectable advertisements until one cannot be started.

        Args:
            line: any truthy value (e.g. a non-empty string) makes every
                advertisement start with a scan response.

        Returns:
            False if an advertisement was not confirmed as started, True if
            the loop ended because starting an advertisement raised.
        """
        scan_response = bool(line)
        while True:
            try:
                started = self._start_advertisement(True, scan_response)
            except Exception as err:
                # NOTE(review): an exception is reported as True, presumably
                # because it signals that the limit was hit -- confirm.
                self.log.info(
                    "Stopped starting advertisements: {}".format(err))
                self.log.info("Advertisements active: {}".format(
                    len(self.advertisement_list)))
                return True
            if not started:
                self.log.info("Advertisements active: {}".format(
                    len(self.advertisement_list)))
                return False
212