#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
This is the base class for tests that exercise different GATT procedures
between two connected devices.
Setup/Teardown methods take care of establishing the connection, and doing
GATT DB initialization/discovery.
"""

from queue import Empty

from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_constants import gatt_characteristic
from acts.test_utils.bt.bt_constants import gatt_descriptor
from acts.test_utils.bt.bt_constants import gatt_service_types
from acts.test_utils.bt.bt_constants import gatt_event
from acts.test_utils.bt.bt_constants import gatt_cb_err
from acts.test_utils.bt.bt_constants import gatt_cb_strings
from acts.test_utils.bt.bt_constants import gatt_mtu_size
from acts.test_utils.bt.bt_gatt_utils import GattTestUtilsError
from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection
from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics
from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors
from acts.test_utils.bt.bt_constants import gatt_char_desc_uuids
from acts.test_utils.bt.bt_constants import bt_default_timeout


class GattConnectedBaseTest(BluetoothBaseTest):
    """Base test that connects a central to a peripheral GATT server.

    The first Android device acts as the central (``cen_ad``) and the second
    as the peripheral (``per_ad``). Before every test a GATT server with one
    primary service is created on the peripheral, the central connects to it
    and discovers its services; after every test the server is closed and the
    connection is torn down.
    """

    TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
    READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
    # NOTE(review): READABLE_DESC_UUID and WRITABLE_CHAR_UUID share the same
    # value. Kept as-is because subclasses may rely on it; confirm whether
    # this is intentional.
    READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
    NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"

    def setup_class(self):
        """Assign the central and peripheral roles to the first two devices."""
        super().setup_class()
        self.cen_ad = self.android_devices[0]
        self.per_ad = self.android_devices[1]

    def setup_test(self):
        """Create the GATT server, connect the central and discover services.

        Returns:
            True when the test service was discovered by the central and the
            peripheral reports at least one connected device, False otherwise.

        Raises:
            AssertionError: if the GATT server or its service could not be
                set up on the peripheral.
        """
        super().setup_test()

        self.gatt_server_callback, self.gatt_server = (
            self._setup_multiple_services())
        if not self.gatt_server_callback or not self.gatt_server:
            raise AssertionError('Service setup failed')

        self.bluetooth_gatt, self.gatt_callback, self.adv_callback = (
            orchestrate_gatt_connection(self.cen_ad, self.per_ad))
        self.per_ad.droid.bleStopBleAdvertising(self.adv_callback)

        self.mtu = gatt_mtu_size['min']

        # Reset discovery state up front so a failed discovery can neither
        # raise AttributeError below nor reuse values from a previous test.
        self.discovered_services_index = None
        self.test_service_index = None

        if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt):
            event = self._client_wait(gatt_event['gatt_serv_disc'])
            self.discovered_services_index = event['data']['ServicesIndex']
        services_count = (
            self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
                self.discovered_services_index))
        for i in range(services_count):
            disc_service_uuid = (
                self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
                    self.discovered_services_index, i).upper())
            if disc_service_uuid == self.TEST_SERVICE_UUID:
                self.test_service_index = i
                break

        # Compare against None explicitly: index 0 is a valid match and must
        # not be mistaken for "not found".
        if self.test_service_index is None:
            self.log.error("Service not found")
            return False

        connected_device_list = (
            self.per_ad.droid.gattServerGetConnectedDevices(self.gatt_server))
        if not connected_device_list:
            self.log.info("No devices connected from peripheral.")
            return False

        return True

    def teardown_test(self):
        """Close the GATT server and disconnect the central."""
        self.per_ad.droid.gattServerClearServices(self.gatt_server)
        self.per_ad.droid.gattServerClose(self.gatt_server)

        del self.gatt_server_callback
        del self.gatt_server

        self._orchestrate_gatt_disconnection(self.bluetooth_gatt,
                                             self.gatt_callback)

        return super().teardown_test()

    def _server_wait(self, gatt_event):
        """Wait for ``gatt_event`` on the peripheral's server callback."""
        return self._timed_pop(gatt_event, self.per_ad,
                               self.gatt_server_callback)

    def _client_wait(self, gatt_event):
        """Wait for ``gatt_event`` on the central's client callback."""
        return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback)

    def _timed_pop(self, gatt_event, droid, gatt_callback):
        """Pop the event described by ``gatt_event`` or fail the test.

        Args:
            gatt_event: mapping with an "evt" format string for the event
                name and an "err" format string for the failure message.
            droid: device object whose ``ed`` event dispatcher is polled.
            gatt_callback: callback id formatted into the event name.

        Returns:
            The event popped from the dispatcher.

        Raises:
            AssertionError: if the event does not arrive within
                ``bt_default_timeout``.
        """
        expected_event = gatt_event["evt"].format(gatt_callback)
        try:
            return droid.ed.pop_event(expected_event, bt_default_timeout)
        except Empty as err:
            raise AssertionError(
                gatt_event["err"].format(expected_event)) from err

    def _setup_characteristics_and_descriptors(self, droid):
        """Create the test characteristics and descriptors on ``droid``.

        Also stores the notifiable characteristic (third entry of the
        created list) in ``self.notifiable_char_index``.

        Returns:
            A ``(characteristic_list, descriptor_list)`` tuple as returned by
            ``setup_gatt_characteristics`` and ``setup_gatt_descriptors``.
        """
        characteristic_input = [
            {
                'uuid': self.WRITABLE_CHAR_UUID,
                'property': gatt_characteristic['property_write'] |
                gatt_characteristic['property_write_no_response'],
                'permission': gatt_characteristic['permission_write']
            },
            {
                'uuid': self.READABLE_CHAR_UUID,
                'property': gatt_characteristic['property_read'],
                'permission': gatt_characteristic['permission_read']
            },
            {
                'uuid': self.NOTIFIABLE_CHAR_UUID,
                'property': gatt_characteristic['property_notify'] |
                gatt_characteristic['property_indicate'],
                'permission': gatt_characteristic['permission_read']
            },
        ]
        descriptor_input = [{
            'uuid': self.WRITABLE_DESC_UUID,
            # NOTE(review): the write permission here comes from
            # gatt_characteristic while the other descriptors use
            # gatt_descriptor; kept as-is - confirm both constants match.
            'property': gatt_descriptor['permission_read'] |
            gatt_characteristic['permission_write'],
        }, {
            'uuid': self.READABLE_DESC_UUID,
            'property': gatt_descriptor['permission_read'] |
            gatt_descriptor['permission_write'],
        }, {
            'uuid': gatt_char_desc_uuids['client_char_cfg'],
            'property': gatt_descriptor['permission_read'] |
            gatt_descriptor['permission_write'],
        }]
        characteristic_list = setup_gatt_characteristics(droid,
                                                         characteristic_input)
        self.notifiable_char_index = characteristic_list[2]
        descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
        return characteristic_list, descriptor_list

    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
        """Disconnect the central from the peripheral and close the client.

        Returns:
            True on success, False if ``disconnect_gatt_connection`` raised
            ``GattTestUtilsError`` (the client is not closed in that case).
        """
        self.log.info("Disconnecting from peripheral device.")
        try:
            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
                                       gatt_callback)
        except GattTestUtilsError as err:
            self.log.error(err)
            return False
        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
        return True

    def _find_service_added_event(self, gatt_server_callback, uuid):
        """Wait for the peripheral's service-added event and check its UUID.

        Returns:
            True if the event arrived within ``bt_default_timeout`` and its
            ``serviceUuid`` matches ``uuid`` (case-insensitively), False
            otherwise.
        """
        expected_event = gatt_cb_strings['serv_added'].format(
            gatt_server_callback)
        try:
            event = self.per_ad.ed.pop_event(expected_event,
                                             bt_default_timeout)
        except Empty:
            self.log.error(gatt_cb_err['serv_added_err'].format(
                expected_event))
            return False
        if event['data']['serviceUuid'].lower() != uuid.lower():
            self.log.error("Uuid mismatch. Found: {}, Expected {}.".format(
                event['data']['serviceUuid'], uuid))
            return False
        return True

    def _setup_multiple_services(self):
        """Open a GATT server on the peripheral and add the test service.

        Returns:
            ``(gatt_server_callback, gatt_server)`` on success, or
            ``(False, False)`` if the service-added event was not confirmed.
        """
        gatt_server_callback = (
            self.per_ad.droid.gattServerCreateGattServerCallback())
        gatt_server = self.per_ad.droid.gattServerOpenGattServer(
            gatt_server_callback)
        characteristic_list, descriptor_list = (
            self._setup_characteristics_and_descriptors(self.per_ad.droid))
        # Attach each descriptor to the characteristic at the same index.
        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
            characteristic_list[0], descriptor_list[0])
        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
            characteristic_list[1], descriptor_list[1])
        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
            characteristic_list[2], descriptor_list[2])
        gatt_service = self.per_ad.droid.gattServerCreateService(
            self.TEST_SERVICE_UUID, gatt_service_types['primary'])
        for characteristic in characteristic_list:
            self.per_ad.droid.gattServerAddCharacteristicToService(
                gatt_service, characteristic)
        self.per_ad.droid.gattServerAddService(gatt_server, gatt_service)
        result = self._find_service_added_event(gatt_server_callback,
                                                self.TEST_SERVICE_UUID)
        if not result:
            return False, False
        return gatt_server_callback, gatt_server

    def assertEqual(self, first, second, msg=None):
        """Raise AssertionError unless ``first == second``.

        Args:
            first: left-hand value of the comparison.
            second: right-hand value of the comparison.
            msg: optional text prepended to the '%r != %r' failure message.
        """
        if not first == second:
            if not msg:
                raise AssertionError('%r != %r' % (first, second))
            else:
                raise AssertionError(msg + ' %r != %r' % (first, second))