1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import time 18import os 19 20from acts.keys import Config 21from acts.utils import rand_ascii_str 22from acts.test_utils.bt.bt_constants import gatt_cb_strings 23from acts.test_utils.bt.bt_constants import gatt_characteristic 24from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format 25from acts.test_utils.bt.bt_constants import gatt_cb_err 26from acts.test_utils.bt.bt_constants import gatt_transport 27from acts.test_utils.bt.bt_constants import gatt_event 28from acts.test_utils.bt.bt_constants import gatt_server_responses 29from acts.test_utils.bt.bt_constants import gatt_service_types 30from acts.test_utils.bt.bt_constants import small_timeout 31from acts.test_utils.bt.gatt_test_database import STRING_512BYTES 32 33from acts.utils import exe_cmd 34from math import ceil 35 36 37class GattServerLib(): 38 39 characteristic_list = [] 40 default_timeout = 10 41 descriptor_list = [] 42 dut = None 43 gatt_server = None 44 gatt_server_callback = None 45 gatt_server_list = [] 46 log = None 47 service_list = [] 48 write_mapping = {} 49 50 def __init__(self, log, dut): 51 self.dut = dut 52 self.log = log 53 54 def list_all_uuids(self): 55 """From the GATT Client, discover services and list all services, 56 chars and descriptors. 57 """ 58 self.log.info("Service List:") 59 for service in self.dut.droid.gattGetServiceUuidList(self.gatt_server): 60 self.dut.log.info("GATT Server service uuid: {}".format(service)) 61 self.log.info("Characteristics List:") 62 for characteristic in self.characteristic_list: 63 instance_id = self.dut.droid.gattServerGetCharacteristicInstanceId( 64 characteristic) 65 uuid = self.dut.droid.gattServerGetCharacteristicUuid( 66 characteristic) 67 self.dut.log.info( 68 "GATT Server characteristic handle uuid: {} {}".format( 69 hex(instance_id), uuid)) 70 # TODO: add getting insance ids and uuids from each descriptor. 71 72 def open(self): 73 """Open an empty GATT Server instance""" 74 self.gatt_server_callback = self.dut.droid.gattServerCreateGattServerCallback( 75 ) 76 self.gatt_server = self.dut.droid.gattServerOpenGattServer( 77 self.gatt_server_callback) 78 self.gatt_server_list.append(self.gatt_server) 79 80 def clear_services(self): 81 """Clear BluetoothGattServices from BluetoothGattServer""" 82 self.dut.droid.gattServerClearServices(self.gatt_server) 83 84 def close_bluetooth_gatt_servers(self): 85 """Close Bluetooth Gatt Servers""" 86 try: 87 for btgs in self.gatt_server_list: 88 self.dut.droid.gattServerClose(btgs) 89 except Exception as err: 90 self.log.error( 91 "Failed to close Bluetooth GATT Servers: {}".format(err)) 92 self.characteristic_list = [] 93 self.descriptor_list = [] 94 self.gatt_server_list = [] 95 self.service_list = [] 96 97 def characteristic_set_value_by_instance_id(self, instance_id, value): 98 """Set Characteristic value by instance id""" 99 self.dut.droid.gattServerCharacteristicSetValueByInstanceId( 100 int(instance_id, 16), value) 101 102 def notify_characteristic_changed(self, instance_id, confirm): 103 """ Notify characteristic changed """ 104 self.dut.droid.gattServerNotifyCharacteristicChangedByInstanceId( 105 self.gatt_server, 0, int(instance_id, 16), confirm) 106 107 def send_response(self, user_input): 108 """Send a single response to the GATT Client""" 109 args = user_input.split() 110 mtu = 23 111 if len(args) == 2: 112 user_input = args[0] 113 mtu = int(args[1]) 114 desc_read = gatt_event['desc_read_req']['evt'].format( 115 self.gatt_server_callback) 116 desc_write = gatt_event['desc_write_req']['evt'].format( 117 self.gatt_server_callback) 118 char_read = gatt_event['char_read_req']['evt'].format( 119 self.gatt_server_callback) 120 char_write_req = gatt_event['char_write_req']['evt'].format( 121 self.gatt_server_callback) 122 char_write = gatt_event['char_write']['evt'].format( 123 self.gatt_server_callback) 124 execute_write = gatt_event['exec_write']['evt'].format( 125 self.gatt_server_callback) 126 regex = "({}|{}|{}|{}|{}|{})".format(desc_read, desc_write, char_read, 127 char_write, execute_write, 128 char_write_req) 129 events = self.dut.ed.pop_events(regex, 5, small_timeout) 130 status = 0 131 if user_input: 132 status = gatt_server_responses.get(user_input) 133 for event in events: 134 self.log.debug("Found event: {}.".format(event)) 135 request_id = event['data']['requestId'] 136 if event['name'] == execute_write: 137 if ('execute' in event['data'] 138 and event['data']['execute'] == True): 139 for key in self.write_mapping: 140 value = self.write_mapping[key] 141 self.log.info("Writing key, value: {}, {}".format( 142 key, value)) 143 self.dut.droid.gattServerSetByteArrayValueByInstanceId( 144 key, value) 145 else: 146 self.log.info("Execute result is false") 147 self.write_mapping = {} 148 self.dut.droid.gattServerSendResponse( 149 self.gatt_server, 0, request_id, status, 0, []) 150 continue 151 offset = event['data']['offset'] 152 instance_id = event['data']['instanceId'] 153 if (event['name'] == desc_write or event['name'] == char_write 154 or event['name'] == char_write_req): 155 if ('preparedWrite' in event['data'] 156 and event['data']['preparedWrite'] == True): 157 value = event['data']['value'] 158 if instance_id in self.write_mapping.keys(): 159 self.write_mapping[ 160 instance_id] = self.write_mapping[instance_id] + value 161 self.log.info( 162 "New Prepared Write Value for {}: {}".format( 163 instance_id, self.write_mapping[instance_id])) 164 else: 165 self.log.info("write mapping key, value {}, {}".format( 166 instance_id, value)) 167 self.write_mapping[instance_id] = value 168 self.log.info("current value {}, {}".format( 169 instance_id, value)) 170 self.dut.droid.gattServerSendResponse( 171 self.gatt_server, 0, request_id, status, 0, value) 172 continue 173 else: 174 self.dut.droid.gattServerSetByteArrayValueByInstanceId( 175 event['data']['instanceId'], event['data']['value']) 176 177 try: 178 data = self.dut.droid.gattServerGetReadValueByInstanceId( 179 int(event['data']['instanceId'])) 180 except Exception as err: 181 self.log.error(err) 182 if not data: 183 data = [1] 184 self.log.info( 185 "GATT Server Send Response [request_id, status, offset, data]" \ 186 " [{}, {}, {}, {}]". 187 format(request_id, status, offset, data)) 188 data = data[offset:offset + mtu - 1] 189 self.dut.droid.gattServerSendResponse( 190 self.gatt_server, 0, request_id, status, offset, data) 191 192 def _setup_service(self, serv): 193 service = self.dut.droid.gattServerCreateService( 194 serv['uuid'], serv['type']) 195 if 'handles' in serv: 196 self.dut.droid.gattServerServiceSetHandlesToReserve( 197 service, serv['handles']) 198 return service 199 200 def _setup_characteristic(self, char): 201 characteristic = \ 202 self.dut.droid.gattServerCreateBluetoothGattCharacteristic( 203 char['uuid'], char['properties'], char['permissions']) 204 if 'instance_id' in char: 205 self.dut.droid.gattServerCharacteristicSetInstanceId( 206 characteristic, char['instance_id']) 207 set_id = self.dut.droid.gattServerCharacteristicGetInstanceId( 208 characteristic) 209 if set_id != char['instance_id']: 210 self.log.error( 211 "Instance ID did not match up. Found {} Expected {}". 212 format(set_id, char['instance_id'])) 213 if 'value_type' in char: 214 value_type = char['value_type'] 215 value = char['value'] 216 if value_type == gatt_characteristic_value_format['string']: 217 self.log.info("Set String value result: {}".format( 218 self.dut.droid.gattServerCharacteristicSetStringValue( 219 characteristic, value))) 220 elif value_type == gatt_characteristic_value_format['byte']: 221 self.log.info("Set Byte Array value result: {}".format( 222 self.dut.droid.gattServerCharacteristicSetByteValue( 223 characteristic, value))) 224 else: 225 self.log.info("Set Int value result: {}".format( 226 self.dut.droid.gattServerCharacteristicSetIntValue( 227 characteristic, value, value_type, char['offset']))) 228 return characteristic 229 230 def _setup_descriptor(self, desc): 231 descriptor = self.dut.droid.gattServerCreateBluetoothGattDescriptor( 232 desc['uuid'], desc['permissions']) 233 if 'value' in desc: 234 self.dut.droid.gattServerDescriptorSetByteValue( 235 descriptor, desc['value']) 236 if 'instance_id' in desc: 237 self.dut.droid.gattServerDescriptorSetInstanceId( 238 descriptor, desc['instance_id']) 239 self.descriptor_list.append(descriptor) 240 return descriptor 241 242 def setup_gatts_db(self, database): 243 """Setup GATT Server database""" 244 self.gatt_server_callback = \ 245 self.dut.droid.gattServerCreateGattServerCallback() 246 self.gatt_server = self.dut.droid.gattServerOpenGattServer( 247 self.gatt_server_callback) 248 self.gatt_server_list.append(self.gatt_server) 249 for serv in database['services']: 250 service = self._setup_service(serv) 251 self.service_list.append(service) 252 if 'characteristics' in serv: 253 for char in serv['characteristics']: 254 characteristic = self._setup_characteristic(char) 255 if 'descriptors' in char: 256 for desc in char['descriptors']: 257 descriptor = self._setup_descriptor(desc) 258 self.dut.droid.gattServerCharacteristicAddDescriptor( 259 characteristic, descriptor) 260 self.characteristic_list.append(characteristic) 261 self.dut.droid.gattServerAddCharacteristicToService( 262 service, characteristic) 263 self.dut.droid.gattServerAddService(self.gatt_server, service) 264 expected_event = gatt_cb_strings['serv_added'].format( 265 self.gatt_server_callback) 266 self.dut.ed.pop_event(expected_event, 10) 267 return self.gatt_server, self.gatt_server_callback 268 269 def send_continuous_response(self, user_input): 270 """Send the same response""" 271 desc_read = gatt_event['desc_read_req']['evt'].format( 272 self.gatt_server_callback) 273 desc_write = gatt_event['desc_write_req']['evt'].format( 274 self.gatt_server_callback) 275 char_read = gatt_event['char_read_req']['evt'].format( 276 self.gatt_server_callback) 277 char_write = gatt_event['char_write']['evt'].format( 278 self.gatt_server_callback) 279 execute_write = gatt_event['exec_write']['evt'].format( 280 self.gatt_server_callback) 281 regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read, 282 char_write, execute_write) 283 offset = 0 284 status = 0 285 mtu = 23 286 char_value = [] 287 for i in range(512): 288 char_value.append(i % 256) 289 len_min = 470 290 end_time = time.time() + 180 291 i = 0 292 num_packets = ceil((len(char_value) + 1) / (mtu - 1)) 293 while time.time() < end_time: 294 events = self.dut.ed.pop_events(regex, 10, small_timeout) 295 for event in events: 296 start_offset = i * (mtu - 1) 297 i += 1 298 self.log.debug("Found event: {}.".format(event)) 299 request_id = event['data']['requestId'] 300 data = char_value[start_offset:start_offset + mtu - 1] 301 if not data: 302 data = [1] 303 self.log.debug( 304 "GATT Server Send Response [request_id, status, offset, " \ 305 "data] [{}, {}, {}, {}]".format(request_id, status, offset, 306 data)) 307 self.dut.droid.gattServerSendResponse( 308 self.gatt_server, 0, request_id, status, offset, data) 309 310 def send_continuous_response_data(self, user_input): 311 """Send the same response with data""" 312 desc_read = gatt_event['desc_read_req']['evt'].format( 313 self.gatt_server_callback) 314 desc_write = gatt_event['desc_write_req']['evt'].format( 315 self.gatt_server_callback) 316 char_read = gatt_event['char_read_req']['evt'].format( 317 self.gatt_server_callback) 318 char_write = gatt_event['char_write']['evt'].format( 319 self.gatt_server_callback) 320 execute_write = gatt_event['exec_write']['evt'].format( 321 self.gatt_server_callback) 322 regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read, 323 char_write, execute_write) 324 offset = 0 325 status = 0 326 mtu = 11 327 char_value = [] 328 len_min = 470 329 end_time = time.time() + 180 330 i = 0 331 num_packets = ceil((len(char_value) + 1) / (mtu - 1)) 332 while time.time() < end_time: 333 events = self.dut.ed.pop_events(regex, 10, small_timeout) 334 for event in events: 335 self.log.info(event) 336 request_id = event['data']['requestId'] 337 if event['name'] == execute_write: 338 if ('execute' in event['data'] 339 and event['data']['execute'] == True): 340 for key in self.write_mapping: 341 value = self.write_mapping[key] 342 self.log.debug("Writing key, value: {}, {}".format( 343 key, value)) 344 self.dut.droid.gattServerSetByteArrayValueByInstanceId( 345 key, value) 346 self.write_mapping = {} 347 self.dut.droid.gattServerSendResponse( 348 self.gatt_server, 0, request_id, status, 0, [1]) 349 continue 350 offset = event['data']['offset'] 351 instance_id = event['data']['instanceId'] 352 if (event['name'] == desc_write 353 or event['name'] == char_write): 354 if ('preparedWrite' in event['data'] 355 and event['data']['preparedWrite'] == True): 356 value = event['data']['value'] 357 if instance_id in self.write_mapping: 358 self.write_mapping[ 359 instance_id] = self.write_mapping[instance_id] + value 360 else: 361 self.write_mapping[instance_id] = value 362 else: 363 self.dut.droid.gattServerSetByteArrayValueByInstanceId( 364 event['data']['instanceId'], 365 event['data']['value']) 366 try: 367 data = self.dut.droid.gattServerGetReadValueByInstanceId( 368 int(event['data']['instanceId'])) 369 except Exception as err: 370 self.log.error(err) 371 if not data: 372 self.dut.droid.gattServerSendResponse( 373 self.gatt_server, 0, request_id, status, offset, [1]) 374 else: 375 self.dut.droid.gattServerSendResponse( 376 self.gatt_server, 0, request_id, status, offset, 377 data[offset:offset + 17]) 378