1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script for concurrent Gatt connections.
18Testbed assumes 6 Android devices. One will be the central and the rest
19peripherals.
20"""
21
22from queue import Empty
23import concurrent.futures
24import threading
25import time
26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27from acts.test_utils.bt.bt_constants import ble_scan_settings_modes
28from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
29from acts.test_utils.bt.bt_constants import bt_profile_constants
30from acts.test_utils.bt.bt_constants import gatt_characteristic
31from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format
32from acts.test_utils.bt.bt_constants import gatt_char_desc_uuids
33from acts.test_utils.bt.bt_constants import gatt_descriptor
34from acts.test_utils.bt.bt_constants import gatt_service_types
35from acts.test_utils.bt.bt_constants import scan_result
36from acts.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor
37from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection
38from acts.test_utils.bt.gatts_lib import GattServerLib
39from acts.test_decorators import test_tracker_info
40
# UUID of the GATT service declared in the sample server database below.
service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb'
# UUID of the single characteristic exposed by that service.
characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630'
# UUID of the descriptor attached to that characteristic.
descriptor_uuid = '00000003-0000-1000-8000-00805f9b34fb'
44
# GATT server database handed to GattServerLib.setup_gatts_db() on every
# peripheral: one primary service holding one characteristic, which in turn
# holds one descriptor. The characteristic and the descriptor are both
# declared with write permission.
gatt_server_read_descriptor_sample = {
    'services': [{
        'uuid': service_uuid,
        'type': gatt_service_types['primary'],
        'characteristics': [{
            'uuid': characteristic_uuid,
            'properties': gatt_characteristic['property_write'],
            'permissions': gatt_characteristic['permission_write'],
            'instance_id': 0x002a,
            'value_type': gatt_characteristic_value_format['string'],
            'value': 'Test Database',
            'descriptors': [{
                'uuid': descriptor_uuid,
                'permissions': gatt_descriptor['permission_write'],
            }],
        }],
    }],
}
71
72
class ConcurrentGattConnectTest(BluetoothBaseTest):
    """Concurrent GATT connection tests: one central, several peripherals.

    android_devices[0] is used as the central (pri_dut). The following
    max_connections devices each start a named BLE advertisement in
    setup_class and are the peripherals the central connects to.
    """
    # Timeout, in seconds, passed to pop_event calls made by this class.
    bt_default_timeout = 10
    # Number of peripherals advertised and expected to be connected.
    max_connections = 5
    # The three lists below are class-level defaults only; setup_class rebinds
    # fresh per-instance lists so entries never leak between instances or
    # accumulate across repeated runs.
    # List of tuples (android_device, advertise_callback)
    advertise_callbacks = []
    # List of tuples (android_device, advertisement_name)
    advertisement_names = []
    # List of positional-argument lists for run_continuous_write_descriptor
    list_of_arguments_list = []

    def setup_class(self):
        """Selects the central and starts one advertisement per peripheral."""
        # NOTE(review): super(BluetoothBaseTest, self) starts the MRO lookup
        # *after* BluetoothBaseTest, so BluetoothBaseTest.setup_class itself is
        # not invoked here -- confirm this is intentional before changing it.
        super(BluetoothBaseTest, self).setup_class()
        self.pri_dut = self.android_devices[0]

        # Rebind as instance attributes instead of appending to the shared
        # class-level lists.
        self.advertise_callbacks = []
        self.advertisement_names = []
        self.list_of_arguments_list = []

        # Create one advertisement on each of the peripheral android devices
        for i in range(1, self.max_connections + 1):
            # Set device name
            ad = self.android_devices[i]
            name = "test_adv_{}".format(i)
            self.advertisement_names.append((ad, name))
            ad.droid.bluetoothSetLocalName(name)

            # Setup and start advertisements
            ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
            ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
                ble_advertise_settings_modes['low_latency'])
            advertise_data = ad.droid.bleBuildAdvertiseData()
            advertise_settings = ad.droid.bleBuildAdvertiseSettings()
            advertise_callback = ad.droid.bleGenBleAdvertiseCallback()
            ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                            advertise_settings)
            self.advertise_callbacks.append((ad, advertise_callback))

    def _connected_le_device_count(self):
        """Returns the number of entries the central reports from
        bluetoothGetConnectedLeDevices for the 'gatt_server' profile."""
        return len(
            self.pri_dut.droid.bluetoothGetConnectedLeDevices(
                bt_profile_constants['gatt_server']))

    def obtain_address_list_from_scan(self):
        """Returns the address list of all devices that match the scan filter.

        Scans from the central with one device-name filter per advertisement
        started in setup_class and collects one (android_device, mac_address)
        tuple per distinct MAC address seen. The scan is always stopped before
        returning.

        Returns:
          A list of (android_device, mac_address) tuples if all devices are
          found; None if any device is not found or no scan result arrives
          within bt_default_timeout seconds.
        """
        droid = self.pri_dut.droid
        # From central device, scan for all appropriate addresses by name.
        filter_list = droid.bleGenFilterList()
        droid.bleSetScanSettingsScanMode(
            ble_scan_settings_modes['low_latency'])
        scan_settings = droid.bleBuildScanSetting()
        scan_callback = droid.bleGenScanCallback()
        for _, name in self.advertisement_names:
            droid.bleSetScanFilterDeviceName(name)
            droid.bleBuildScanFilter(filter_list)

        # Advertised name -> android device handle, for resolving results.
        device_by_name = {
            name: android_device
            for android_device, name in self.advertisement_names
        }
        expected_count = len(self.advertisement_names)
        scan_event_name = "BleScan{}onScanResults".format(scan_callback)

        address_list = []
        seen_addresses = set()
        devices_found = []
        # Set the scan time out to 20 sec to provide enough time to discover the
        # devices in busy environment
        scan_timeout = 20

        droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
        try:
            end_time = time.time() + scan_timeout
            while time.time() < end_time and len(
                    address_list) < expected_count:
                try:
                    event = self.pri_dut.ed.pop_event(
                        scan_event_name, self.bt_default_timeout)
                except Empty:
                    self.log.error("Failed to find any scan results.")
                    return None

                device_info = event['data']['Result']['deviceInfo']
                adv_name = device_info['name']
                mac_address = device_info['address']
                # Look up the android device handle based on event name.
                # Results whose name is not one of ours are skipped *before*
                # the handle is dereferenced.
                device = device_by_name.get(adv_name)
                if device is None:
                    continue
                devices_found.append(device.serial)
                # An address not seen before means a new peripheral was
                # discovered.
                if mac_address not in seen_addresses:
                    self.log.info(
                        "Found new mac address: {}".format(mac_address))
                    seen_addresses.add(mac_address)
                    address_list.append((device, mac_address))
        finally:
            # Do not leave the scan running on the central, whatever the
            # outcome of the discovery loop.
            droid.bleStopBleScan(scan_callback)

        if len(address_list) < self.max_connections:
            self.log.info("Only found these devices: {}".format(devices_found))
            self.log.error("Could not find all necessary advertisements.")
            return None
        return address_list

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f')
    def test_concurrent_gatt_connections(self):
        """Test max concurrent GATT connections

        Connect to all peripherals.

        Steps:
        1. Scan
        2. Save addresses
        3. Connect all addresses of the peripherals

        Expected Result:
        All connections successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Connect to all addresses
        for _, address in address_list:
            try:
                setup_gatt_connection(
                    self.pri_dut, address, autoconnect=False)
            except Exception:
                self.log.error(
                    "Failed to establish connection to {}".format(address))
                return False
            self.log.info("Successfully connected to {}".format(address))

        if self._connected_le_device_count() != self.max_connections:
            self.log.error("Did not reach max connection count.")
            return False

        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f')
    def test_data_transfer_to_concurrent_gatt_connections(self):
        """Test writing GATT descriptors concurrently to many peripherals.

        Connect to all peripherals and write gatt descriptors concurrently.


        Steps:
        1. Scan the addresses by names
        2. Save mac addresses of the peripherals
        3. Connect all addresses of the peripherals and write gatt descriptors


        Expected Result:
        All connections and data transfers are successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Start from an empty list so argument lists from an earlier run are
        # never resubmitted.
        self.list_of_arguments_list = []

        # Connect to all addresses
        for ad, address in address_list:
            gatts = GattServerLib(log=self.log, dut=ad)
            gatt_server, gatt_server_callback = gatts.setup_gatts_db(
                database=gatt_server_read_descriptor_sample)

            try:
                bluetooth_gatt, _ = setup_gatt_connection(
                    self.pri_dut, address, autoconnect=False)
            except Exception:
                self.log.error(
                    "Failed to establish connection to {}".format(address))
                return False
            self.log.info("Successfully connected to {}".format(address))

            if not self.pri_dut.droid.gattClientDiscoverServices(
                    bluetooth_gatt):
                self.log.info("Failed to discover services.")
                return False
            event = self.pri_dut.ed.pop_event(
                "GattConnect{}onServicesDiscovered".format(bluetooth_gatt),
                self.bt_default_timeout)
            discovered_services_index = event['data']['ServicesIndex']
            services_count = (
                self.pri_dut.droid.gattClientGetDiscoveredServicesCount(
                    discovered_services_index))

            # Positional arguments for run_continuous_write_descriptor. The
            # trailing 100 is presumably the number of write iterations --
            # confirm against the helper's signature.
            self.list_of_arguments_list.append([
                self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed,
                gatt_server, gatt_server_callback, bluetooth_gatt,
                services_count, discovered_services_index, 100
            ])

        # Leaving the with-block waits for every submitted write loop to
        # finish (equivalent to shutdown(wait=True)).
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=10) as executor:
            futures = [
                executor.submit(run_continuous_write_descriptor,
                                *arguments_list)
                for arguments_list in self.list_of_arguments_list
            ]

        # An exception raised inside a worker is only stored on its future;
        # surface it here instead of silently discarding it.
        # NOTE(review): the workers' return values are not inspected because
        # the helper's return contract is not visible from this file.
        worker_failed = False
        for future in futures:
            worker_error = future.exception()
            if worker_error is not None:
                self.log.error(
                    "Concurrent descriptor write raised: {}".format(
                        worker_error))
                worker_failed = True
        if worker_failed:
            return False

        if self._connected_le_device_count() != self.max_connections:
            self.log.error("Failed to write concurrently.")
            return False

        return True
292