1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script exercises different testcases with a lot of ble beacon traffic.
18
19This test script was designed with this setup in mind:
20Shield box one: Android Device as DUT. 7x Sprout devices acting as 192 beacons
21"""
22
23import threading
24
25from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
26from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseMode
27from acts.test_utils.bt.BleEnum import ScanSettingsScanMode
28from acts.test_utils.bt.bt_test_utils import adv_succ
29from acts.test_utils.bt.bt_test_utils import batch_scan_result
30from acts.test_utils.bt.bt_test_utils import scan_result
31from acts.test_utils.bt.bt_test_utils import generate_ble_advertise_objects
32from acts.test_utils.bt.bt_test_utils import generate_ble_scan_objects
33from acts.test_utils.bt.bt_test_utils import reset_bluetooth
34from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
35from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
36
37
38class BeaconSwarmTest(BluetoothBaseTest):
39    default_timeout = 10
40    beacon_swarm_count = 0
41    advertising_device_name_list = []
42    discovered_mac_address_list = []
43
44    def setup_test(self):
45        self.discovered_mac_address_list = []
46        for a in self.android_devices:
47            a.ed.clear_all_events()
48        return True
49
50    def teardown_test(self):
51        reset_bluetooth([self.android_devices[0]])
52        return True
53
54    def setup_class(self):
55        self.scn_ad = self.android_devices[0]
56        if not setup_multiple_devices_for_bt_test(self.android_devices):
57            return False
58        return self._start_special_advertisements()
59
60    def cleanup_class(self):
61        return reset_bluetooth(self.android_devices)
62
63    def on_fail(self, test_name, begin_time):
64        take_btsnoop_logs(self.android_devices, self, test_name)
65        reset_bluetooth([self.scn_ad])
66
67    def _start_advertisements_thread(self, ad, beacon_count, restart=False):
68        d, e = ad.droid, ad.ed
69        if restart:
70            try:
71                reset_bluetooth([ad])
72            except Exception:
73                self.log.debug("Failed resetting Bluetooth, continuing...")
74                return
75        try:
76            for _ in range(beacon_count):
77                d.bleSetAdvertiseDataIncludeDeviceName(True)
78                d.bleSetAdvertiseSettingsAdvertiseMode(
79                    AdvertiseSettingsAdvertiseMode.ADVERTISE_MODE_LOW_LATENCY.
80                    value)
81                advertise_callback, advertise_data, advertise_settings = (
82                    generate_ble_advertise_objects(d))
83                d.bleStartBleAdvertising(advertise_callback, advertise_data,
84                                         advertise_settings)
85                try:
86                    e.pop_event(
87                        adv_succ.format(advertise_callback),
88                        self.default_timeout)
89                    self.beacon_swarm_count += 1
90                    local_bt_name = d.bluetoothGetLocalName()
91                    if local_bt_name not in self.advertising_device_name_list:
92                        self.advertising_device_name_list.append(
93                            d.bluetoothGetLocalName())
94                except Exception as e:
95                    self.log.info("Advertising failed due to " + str(e))
96                self.log.info("Beacons active: {}".format(
97                    self.beacon_swarm_count))
98        except Exception:
99            self.log.debug(
100                "Something went wrong in starting advertisements, continuing.")
101        return
102
103    def _start_special_advertisements(self):
104        self.log.info("Setting up advertisements.")
105        beacon_serials = []
106        beacon_count = 0
107        try:
108            beacon_serials = self.user_params['beacon_devices']
109            beacon_count = self.user_params['beacon_count']
110        except AttributeError:
111            self.log.info(
112                "No controllable devices connected to create beacons with."
113                " Continuing...")
114        threads = []
115        for a in self.android_devices:
116            d, e = a.droid, a.ed
117            serial_no = a.serial
118            if serial_no not in beacon_serials:
119                continue
120            thread = threading.Thread(target=self._start_advertisements_thread,
121                                      args=(d, e, beacon_count))
122            threads.append(thread)
123            thread.start()
124        for t in threads:
125            t.join()
126        if self.beacon_swarm_count < (beacon_count * len(beacon_serials)):
127            self.log.error("Not enough beacons advertising: {}".format(
128                self.beacon_swarm_count))
129            return False
130        return True
131
132    def _restart_special_advertisements_thread(self):
133        beacon_serials = []
134        beacon_count = 0
135        try:
136            beacon_serials = self.user_params['beacon_devices']
137            beacon_count = self.user_params['beacon_count']
138        except AttributeError:
139            self.log.info("No controllable devices connected to create beacons"
140                          " with. Continuing...")
141        threads = []
142        while True:
143            self.log.info("Restarting advertisements.")
144            for a in self.android_devices:
145                d, e = a.droid, a.ed
146                serial_no = a.serial
147                if serial_no not in beacon_serials:
148                    continue
149                thread = threading.Thread(
150                    target=self._start_advertisements_thread,
151                    args=(d, e, beacon_count, True))
152                threads.append(thread)
153                thread.start()
154            for t in threads:
155                t.join()
156        return True
157
158    def test_swarm_1000_on_scan_result(self):
159        """Test LE scanning in a mass beacon deployment.
160
161        Test finding 1000 LE scan results in a mass beacon deployment.
162
163        Steps:
164        1. Assume that mass beacon deployment is setup.
165        2. Set LE scanning mode to low latency.
166        3. Start LE scan.
167        4. Pop scan results off the event dispatcher 1000 times.
168        5. Stop LE scanning.
169
170        Expected Result:
171        1000 scan results should be found without any exceptions.
172
173        Returns:
174          Pass if True
175          Fail if False
176
177        TAGS: LE, Scanning, Beacon
178        Priority: 1
179        """
180        self.scn_ad.droid.bleSetScanSettingsScanMode(
181            ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
182        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
183            self.scn_ad.droid)
184        self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
185                                          scan_callback)
186        for _ in range(1000000):
187            event_info = self.scn_ad.ed.pop_event(
188                scan_result.format(scan_callback), self.default_timeout)
189            mac_address = event_info['data']['Result']['deviceInfo']['address']
190            if mac_address not in self.discovered_mac_address_list:
191                self.discovered_mac_address_list.append(mac_address)
192                self.log.info("Discovered {} different devices.".format(len(
193                    self.discovered_mac_address_list)))
194        self.log.debug("Discovered {} different devices.".format(len(
195            self.discovered_mac_address_list)))
196        self.scn_ad.droid.bleStopBleScan(scan_callback)
197        return True
198
199    def test_swarm_10000_on_batch_scan_result(self):
200        """Test LE batch scanning in a mass beacon deployment.
201
202        Test finding 10000 LE batch scan results in a mass beacon deployment.
203
204        Steps:
205        1. Assume that mass beacon deployment is setup.
206        2. Set LE scanning mode to low latency and report delay millis to 1
207        second.
208        3. Start LE scan.
209        4. Pop batch scan results off the event dispatcher 10000 times.
210        5. Stop LE scanning.
211
212        Expected Result:
213        1000 scan results should be found without any exceptions.
214
215        Returns:
216          Pass if True
217          Fail if False
218
219        TAGS: LE, Scanning, Beacon
220        Priority: 1
221        """
222        self.scn_ad.droid.bleSetScanSettingsScanMode(
223            ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
224        self.scn_ad.droid.bleSetScanSettingsReportDelayMillis(1000)
225        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
226            self.scn_ad.droid)
227        self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
228                                          scan_callback)
229        for _ in range(10000):
230            event_info = self.scn_ad.ed.pop_event(
231                batch_scan_result.format(scan_callback), self.default_timeout)
232            for result in event_info['data']['Results']:
233                mac_address = result['deviceInfo']['address']
234                if mac_address not in self.discovered_mac_address_list:
235                    self.discovered_mac_address_list.append(mac_address)
236        self.log.info("Discovered {} different devices.".format(len(
237            self.discovered_mac_address_list)))
238        self.scn_ad.droid.bleStopBleScan(scan_callback)
239        return True
240
241    def test_swarm_scan_result_filter_each_device_name(self):
242        """Test basic LE scan filtering in a mass beacon deployment.
243
244        Test finding LE scan results in a mass beacon deployment. This
245        test specifically tests scan filtering of different device names and
246        that each device name is found.
247
248        Steps:
249        1. Assume that mass beacon deployment is setup with device names
250        advertising.
251        2. Set LE scanning mode to low latency.
252        3. Filter device name from one of the known advertising device names
253        4. Start LE scan.
254        5. Pop scan results matching the scan filter.
255        6. Stop LE scanning.
256        7. Repeat steps 2-6 until all advertising device names are found.
257
258        Expected Result:
259        All advertising beacons are found by their device name.
260
261        Returns:
262          Pass if True
263          Fail if False
264
265        TAGS: LE, Scanning, Beacon, Filtering
266        Priority: 1
267        """
268        for filter_name in self.advertising_device_name_list:
269            self.scn_ad.droid.bleSetScanSettingsScanMode(
270                ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
271            filter_list, scan_settings, scan_callback = (
272                generate_ble_scan_objects(self.scn_ad.droid))
273            try:
274                self.scn_ad.droid.bleSetScanFilterDeviceName(filter_name)
275                self.scn_ad.droid.bleBuildScanFilter(filter_list)
276                self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
277                                                  scan_callback)
278                self.log.debug(self.scn_ad.ed.pop_event(
279                    scan_result.format(scan_callback), self.default_timeout))
280            except Exception:
281                self.log.info("Couldn't find advertiser name {}.".format(
282                    filter_name))
283                return False
284            self.scn_ad.droid.bleStopBleScan(scan_callback)
285        return True
286
287    def test_swarm_rotate_addresses(self):
288        """Test basic LE scan filtering in a mass beacon deployment.
289
290        Test finding LE scan results in a mass beacon deployment. This test
291        rotates the mac address of the advertising devices at a consistent
292        interval in order to make the scanning device think there are
293        thousands of devices nearby.
294
295        Steps:
296        1. Assume that mass beacon deployment is setup with device names
297        advertising.
298        2. Set LE scanning mode to low latency on 28 scan instances.
299        3. Start LE scan on each of the scan instances.
300        5. Continuously Pop scan results matching the scan filter.
301        6. Rotate mac address of each advertising device sequentially.
302        7. 5-6 10,000 times.
303        8. Stop LE scanning
304
305        Expected Result:
306        The Bluetooth stack doesn't crash.
307
308        Returns:
309          Pass if True
310          Fail if False
311
312        TAGS: LE, Scanning, Beacon
313        Priority: 1
314        """
315        scan_callback_list = []
316        for _ in range(28):
317            self.scn_ad.droid.bleSetScanSettingsScanMode(
318                ScanSettingsScanMode.SCAN_MODE_LOW_LATENCY.value)
319            filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
320                self.scn_ad.droid)
321            self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
322                                              scan_callback)
323            scan_callback_list.append(scan_callback)
324        thread = threading.Thread(
325            target=self._restart_special_advertisements_thread,
326            args=())
327        thread.start()
328        n = 0
329        while n < 10000:
330            for cb in scan_callback_list:
331                event_info = self.scn_ad.ed.pop_event(
332                    scan_result.format(cb), self.default_timeout)
333                mac_address = event_info['data']['Result']['deviceInfo'][
334                    'address']
335                if mac_address not in self.discovered_mac_address_list:
336                    self.discovered_mac_address_list.append(mac_address)
337                self.log.info("Discovered {} different devices.".format(len(
338                    self.discovered_mac_address_list)))
339                n += 1
340        self.scn_ad.droid.bleStopBleScan(scan_callback)
341        return True
342