#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Test script for concurrent Gatt connections.
Testbed assumes 6 Android devices. One will be the central and the rest
peripherals.
"""

# Standard library.
from queue import Empty
import concurrent.futures
import threading
import time

# ACTS Bluetooth test framework.
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_constants import (
    ble_scan_settings_modes,
    ble_advertise_settings_modes,
    bt_profile_constants,
    gatt_characteristic,
    gatt_characteristic_value_format,
    gatt_char_desc_uuids,
    gatt_descriptor,
    gatt_service_types,
    scan_result,
)
from acts.test_utils.bt.bt_gatt_utils import (
    run_continuous_write_descriptor,
    setup_gatt_connection,
)
from acts.test_utils.bt.gatts_lib import GattServerLib
from acts.test_decorators import test_tracker_info

# UUIDs of the service, characteristic and descriptor that make up the sample
# GATT server database defined in this module.
service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb'
characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630'
descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb"
# GATT database installed on every peripheral: one primary service holding a
# single writable characteristic with a single writable descriptor.
gatt_server_read_descriptor_sample = {
    'services': [{
        'uuid': service_uuid,
        'type': gatt_service_types['primary'],
        'characteristics': [{
            'uuid': characteristic_uuid,
            'properties': gatt_characteristic['property_write'],
            'permissions': gatt_characteristic['permission_write'],
            'instance_id': 0x002a,
            'value_type': gatt_characteristic_value_format['string'],
            'value': 'Test Database',
            'descriptors': [{
                'uuid': descriptor_uuid,
                'permissions': gatt_descriptor['permission_write'],
            }]
        }]
    }]
}


class ConcurrentGattConnectTest(BluetoothBaseTest):
    """Concurrent GATT connection tests with one central and many peripherals.

    android_devices[0] acts as the central (``pri_dut``); the following
    ``max_connections`` devices each advertise under a unique local name and
    are connected to by the central.
    """
    # Timeout, in seconds, used when waiting for a single event.
    bt_default_timeout = 10
    # Number of peripherals the central is expected to connect to.
    max_connections = 5
    # The three lists below are class-level defaults only. They are rebound to
    # fresh per-instance lists before use so that state never leaks between
    # instances or between repeated runs.
    # List of tuples (android_device, advertise_callback)
    advertise_callbacks = []
    # List of tuples (android_device, advertisement_name)
    advertisement_names = []
    # One positional-argument list per peripheral for
    # run_continuous_write_descriptor.
    list_of_arguments_list = []

    def setup_class(self):
        """Pick the central device and start one advertisement per peripheral.

        Requires at least ``max_connections + 1`` entries in
        ``self.android_devices``.
        """
        # Zero-argument super() dispatches to the direct parent. The previous
        # super(BluetoothBaseTest, self) form started the lookup *after*
        # BluetoothBaseTest and therefore skipped its setup_class.
        # NOTE(review): confirm that running BluetoothBaseTest.setup_class is
        # the intended behaviour for this testbed.
        super().setup_class()
        self.pri_dut = self.android_devices[0]

        # Shadow the shared class-level lists with per-instance ones.
        self.advertise_callbacks = []
        self.advertisement_names = []
        self.list_of_arguments_list = []

        # Create one advertisement on each peripheral android device.
        for i in range(1, self.max_connections + 1):
            # Set device name
            ad = self.android_devices[i]
            name = "test_adv_{}".format(i)
            self.advertisement_names.append((ad, name))
            ad.droid.bluetoothSetLocalName(name)

            # Setup and start advertisements
            ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
            ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
                ble_advertise_settings_modes['low_latency'])
            advertise_data = ad.droid.bleBuildAdvertiseData()
            advertise_settings = ad.droid.bleBuildAdvertiseSettings()
            advertise_callback = ad.droid.bleGenBleAdvertiseCallback()
            ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                            advertise_settings)
            self.advertise_callbacks.append((ad, advertise_callback))

    def obtain_address_list_from_scan(self):
        """Returns the address list of all devices that match the scan filter.

        Scans from the central with one device-name filter per advertisement
        and collects the first-seen MAC addresses of matching results.

        Returns:
            A list of (android_device, mac_address) tuples if all devices are
            found; None if any devices are not found.
        """
        droid = self.pri_dut.droid
        # From central device, scan for all appropriate addresses by name.
        filter_list = droid.bleGenFilterList()
        droid.bleSetScanSettingsScanMode(
            ble_scan_settings_modes['low_latency'])
        scan_settings = droid.bleBuildScanSetting()
        scan_callback = droid.bleGenScanCallback()
        for _, name in self.advertisement_names:
            droid.bleSetScanFilterDeviceName(name)
            droid.bleBuildScanFilter(filter_list)

        # Advertised name -> android device handle. setdefault keeps the first
        # entry for a name, matching a first-match lookup over the list.
        device_by_name = {}
        for ad, name in self.advertisement_names:
            device_by_name.setdefault(name, ad)

        address_list = []
        seen_addresses = set()
        devices_found = []
        event_name = "BleScan{}onScanResults".format(scan_callback)
        # Set the scan time out to 20 sec to provide enough time to discover
        # the devices in busy environment
        scan_timeout = 20
        end_time = time.time() + scan_timeout

        droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
        try:
            while (time.time() < end_time
                   and len(address_list) < len(self.advertisement_names)):
                try:
                    event = self.pri_dut.ed.pop_event(event_name,
                                                      self.bt_default_timeout)
                except Empty:
                    self.log.error("Failed to find any scan results.")
                    return None

                device_info = event['data']['Result']['deviceInfo']
                mac_address = device_info['address']
                # Look up the android device handle based on advertised name;
                # ignore results that do not belong to one of our peripherals.
                device = device_by_name.get(device_info['name'])
                if device is None:
                    continue
                if mac_address in seen_addresses:
                    continue
                self.log.info(
                    "Found new mac address: {}".format(mac_address))
                seen_addresses.add(mac_address)
                devices_found.append(device.serial)
                address_list.append((device, mac_address))
        finally:
            # Never leave the scan running on the central once we are done.
            droid.bleStopBleScan(scan_callback)

        if len(address_list) < self.max_connections:
            self.log.info("Only found these devices: {}".format(devices_found))
            self.log.error("Could not find all necessary advertisements.")
            return None
        return address_list

    def _connect_to_peripheral(self, address):
        """Open a GATT connection from the central to ``address``.

        Returns:
            The value returned by setup_gatt_connection (unpacked by callers
            as (bluetooth_gatt, gatt_callback)), or None if connecting failed.
        """
        try:
            connection = setup_gatt_connection(
                self.pri_dut, address, autoconnect=False)
        except Exception as err:
            # Broad on purpose: any connection failure fails the test, but the
            # cause is logged instead of being dropped.
            self.log.error(
                "Failed to establish connection to {}".format(address))
            self.log.error("Connection error: {}".format(err))
            return None
        self.log.info("Successfully connected to {}".format(address))
        return connection

    def _all_peripherals_connected(self):
        """Return True if the central reports max_connections LE devices."""
        connected = self.pri_dut.droid.bluetoothGetConnectedLeDevices(
            bt_profile_constants['gatt_server'])
        return len(connected) == self.max_connections

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f')
    def test_concurrent_gatt_connections(self):
        """Test max concurrent GATT connections

        Connect to all peripherals.

        Steps:
        1. Scan
        2. Save addresses
        3. Connect all addresses of the peripherals

        Expected Result:
        All connections successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Connect to all addresses
        for _, address in address_list:
            if self._connect_to_peripheral(address) is None:
                return False

        if not self._all_peripherals_connected():
            self.log.error("Did not reach max connection count.")
            return False

        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f')
    def test_data_transfer_to_concurrent_gatt_connections(self):
        """Test writing GATT descriptors concurrently to many peripherals.

        Connect to all peripherals and write gatt descriptors concurrently.


        Steps:
        1. Scan the addresses by names
        2. Save mac addresses of the peripherals
        3. Connect all addresses of the peripherals and write gatt descriptors


        Expected Result:
        All connections and data transfers are successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Start from a clean list so a repeated run never re-submits the
        # connection handles of a previous run.
        self.list_of_arguments_list = []

        # Connect to all addresses
        for ad, address in address_list:
            gatts = GattServerLib(log=self.log, dut=ad)
            gatt_server, gatt_server_callback = gatts.setup_gatts_db(
                database=gatt_server_read_descriptor_sample)

            connection = self._connect_to_peripheral(address)
            if connection is None:
                return False
            bluetooth_gatt, _ = connection

            if not self.pri_dut.droid.gattClientDiscoverServices(
                    bluetooth_gatt):
                self.log.info("Failed to discover services.")
                return False
            try:
                event = self.pri_dut.ed.pop_event(
                    "GattConnect{}onServicesDiscovered".format(bluetooth_gatt),
                    self.bt_default_timeout)
            except Empty:
                self.log.error(
                    "Timed out waiting for service discovery on {}".format(
                        address))
                return False
            discovered_services_index = event['data']['ServicesIndex']
            services_count = (
                self.pri_dut.droid.gattClientGetDiscoveredServicesCount(
                    discovered_services_index))

            # Positional arguments for run_continuous_write_descriptor.
            # NOTE(review): the trailing 100 is presumably the number of write
            # iterations - confirm against bt_gatt_utils.
            self.list_of_arguments_list.append([
                self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed,
                gatt_server, gatt_server_callback, bluetooth_gatt,
                services_count, discovered_services_index, 100
            ])

        # Leaving the with-block waits for every submitted write loop.
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=10) as executor:
            futures = [
                executor.submit(run_continuous_write_descriptor,
                                *arguments_list)
                for arguments_list in self.list_of_arguments_list
            ]

        # An exception raised inside a worker is stored on its future; surface
        # it here instead of letting it vanish.
        worker_failed = False
        for future in futures:
            err = future.exception()
            if err is not None:
                self.log.error(
                    "Concurrent descriptor write raised: {}".format(err))
                worker_failed = True
        if worker_failed:
            return False

        if not self._all_peripherals_connected():
            self.log.error("Failed to write concurrently.")
            return False

        return True