#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Python script for wrappers to various libraries.

Class CommandInput inherits from the cmd library.

Functions that start with "do_" have a method
signature that doesn't match the actual command
line command and that is intended. This is so the
"help" command knows what to display (in this case
the documentation of the command itself).

For example:
Looking at the function "do_tool_set_target_device_name"
has the inputs self and line which is expected of this type
of method signature. When the "help" command is done on the
method name you get the function documentation as such:

(Cmd) help tool_set_target_device_name

        Description: Reset the target device name.
        Input(s):
            device_name: Required. The advertising name to connect to.
        Usage: tool_set_target_device_name new_target_device name
        Examples:
            tool_set_target_device_name le_watch

This is all to say this documentation pattern is expected.

"""

import cmd
import pprint
import time

from acts.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
from acts.test_utils.bt.bt_constants import bt_attribute_values
from acts.test_utils.bt.bt_constants import sig_appearance_constants
from acts.test_utils.bt.bt_constants import sig_uuid_constants
from acts.test_utils.fuchsia.sdp_records import sdp_pts_record_list

import acts.test_utils.bt.gatt_test_database as gatt_test_database

# Various Global Strings
BASE_UUID = sig_uuid_constants['BASE_UUID']
CMD_LOG = "CMD {} result: {}"
FAILURE = "CMD {} threw exception: {}"
BASIC_ADV_NAME = "fs_test"


class CommandInput(cmd.Cmd):
    """Interactive command loop wrapping the device's BLE / GATT libraries.

    cmd.Cmd exposes every "do_<name>" method as the command "<name>" and
    prints the method's docstring for "help <name>", which is why the
    docstrings below follow a Description / Input(s) / Usage / Examples
    layout. "complete_<name>" methods provide tab completion.

    setup_vars() must be called before any command is run; it provides
    self.pri_dut, self.test_dut and self.log.
    """

    # Settings used to build the next LE advertisement.
    ble_adv_interval = 1000
    ble_adv_appearance = None
    ble_adv_data_include_tx_power_level = False
    ble_adv_include_name = True
    ble_adv_include_scan_response = False
    ble_adv_name = BASIC_ADV_NAME
    ble_adv_data_manufacturer_data = None
    ble_adv_data_service_data = None
    ble_adv_data_service_uuid_list = None
    ble_adv_data_uris = None

    # Results of the last bt-control discovery. These class-level lists are
    # only defaults; _find_unique_id_over_bt_control rebinds them on the
    # instance so that state is never shared between instances.
    bt_control_ids = []
    bt_control_names = []
    bt_control_devices = []
    # Seconds to wait between two polls of the scan/discovery results.
    bt_scan_poll_timer = 0.5
    target_device_name = ""
    le_ids = []
    # Id of the peer that most commands operate on.
    unique_mac_addr_id = None

    def setup_vars(self, dut, target_device_name, log):
        """Bind the device under test, the default target name and a logger.

        Args:
            dut: The primary device under test.
            target_device_name: Advertising name of the peer to look for.
            log: Logger used by every command.
        """
        self.pri_dut = dut
        # Note: test_dut is the start of a slow conversion from a Fuchsia specific
        # Tool to an abstract_device tool. Only commands that use test_dut will work
        # Otherwise this tool is primarially targeted at Fuchsia devices.
        self.test_dut = create_bluetooth_device(self.pri_dut)
        self.test_dut.initialize_bluetooth_controller()
        self.target_device_name = target_device_name
        self.log = log

    def emptyline(self):
        """Do nothing on an empty line (cmd.Cmd would repeat the last command)."""
        pass

    def do_EOF(self, line):
        "End Script"
        return True

    # Useful Helper functions and cmd line tooling

    def str_to_bool(self, s):
        """Convert 'true' / 'false' (any letter case) to a bool.

        Returns:
            True or False, or None when s is neither word. Callers use the
            None result to detect a missing or invalid argument.
        """
        if s.lower() == 'true':
            return True
        elif s.lower() == 'false':
            return False

    @staticmethod
    def _complete_options(options, text):
        """Return the entries of options that start with text.

        An empty text matches every entry, so this covers both the "nothing
        typed yet" and the "prefix typed" completion cases.
        """
        return [s for s in options if s.startswith(text)]

    @staticmethod
    def _generate_write_value(size):
        """Return the byte list [0, 1, 2, ...] of length size, wrapping at 256."""
        return [i % 256 for i in range(size)]

    def _set_bool_option(self, attr_name, line, cmd):
        """Parse line as a bool and store it in the attribute attr_name.

        Input other than true/false is rejected with an error log instead of
        silently storing None.
        """
        value = self.str_to_bool(line)
        if value is None:
            self.log.error(
                FAILURE.format(
                    cmd, "expected 'true' or 'false', got '{}'".format(line)))
            return
        setattr(self, attr_name, value)

    def _find_unique_id_over_le(self):
        """Scan over LE for target_device_name and cache the peer's id.

        Polls the discovered devices up to 10 times, bt_scan_poll_timer
        seconds apart. On success self.unique_mac_addr_id holds the id of the
        first device whose name contains target_device_name; otherwise it is
        left as None.
        """
        scan_filter = {"name_substring": self.target_device_name}
        self.unique_mac_addr_id = None
        self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
        try:
            tries = 10
            for _ in range(tries):
                time.sleep(self.bt_scan_poll_timer)
                scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices(
                )['result']
                for device in scan_res:
                    name, did = device["name"], device["id"]
                    if name is not None and self.target_device_name in name:
                        self.unique_mac_addr_id = did
                        self.log.info(
                            "Successfully found device: name, id: {}, {}".
                            format(name, did))
                        break
                if self.unique_mac_addr_id:
                    break
        finally:
            # Never leave the scan running, even if polling failed.
            self.pri_dut.gattc_lib.bleStopBleScan()

    def _find_unique_id_over_bt_control(self):
        """Run bt-control discovery for target_device_name and cache its id.

        Polls the known remote devices up to 10 times, bt_scan_poll_timer
        seconds apart, recording every device seen in bt_control_devices,
        bt_control_ids and bt_control_names (a device is recorded once per
        poll in which it shows up). On success self.unique_mac_addr_id holds
        the id of the first device whose name contains target_device_name.
        """
        self.unique_mac_addr_id = None
        # Rebind on the instance so the class-level defaults are not mutated.
        self.bt_control_devices = []
        self.bt_control_ids = []
        self.bt_control_names = []
        self.pri_dut.btc_lib.requestDiscovery(True)
        try:
            tries = 10
            for _ in range(tries):
                if self.unique_mac_addr_id:
                    break
                time.sleep(self.bt_scan_poll_timer)
                device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
                )['result']
                for device in device_list.values():
                    self.bt_control_devices.append(device)
                    name = device['name']
                    did, address = device['id'], device['address']

                    self.bt_control_ids.append(did)
                    if name is None:
                        continue
                    self.bt_control_names.append(name)
                    if self.target_device_name in name:
                        self.unique_mac_addr_id = did
                        self.log.info(
                            "Successfully found device: name, id, address: {}, {}, {}"
                            .format(name, did, address))
                        break
        finally:
            # Never leave discovery running, even if polling failed.
            self.pri_dut.btc_lib.requestDiscovery(False)

    def _ensure_unique_id(self):
        """Make sure a peer id is cached, scanning over LE when it is not.

        Returns:
            True when self.unique_mac_addr_id is set afterwards, else False.
        """
        if self.unique_mac_addr_id:
            return True
        try:
            self._find_unique_id_over_le()
        except Exception as err:
            self.log.error("Failed to scan or find device: {}".format(err))
            return False
        if not self.unique_mac_addr_id:
            self.log.error("Failed to scan or find device.")
            return False
        return True

    def _iter_all_characteristics(self, service_banner):
        """Yield every characteristic dict of every service on the peer.

        Each service is connected to and its characteristics discovered
        before they are yielded; service_banner is formatted with the service
        uuid and printed once per service.
        """
        services = self.pri_dut.gattc_lib.listServices(
            self.unique_mac_addr_id)
        for service in services['result']:
            service_id = service['id']
            service_uuid = service['uuid_type']
            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
                                                    service_id)
            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
            print(service_banner.format(service_uuid))
            for char in chars['result']:
                yield char

    def _iter_all_descriptors(self, service_banner, char_banner):
        """Yield (descriptor id, descriptor uuid) for every descriptor.

        Walks all characteristics of all services; char_banner is formatted
        with the characteristic uuid and printed once per characteristic.
        """
        for char in self._iter_all_characteristics(service_banner):
            char_uuid = char['uuid_type']
            descriptors = char['descriptors']
            print(char_banner.format(char_uuid))
            for desc in descriptors:
                yield desc["id"], desc["uuid_type"]

    def do_tool_take_bt_snoop_log(self, custom_name):
        """
        Description: Takes the bt snoop log from the Fuchsia device.
        Logs will show up in your config files' logpath directory.

        Input(s):
            custom_name: Optional. Override the default pcap file name.

        Usage: tool_take_bt_snoop_log [custom_name]
          Examples:
            tool_take_bt_snoop_log connection_error
            tool_take_bt_snoop_log
        """
        self.pri_dut.take_bt_snoop_log(custom_name)

    def do_tool_refresh_unique_id(self, line):
        """
        Description: Refresh command line tool mac unique id.
        Usage:
          Examples:
            tool_refresh_unique_id
        """
        try:
            self._find_unique_id_over_le()
        except Exception as err:
            self.log.error(
                "Failed to scan or find scan result: {}".format(err))

    def do_tool_refresh_unique_id_using_bt_control(self, line):
        """
        Description: Refresh command line tool mac unique id.
        Usage:
          Examples:
            tool_refresh_unique_id_using_bt_control
        """
        try:
            self._find_unique_id_over_bt_control()
        except Exception as err:
            self.log.error(
                "Failed to scan or find scan result: {}".format(err))

    def do_tool_set_target_device_name(self, line):
        """
        Description: Reset the target device name.
        Input(s):
            device_name: Required. The advertising name to connect to.
        Usage: tool_set_target_device_name new_target_device name
          Examples:
            tool_set_target_device_name le_watch
        """
        self.log.info("Setting target_device_name to: {}".format(line))
        self.target_device_name = line

    def do_tool_set_unique_mac_addr_id(self, line):
        """
        Description: Sets the unique mac address id (Specific to Fuchsia)
        Input(s):
            device_id: Required. The id to set the unique mac address id to
        Usage: tool_set_unique_mac_addr_id device_id
          Examples:
            tool_set_unique_mac_addr_id 7fb2cae53aad9e0d
        """
        self.unique_mac_addr_id = line

    # Begin BLE advertise wrappers

    def complete_ble_adv_data_include_name(self, text, line, begidx, endidx):
        """Tab completion for ble_adv_data_include_name: true / false."""
        return self._complete_options(["true", "false"], text)

    def do_ble_adv_data_include_name(self, line):
        """
        Description: Include or leave out the name in the advertisement.
        Input(s):
            value: Required. True or False
        Usage: ble_adv_data_include_name bool_value
          Examples:
            ble_adv_data_include_name true
            ble_adv_data_include_name false
        """
        cmd = "Include name in the advertisement."
        try:
            self._set_bool_option("ble_adv_include_name", line, cmd)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_adv_data_set_name(self, line):
        """
        Description: Set the name to be included in the advertisement.
        Input(s):
            name: Required. The advertising name.
        Usage: ble_adv_data_set_name name
          Examples:
            ble_adv_data_set_name fs_test
        """
        self.ble_adv_name = line

    def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx):
        """Tab completion offering the keys of sig_appearance_constants."""
        return self._complete_options(sig_appearance_constants.keys(), text)

    def do_ble_adv_data_set_appearance(self, line):
        """
        Description: Set the appearance used in the advertisement. The input
        is stored as typed; tab completion offers the known SIG values.
        Input(s):
            appearance: Required. The appearance value.
        Usage: ble_adv_data_set_appearance appearance
        """
        self.ble_adv_appearance = line

    def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx,
                                                     endidx):
        """Tab completion for ble_adv_data_include_tx_power_level."""
        return self._complete_options(['true', 'false'], text)

    def do_ble_adv_data_include_tx_power_level(self, line):
        """Include the tx_power_level in the advertising data.
        Description: Adds tx_power_level to the advertisement data to the BLE
            advertisement.
        Input(s):
            value: Required. True or False
        Usage: ble_adv_data_include_tx_power_level bool_value
          Examples:
            ble_adv_data_include_tx_power_level true
            ble_adv_data_include_tx_power_level false
        """
        cmd = "Include tx_power_level in advertisement."
        try:
            self._set_bool_option("ble_adv_data_include_tx_power_level", line,
                                  cmd)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def complete_ble_adv_include_scan_response(self, text, line, begidx,
                                               endidx):
        """Tab completion for ble_adv_include_scan_response: true / false."""
        return self._complete_options(['true', 'false'], text)

    def do_ble_adv_include_scan_response(self, line):
        """Include scan response in advertisement. inputs: [true|false]
            Note: Currently just sets the scan response data to the
                Advertisement data.
        """
        cmd = "Include scan response in advertisement."
        try:
            self._set_bool_option("ble_adv_include_scan_response", line, cmd)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_adv_data_add_manufacturer_data(self, line):
        """Include manufacturer id and data to the advertisment
        Description: Adds manufacturer data to the BLE advertisement.
        Input(s):
            id: Required. The int representing the manufacturer id.
            data: Required. The string representing the data.
        Usage: ble_adv_data_add_manufacturer_data id data
          Examples:
            ble_adv_data_add_manufacturer_data 1 test
        """
        cmd = "Include manufacturer id and data to the advertisment."
        try:
            info = line.split()
            # Build the entry first so bad input leaves the stored data alone.
            entry = {"id": int(info[0]), "data": info[1]}
            if self.ble_adv_data_manufacturer_data is None:
                self.ble_adv_data_manufacturer_data = []
            self.ble_adv_data_manufacturer_data.append(entry)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_adv_data_add_service_data(self, line):
        """Include service data to the advertisment
        Description: Adds service data to the BLE advertisement.
        Input(s):
            uuid: Required. The string representing the uuid.
            data: Required. The string representing the data.
        Usage: ble_adv_data_add_service_data uuid data
          Examples:
            ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test
        """
        cmd = "Include service data to the advertisment."
        try:
            info = line.split()
            # Build the entry first so bad input leaves the stored data alone.
            entry = {"uuid": info[0], "data": info[1]}
            if self.ble_adv_data_service_data is None:
                self.ble_adv_data_service_data = []
            self.ble_adv_data_service_data.append(entry)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_adv_add_service_uuid_list(self, line):
        """Include a list of service uuids to the advertisment:
        Description: Adds service uuid list to the BLE advertisement.
        Input(s):
            uuid: Required. A list of N string UUIDs to add.
        Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN
          Examples:
            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb
            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb
        """
        cmd = "Include service uuid list to the advertisment data."
        try:
            # Store a list of UUID strings, as documented, not the raw line.
            self.ble_adv_data_service_uuid_list = line.split()
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_adv_data_set_uris(self, uris):
        """Set the URIs of the LE advertisement data:
        Description: Adds list of String URIs
            See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986)
            Valid URI examples:
                ftp://ftp.is.co.za/rfc/rfc1808.txt
                http://www.ietf.org/rfc/rfc2396.txt
                ldap://[2001:db8::7]/c=GB?objectClass?one
                mailto:John.Doe@example.com
                news:comp.infosystems.www.servers.unix
                tel:+1-816-555-1212
                telnet://192.0.2.16:80/
                urn:oasis:names:specification:docbook:dtd:xml:4.1.2
        Input(s):
            uris: Required. A list of URIs to add.
        Usage: ble_adv_data_set_uris uri0 uri1 ... uriN
          Examples:
            ble_adv_data_set_uris telnet://192.0.2.16:80/
            ble_adv_data_set_uris tel:+1-816-555-1212
        """
        cmd = "Set the URIs of the advertisement data."
        try:
            self.ble_adv_data_uris = uris.split()
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def start_advertisement(self, connectable):
        """ Handle setting advertising data and the advertisement
            Note: After advertisement is successful, clears values set for
                * Manufacturer data
                * Appearance information
                * Scan Response
                * Service UUIDs
                * URI list
                * Service data
            Args:
                connectable: Bool of whether to start a connectable
                    advertisement or not.
        """
        tx_power_level = None
        if self.ble_adv_data_include_tx_power_level:
            tx_power_level = 1  # Not yet implemented so set to 1

        adv_data = {
            "name": self.ble_adv_name if self.ble_adv_include_name else None,
            "appearance": self.ble_adv_appearance,
            "service_data": self.ble_adv_data_service_data,
            "tx_power_level": tx_power_level,
            "service_uuids": self.ble_adv_data_service_uuid_list,
            "manufacturer_data": self.ble_adv_data_manufacturer_data,
            "uris": self.ble_adv_data_uris,
        }

        # The scan response, when requested, simply mirrors the adv data.
        scan_response = None
        if self.ble_adv_include_scan_response:
            scan_response = adv_data

        result = self.pri_dut.ble_lib.bleStartBleAdvertising(
            adv_data, scan_response, self.ble_adv_interval, connectable)
        self.log.info("Result of starting advertisement: {}".format(result))
        # One-shot settings: only reached when the call above did not raise.
        self.ble_adv_data_manufacturer_data = None
        self.ble_adv_appearance = None
        self.ble_adv_include_scan_response = False
        self.ble_adv_data_service_uuid_list = None
        self.ble_adv_data_uris = None
        self.ble_adv_data_service_data = None

    def do_ble_start_generic_connectable_advertisement(self, line):
        """
        Description: Start a connectable LE advertisement

        Usage: ble_start_generic_connectable_advertisement
        """
        cmd = "Start a connectable LE advertisement"
        try:
            connectable = True
            self.start_advertisement(connectable)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_start_generic_nonconnectable_advertisement(self, line):
        """
        Description: Start a non-connectable LE advertisement

        Usage: ble_start_generic_nonconnectable_advertisement
        """
        cmd = "Start a nonconnectable LE advertisement"
        try:
            connectable = False
            self.start_advertisement(connectable)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_ble_stop_advertisement(self, line):
        """
        Description: Stop a BLE advertisement.
        Usage: ble_stop_advertisement
        """
        cmd = "Stop a LE advertisement"
        try:
            self.pri_dut.ble_lib.bleStopBleAdvertising()
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    # End BLE advertise wrappers
    # Begin GATT client wrappers

    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
        """Tab completion offering the ids stored in le_ids."""
        return self._complete_options(self.le_ids, text)

    def do_gattc_connect_by_id(self, line):
        """
        Description: Connect to a LE peripheral.
        Input(s):
            device_id: Required. The unique device ID from Fuchsia
                discovered devices.
        Usage:
          Examples:
            gattc_connect_by_id device_id
        """
        cmd = "Connect to a LE peripheral by input ID."
        try:
            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
                line)
            self.log.info("Connection status: {}".format(
                pprint.pformat(connection_status)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_connect(self, line):
        """
        Description: Connect to a LE peripheral.
        Optional input: device_name
        Input(s):
            device_name: Optional. The advertising name of the peripheral to
                connect to. When given it replaces the target device name and
                forces a new scan.
        Usage:
          Examples:
            gattc_connect
            gattc_connect eddystone_123
        """
        cmd = "Connect to a LE peripheral."
        try:
            if len(line) > 0:
                self.target_device_name = line
                self.unique_mac_addr_id = None
            if not self._ensure_unique_id():
                return
            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
                self.unique_mac_addr_id)
            self.log.info("Connection status: {}".format(
                pprint.pformat(connection_status)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_connect_disconnect_iterations(self, line):
        """
        Description: Connect then disconnect to a LE peripheral multiple times.
        Input(s):
            iterations: Required. The number of iterations to run.
        Usage:
          Examples:
            gattc_connect_disconnect_iterations 10
        """
        cmd = "Connect then disconnect to a LE peripheral multiple times."
        try:
            # Parse first so a bad argument fails before any scanning starts.
            iterations = int(line)
            if not self._ensure_unique_id():
                return
            for i in range(iterations):
                self.log.info("Running iteration {}".format(i + 1))
                connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
                    self.unique_mac_addr_id)
                self.log.info("Connection status: {}".format(
                    pprint.pformat(connection_status)))
                time.sleep(4)
                disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
                    self.unique_mac_addr_id)
                self.log.info("Disconnect status: {}".format(disc_status))
                time.sleep(3)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_disconnect(self, line):
        """
        Description: Disconnect from LE peripheral.
        Assumptions: Already connected to a peripheral.
        Usage:
          Examples:
            gattc_disconnect
        """
        cmd = "Disconenct from LE peripheral."
        try:
            disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
                self.unique_mac_addr_id)
            self.log.info("Disconnect status: {}".format(disconnect_status))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_list_services(self, discover_chars):
        """
        Description: List services from LE peripheral.
        Assumptions: Already connected to a peripheral.
        Input(s):
            discover_chars: Optional. An optional input to discover all
                characteristics on the service.
        Usage:
          Examples:
            gattc_list_services
            gattc_list_services true
        """
        cmd = "List services from LE peripheral."
        try:
            services = self.pri_dut.gattc_lib.listServices(
                self.unique_mac_addr_id)
            self.log.info("Discovered Services: \n{}".format(
                pprint.pformat(services)))
            # Test the parsed flag, not the raw text: "false" is a non-empty
            # (truthy) string.
            if self.str_to_bool(discover_chars):
                for service in services.get('result'):
                    self.pri_dut.gattc_lib.connectToService(
                        self.unique_mac_addr_id, service.get('id'))
                    chars = self.pri_dut.gattc_lib.discoverCharacteristics()
                    self.log.info("Discovered chars:\n{}".format(
                        pprint.pformat(chars)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_connect_to_service(self, line):
        """
        Description: Connect to Peripheral GATT server service.
        Assumptions: Already connected to peripheral.
        Input(s):
            service_id: Required. The service id reference on the GATT server.
        Usage:
          Examples:
            gattc_connect_to_service service_id
        """
        cmd = "GATT client connect to GATT server service."
        try:
            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
                                                    int(line))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_discover_characteristics(self, line):
        """
        Description: Discover characteristics from a connected service.
        Assumptions: Already connected to a GATT server service.
        Usage:
          Examples:
            gattc_discover_characteristics
        """
        cmd = "Discover and list characteristics from a GATT server."
        try:
            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
            self.log.info("Discovered chars:\n{}".format(
                pprint.pformat(chars)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_notify_all_chars(self, line):
        """
        Description: Enable all notifications on all Characteristics on
            a GATT server.
        Assumptions: Basic GATT connection made.
        Usage:
          Examples:
            gattc_notify_all_chars
        """
        cmd = "Enable notifications on all characteristics."
        try:
            for char in self._iter_all_characteristics(
                    "Enabling notifications in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                print("found uuid {}".format(char_uuid))
                try:
                    self.pri_dut.gattc_lib.enableNotifyCharacteristic(char_id)
                except Exception as err:
                    # Best effort: keep going with the next characteristic.
                    print("error enabling notification: {}".format(err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_read_all_chars(self, line):
        """
        Description: Read all Characteristic values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Usage:
          Examples:
            gattc_read_all_chars
        """
        cmd = "Read all characteristics from the GATT service."
        try:
            for char in self._iter_all_characteristics(
                    "Reading chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    read_val = \
                        self.pri_dut.gattc_lib.readCharacteristicById(
                            char_id)
                    print(" Characteristic uuid / Value: {} / {}".format(
                        char_uuid, read_val['result']))
                    str_value = "".join(
                        chr(val) for val in read_val['result'])
                    print(" str val: {}".format(str_value))
                except Exception as err:
                    # Best effort: keep going with the next characteristic.
                    print(err)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_read_all_desc(self, line):
        """
        Description: Read all Descriptors values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Usage:
          Examples:
            gattc_read_all_desc
        """
        cmd = "Read all descriptors from the GATT service."
        try:
            for desc_id, desc_uuid in self._iter_all_descriptors(
                    "Reading descs in service uuid: {}",
                    " Reading descs in char uuid: {}"):
                try:
                    read_val = self.pri_dut.gattc_lib.readDescriptorById(
                        desc_id)
                    print(" Descriptor uuid / Value: {} / {}".format(
                        desc_uuid, read_val['result']))
                except Exception as err:
                    # Best effort: keep going with the next descriptor.
                    print(" error reading descriptor {}: {}".format(
                        desc_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_write_all_desc(self, line):
        """
        Description: Write a value to all Descriptors on the GATT server.
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start writing to.
            size: Required. The size of bytes to write (value will be generated).
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_all_desc 0 100
            gattc_write_all_desc 10 2
        """
        cmd = "Write all descriptors on the GATT service."
        try:
            args = line.split()
            if len(args) != 2:
                self.log.info("2 Arguments required: [Offset] [Size]")
                return
            offset = int(args[0])
            write_value = self._generate_write_value(int(args[1]))
            for desc_id, desc_uuid in self._iter_all_descriptors(
                    "Writing descs in service uuid: {}",
                    " Writing descs in char uuid: {}"):
                try:
                    write_val = self.pri_dut.gattc_lib.writeDescriptorById(
                        desc_id, offset, write_value)
                    print(" Descriptor uuid / Result: {} / {}".format(
                        desc_uuid, write_val['result']))
                except Exception as err:
                    # Best effort: keep going with the next descriptor.
                    print(" error writing descriptor {}: {}".format(
                        desc_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_read_all_long_desc(self, line):
        """
        Description: Read all long Characteristic Descriptors
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start reading from.
            max_bytes: Required. The max size of bytes to return.
        Usage:
          Examples:
            gattc_read_all_long_desc 0 100
            gattc_read_all_long_desc 10 20
        """
        cmd = "Read all long descriptors from the GATT service."
        try:
            args = line.split()
            if len(args) != 2:
                self.log.info("2 Arguments required: [Offset] [Size]")
                return
            offset = int(args[0])
            max_bytes = int(args[1])
            for desc_id, desc_uuid in self._iter_all_descriptors(
                    "Reading descs in service uuid: {}",
                    " Reading descs in char uuid: {}"):
                try:
                    read_val = self.pri_dut.gattc_lib.readLongDescriptorById(
                        desc_id, offset, max_bytes)
                    print(" Descriptor uuid / Result: {} / {}".format(
                        desc_uuid, read_val['result']))
                except Exception as err:
                    # Best effort: keep going with the next descriptor.
                    print(" error reading descriptor {}: {}".format(
                        desc_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_read_all_long_char(self, line):
        """
        Description: Read all long Characteristic
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start reading from.
            max_bytes: Required. The max size of bytes to return.
        Usage:
          Examples:
            gattc_read_all_long_char 0 100
            gattc_read_all_long_char 10 20
        """
        cmd = "Read all long Characteristics from the GATT service."
        try:
            args = line.split()
            if len(args) != 2:
                self.log.info("2 Arguments required: [Offset] [Size]")
                return
            offset = int(args[0])
            max_bytes = int(args[1])
            for char in self._iter_all_characteristics(
                    "Reading chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
                        char_id, offset, max_bytes)
                    print(" Char uuid / Result: {} / {}".format(
                        char_uuid, read_val['result']))
                except Exception as err:
                    # Best effort: keep going with the next characteristic.
                    print(" error reading char {}: {}".format(char_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_write_all_chars(self, line):
        """
        Description: Write all characteristic values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start writing on.
            size: The write value size (value will be generated)
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_all_chars 0 10
            gattc_write_all_chars 10 1
        """
        cmd = "Write all characteristics on the GATT service."
        try:
            args = line.split()
            if len(args) != 2:
                self.log.info("2 Arguments required: [Offset] [Size]")
                return
            offset = int(args[0])
            write_value = self._generate_write_value(int(args[1]))
            for char in self._iter_all_characteristics(
                    "Writing chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    write_result = self.pri_dut.gattc_lib.writeCharById(
                        char_id, offset, write_value)
                    print(" Characteristic uuid write result: {} / {}".
                          format(char_uuid, write_result['result']))
                except Exception as err:
                    # Best effort: keep going with the next characteristic.
                    print("error writing char {}".format(err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_write_all_chars_without_response(self, line):
        """
        Description: Write all characteristic values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Input(s):
            size: The write value size (value will be generated).
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_all_chars_without_response 100
        """
        cmd = "Write all characteristics on the GATT service without response."
        try:
            args = line.split()
            if len(args) != 1:
                self.log.info("1 Arguments required: [Size]")
                return
            write_value = self._generate_write_value(int(args[0]))
            for char in self._iter_all_characteristics(
                    "Writing chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    write_result = \
                        self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
                            char_id, write_value)
                    print(" Characteristic uuid write result: {} / {}".
                          format(char_uuid, write_result['result']))
                except Exception as err:
                    # Best effort: keep going with the next characteristic.
                    print("error writing char {}".format(err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_write_char_by_id(self, line):
        """
        Description: Write char by characteristic id reference.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service, given in hexadecimal.
            offset: The offset value to use
            size: Function will generate bytes by input size.
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_char_by_id char_id 0 5
            gattc_write_char_by_id char_id 20 1
        """
        cmd = "Write to GATT server characteristic ."
        try:
            args = line.split()
            if len(args) != 3:
                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
                return
            # The id is parsed as base 16.
            char_id = int(args[0], 16)
            offset = int(args[1])
            write_value = self._generate_write_value(int(args[2]))
            self.test_dut.gatt_client_write_characteristic_by_handle(
                self.unique_mac_addr_id, char_id, offset, write_value)
        except Exception as err:
            self.log.error(FAILURE.format(cmd, err))

    def do_gattc_write_long_char_by_id(self, line):
        """
        Description: Write long char by characteristic id reference.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
            offset: The offset value to use
            size: Function will generate random bytes by input size.
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
            reliable_mode: Optional: Reliable writes represented as bool
        Usage:
          Examples:
            gattc_write_long_char_by_id char_id 0 5
            gattc_write_long_char_by_id char_id 20 1
            gattc_write_long_char_by_id char_id 20 1 true
            gattc_write_long_char_by_id char_id 20 1 false
        """
        cmd = "Long Write to GATT server characteristic ."
1057 try: 1058 args = line.split() 1059 if len(args) < 3: 1060 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1061 return 1062 id = int(args[0], 16) 1063 offset = int(args[1]) 1064 size = int(args[2]) 1065 reliable_mode = False 1066 if len(args) > 3: 1067 reliable_mode = self.str_to_bool(args[3]) 1068 write_value = [] 1069 for i in range(size): 1070 write_value.append(i % 256) 1071 self.test_dut.gatt_client_write_long_characteristic_by_handle( 1072 self.unique_mac_addr_id, id, offset, write_value, 1073 reliable_mode) 1074 except Exception as err: 1075 self.log.error(FAILURE.format(cmd, err)) 1076 1077 def do_gattc_write_long_desc_by_id(self, line): 1078 """ 1079 Description: Write long char by descrioptor id reference. 1080 Assumptions: Already connected to a GATT server service. 1081 Input(s): 1082 characteristic_id: The characteristic id reference on the GATT 1083 service 1084 offset: The offset value to use 1085 size: Function will generate random bytes by input size. 1086 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1087 Usage: 1088 Examples: 1089 gattc_write_long_desc_by_id char_id 0 5 1090 gattc_write_long_desc_by_id char_id 20 1 1091 """ 1092 cmd = "Long Write to GATT server descriptor ." 1093 try: 1094 args = line.split() 1095 if len(args) != 3: 1096 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1097 return 1098 id = int(args[0], 16) 1099 offset = int(args[1]) 1100 size = int(args[2]) 1101 write_value = [] 1102 for i in range(size): 1103 write_value.append(i % 256) 1104 self.test_dut.gatt_client_write_long_descriptor_by_handle( 1105 self.unique_mac_addr_id, id, offset, write_value) 1106 except Exception as err: 1107 self.log.error(FAILURE.format(cmd, err)) 1108 1109 def do_gattc_write_char_by_id_without_response(self, line): 1110 """ 1111 Description: Write char by characteristic id reference without response. 1112 Assumptions: Already connected to a GATT server service. 
1113 Input(s): 1114 characteristic_id: The characteristic id reference on the GATT 1115 service 1116 size: Function will generate random bytes by input size. 1117 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1118 Usage: 1119 Examples: 1120 gattc_write_char_by_id_without_response char_id 5 1121 """ 1122 cmd = "Write characteristic by id without response." 1123 try: 1124 args = line.split() 1125 if len(args) != 2: 1126 self.log.info("2 Arguments required: [Id] [Size]") 1127 return 1128 id = int(args[0], 16) 1129 size = args[1] 1130 write_value = [] 1131 for i in range(int(size)): 1132 write_value.append(i % 256) 1133 self.test_dut.gatt_client_write_characteristic_without_response_by_handle( 1134 self.unique_mac_addr_id, id, write_value) 1135 except Exception as err: 1136 self.log.error(FAILURE.format(cmd, err)) 1137 1138 def do_gattc_enable_notify_char_by_id(self, line): 1139 """ 1140 Description: Enable Characteristic notification on Characteristic ID. 1141 Assumptions: Already connected to a GATT server service. 1142 Input(s): 1143 characteristic_id: The characteristic id reference on the GATT 1144 service 1145 Usage: 1146 Examples: 1147 gattc_enable_notify_char_by_id char_id 1148 """ 1149 cmd = "Enable notifications by Characteristic id." 1150 try: 1151 id = int(line, 16) 1152 self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle( 1153 self.unique_mac_addr_id, id) 1154 except Exception as err: 1155 self.log.error(FAILURE.format(cmd, err)) 1156 1157 def do_gattc_disable_notify_char_by_id(self, line): 1158 """ 1159 Description: Disable Characteristic notification on Characteristic ID. 1160 Assumptions: Already connected to a GATT server service. 1161 Input(s): 1162 characteristic_id: The characteristic id reference on the GATT 1163 service 1164 Usage: 1165 Examples: 1166 gattc_disable_notify_char_by_id char_id 1167 """ 1168 cmd = "Disable notify Characteristic by id." 
1169 try: 1170 id = int(line, 16) 1171 self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle( 1172 self.unique_mac_addr_id, id) 1173 except Exception as err: 1174 self.log.error(FAILURE.format(cmd, err)) 1175 1176 def do_gattc_read_char_by_id(self, line): 1177 """ 1178 Description: Read Characteristic by ID. 1179 Assumptions: Already connected to a GATT server service. 1180 Input(s): 1181 characteristic_id: The characteristic id reference on the GATT 1182 service 1183 Usage: 1184 Examples: 1185 gattc_read_char_by_id char_id 1186 """ 1187 cmd = "Read Characteristic value by ID." 1188 try: 1189 id = int(line, 16) 1190 read_val = self.test_dut.gatt_client_read_characteristic_by_handle( 1191 self.unique_mac_addr_id, id) 1192 self.log.info("Characteristic Value with id {}: {}".format( 1193 id, read_val)) 1194 except Exception as err: 1195 self.log.error(FAILURE.format(cmd, err)) 1196 1197 def do_gattc_read_char_by_uuid(self, characteristic_uuid): 1198 """ 1199 Description: Read Characteristic by UUID (read by type). 1200 Assumptions: Already connected to a GATT server service. 1201 Input(s): 1202 characteristic_uuid: The characteristic id reference on the GATT 1203 service 1204 Usage: 1205 Examples: 1206 gattc_read_char_by_id char_id 1207 """ 1208 cmd = "Read Characteristic value by ID." 1209 try: 1210 short_uuid_len = 4 1211 if len(characteristic_uuid) == short_uuid_len: 1212 characteristic_uuid = BASE_UUID.format(characteristic_uuid) 1213 1214 read_val = self.test_dut.gatt_client_read_characteristic_by_uuid( 1215 self.unique_mac_addr_id, characteristic_uuid) 1216 self.log.info("Characteristic Value with id {}: {}".format( 1217 id, read_val)) 1218 except Exception as err: 1219 self.log.error(FAILURE.format(cmd, err)) 1220 1221 def do_gattc_write_desc_by_id(self, line): 1222 """ 1223 Description: Write Descriptor by characteristic id reference. 1224 Assumptions: Already connected to a GATT server service. 
1225 Input(s): 1226 descriptor_id: The Descriptor id reference on the GATT service 1227 offset: The offset value to use 1228 size: Function will generate random bytes by input size. 1229 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1230 Usage: 1231 Examples: 1232 gattc_write_desc_by_id desc_id 0 5 1233 gattc_write_desc_by_id desc_id 20 1 1234 """ 1235 cmd = "Write Descriptor by id." 1236 try: 1237 args = line.split() 1238 id = int(args[0], 16) 1239 offset = int(args[1]) 1240 size = args[2] 1241 write_value = [] 1242 for i in range(int(size)): 1243 write_value.append(i % 256) 1244 write_result = self.test_dut.gatt_client_write_descriptor_by_handle( 1245 self.unique_mac_addr_id, id, offset, write_value) 1246 self.log.info("Descriptor Write result {}: {}".format( 1247 id, write_result)) 1248 except Exception as err: 1249 self.log.error(FAILURE.format(cmd, err)) 1250 1251 def do_gattc_read_desc_by_id(self, line): 1252 """ 1253 Description: Read Descriptor by ID. 1254 Assumptions: Already connected to a GATT server service. 1255 Input(s): 1256 descriptor_id: The Descriptor id reference on the GATT service 1257 Usage: 1258 Examples: 1259 gattc_read_desc_by_id desc_id 1260 """ 1261 cmd = "Read Descriptor by ID." 1262 try: 1263 id = int(line, 16) 1264 read_val = self.test_dut.gatt_client_read_descriptor_by_handle( 1265 self.unique_mac_addr_id, id) 1266 self.log.info("Descriptor Value with id {}: {}".format( 1267 id, read_val)) 1268 except Exception as err: 1269 self.log.error(FAILURE.format(cmd, err)) 1270 1271 def do_gattc_read_long_char_by_id(self, line): 1272 """ 1273 Description: Read long Characteristic value by id. 1274 Assumptions: Already connected to a GATT server service. 1275 Input(s): 1276 characteristic_id: The characteristic id reference on the GATT 1277 service 1278 offset: The offset value to use. 1279 max_bytes: The max bytes size to return. 
1280 Usage: 1281 Examples: 1282 gattc_read_long_char_by_id char_id 0 10 1283 gattc_read_long_char_by_id char_id 20 1 1284 """ 1285 cmd = "Read long Characteristic value by id." 1286 try: 1287 args = line.split() 1288 if len(args) != 3: 1289 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1290 return 1291 id = int(args[0], 16) 1292 offset = int(args[1]) 1293 max_bytes = int(args[2]) 1294 read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle( 1295 self.unique_mac_addr_id, id, offset, max_bytes) 1296 self.log.info("Characteristic Value with id {}: {}".format( 1297 id, read_val['result'])) 1298 1299 except Exception as err: 1300 self.log.error(FAILURE.format(cmd, err)) 1301 1302 """End GATT client wrappers""" 1303 """Begin LE scan wrappers""" 1304 1305 def _update_scan_results(self, scan_results): 1306 self.le_ids = [] 1307 for scan in scan_results['result']: 1308 self.le_ids.append(scan['id']) 1309 1310 def do_ble_start_scan(self, line): 1311 """ 1312 Description: Perform a BLE scan. 1313 Default filter name: "" 1314 Optional input: filter_device_name 1315 Usage: 1316 Examples: 1317 ble_start_scan 1318 ble_start_scan eddystone 1319 """ 1320 cmd = "Perform a BLE scan and list discovered devices." 1321 try: 1322 scan_filter = {"name_substring": ""} 1323 if line: 1324 scan_filter = {"name_substring": line} 1325 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 1326 except Exception as err: 1327 self.log.error(FAILURE.format(cmd, err)) 1328 1329 def do_ble_stop_scan(self, line): 1330 """ 1331 Description: Stops a BLE scan and returns discovered devices. 1332 Usage: 1333 Examples: 1334 ble_stop_scan 1335 """ 1336 cmd = "Stops a BLE scan and returns discovered devices." 
1337 try: 1338 scan_results = self.pri_dut.gattc_lib.bleStopBleScan() 1339 self._update_scan_results(scan_results) 1340 self.log.info(pprint.pformat(scan_results)) 1341 except Exception as err: 1342 self.log.error(FAILURE.format(cmd, err)) 1343 1344 def do_ble_get_discovered_devices(self, line): 1345 """ 1346 Description: Get discovered LE devices of an active scan. 1347 Usage: 1348 Examples: 1349 ble_stop_scan 1350 """ 1351 cmd = "Get discovered LE devices of an active scan." 1352 try: 1353 scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices() 1354 self._update_scan_results(scan_results) 1355 self.log.info(pprint.pformat(scan_results)) 1356 except Exception as err: 1357 self.log.error(FAILURE.format(cmd, err)) 1358 1359 """End LE scan wrappers""" 1360 """Begin GATT Server wrappers""" 1361 1362 def do_gatts_close(self, line): 1363 """ 1364 Description: Close active GATT server. 1365 1366 Usage: 1367 Examples: 1368 gatts_close 1369 """ 1370 cmd = "Close active GATT server." 1371 try: 1372 result = self.pri_dut.gatts_lib.closeServer() 1373 self.log.info(result) 1374 except Exception as err: 1375 self.log.error(FAILURE.format(cmd, err)) 1376 1377 def complete_gatts_setup_database(self, text, line, begidx, endidx): 1378 if not text: 1379 completions = list( 1380 gatt_test_database.GATT_SERVER_DB_MAPPING.keys()) 1381 else: 1382 completions = [ 1383 s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys() 1384 if s.startswith(text) 1385 ] 1386 return completions 1387 1388 def do_gatts_setup_database(self, line): 1389 """ 1390 Description: Setup a Gatt server database based on pre-defined inputs. 1391 Supports Tab Autocomplete. 
1392 Input(s): 1393 descriptor_db_name: The descriptor db name that matches one in 1394 acts.test_utils.bt.gatt_test_database 1395 Usage: 1396 Examples: 1397 gatts_setup_database LARGE_DB_1 1398 """ 1399 cmd = "Setup GATT Server Database Based of pre-defined dictionaries" 1400 try: 1401 scan_results = self.pri_dut.gatts_lib.publishServer( 1402 gatt_test_database.GATT_SERVER_DB_MAPPING.get(line)) 1403 self.log.info(scan_results) 1404 except Exception as err: 1405 self.log.error(FAILURE.format(cmd, err)) 1406 1407 """End GATT Server wrappers""" 1408 """Begin Bluetooth Controller wrappers""" 1409 1410 def complete_btc_pair(self, text, line, begidx, endidx): 1411 """ Provides auto-complete for btc_pair cmd. 1412 1413 See Cmd module for full description. 1414 """ 1415 arg_completion = len(line.split(" ")) - 1 1416 pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE'] 1417 non_bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE'] 1418 transport_options = ['BREDR', 'LE'] 1419 if arg_completion == 1: 1420 if not text: 1421 completions = pairing_security_level_options 1422 else: 1423 completions = [ 1424 s for s in pairing_security_level_options 1425 if s.startswith(text) 1426 ] 1427 return completions 1428 if arg_completion == 2: 1429 if not text: 1430 completions = non_bondable_options 1431 else: 1432 completions = [ 1433 s for s in non_bondable_options if s.startswith(text) 1434 ] 1435 return completions 1436 if arg_completion == 3: 1437 if not text: 1438 completions = transport_options 1439 else: 1440 completions = [ 1441 s for s in transport_options if s.startswith(text) 1442 ] 1443 return completions 1444 1445 def do_btc_pair(self, line): 1446 """ 1447 Description: Sends an outgoing pairing request. 
1448 1449 Input(s): 1450 pairing security level: ENCRYPTED, AUTHENTICATED, or NONE 1451 non_bondable: BONDABLE, NON_BONDABLE, or NONE 1452 transport: BREDR or LE 1453 1454 Usage: 1455 Examples: 1456 btc_pair NONE NONE BREDR 1457 btc_pair ENCRYPTED NONE LE 1458 btc_pair AUTHENTICATED NONE LE 1459 btc_pair NONE NON_BONDABLE BREDR 1460 """ 1461 cmd = "Send an outgoing pairing request." 1462 pairing_security_level_mapping = { 1463 "ENCRYPTED": 1, 1464 "AUTHENTICATED": 2, 1465 "NONE": None, 1466 } 1467 1468 non_bondable_mapping = { 1469 "BONDABLE": False, # Note: Reversed on purpose 1470 "NON_BONDABLE": True, # Note: Reversed on purpose 1471 "NONE": None, 1472 } 1473 1474 transport_mapping = { 1475 "BREDR": 1, 1476 "LE": 2, 1477 } 1478 1479 try: 1480 options = line.split(" ") 1481 result = self.test_dut.init_pair( 1482 self.unique_mac_addr_id, 1483 pairing_security_level_mapping.get(options[0]), 1484 non_bondable_mapping.get(options[1]), 1485 transport_mapping.get(options[2]), 1486 ) 1487 self.log.info(result) 1488 except Exception as err: 1489 self.log.error(FAILURE.format(cmd, err)) 1490 1491 def complete_btc_set_io_capabilities(self, text, line, begidx, endidx): 1492 """ Provides auto-complete for btc_set_io_capabilities cmd. 1493 1494 See Cmd module for full description. 
1495 """ 1496 arg_completion = len(line.split(" ")) - 1 1497 input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD'] 1498 output_options = ['NONE', 'DISPLAY'] 1499 if arg_completion == 1: 1500 if not text: 1501 completions = input_options 1502 else: 1503 completions = [s for s in input_options if s.startswith(text)] 1504 return completions 1505 if arg_completion == 2: 1506 if not text: 1507 completions = output_options 1508 else: 1509 completions = [s for s in output_options if s.startswith(text)] 1510 return completions 1511 1512 def do_btc_set_io_capabilities(self, line): 1513 """ 1514 Description: Sets the IO capabilities during pairing 1515 1516 Input(s): 1517 input: String - The input I/O capabilities to use 1518 Available Values: 1519 NONE - Input capability type None 1520 CONFIRMATION - Input capability type confirmation 1521 KEYBOARD - Input capability type Keyboard 1522 output: String - The output I/O Capabilities to use 1523 Available Values: 1524 NONE - Output capability type None 1525 DISPLAY - output capability type Display 1526 1527 Usage: 1528 Examples: 1529 btc_set_io_capabilities NONE DISPLAY 1530 btc_set_io_capabilities NONE NONE 1531 btc_set_io_capabilities KEYBOARD DISPLAY 1532 """ 1533 cmd = "Send an outgoing pairing request." 1534 1535 try: 1536 options = line.split(" ") 1537 result = self.pri_dut.btc_lib.setIOCapabilities( 1538 options[0], options[1]) 1539 self.log.info(result) 1540 except Exception as err: 1541 self.log.error(FAILURE.format(cmd, err)) 1542 1543 def do_btc_accept_pairing(self, line): 1544 """ 1545 Description: Accept all incoming pairing requests. 
1546 1547 Usage: 1548 Examples: 1549 btc_accept_pairing 1550 """ 1551 cmd = "Accept incoming pairing requests" 1552 try: 1553 result = self.pri_dut.btc_lib.acceptPairing() 1554 self.log.info(result) 1555 except Exception as err: 1556 self.log.error(FAILURE.format(cmd, err)) 1557 1558 def do_btc_forget_device(self, line): 1559 """ 1560 Description: Forget pairing of the current device under test. 1561 Current device under test is the device found by 1562 tool_refresh_unique_id from custom user param. This function 1563 will also perform a clean disconnect if actively connected. 1564 1565 Usage: 1566 Examples: 1567 btc_forget_device 1568 """ 1569 cmd = "For pairing of the current device under test." 1570 try: 1571 self.log.info("Forgetting device id: {}".format( 1572 self.unique_mac_addr_id)) 1573 result = self.pri_dut.btc_lib.forgetDevice(self.unique_mac_addr_id) 1574 self.log.info(result) 1575 except Exception as err: 1576 self.log.error(FAILURE.format(cmd, err)) 1577 1578 def do_btc_set_discoverable(self, discoverable): 1579 """ 1580 Description: Change Bluetooth Controller discoverablility. 1581 Input(s): 1582 discoverable: true to set discoverable 1583 false to set non-discoverable 1584 Usage: 1585 Examples: 1586 btc_set_discoverable true 1587 btc_set_discoverable false 1588 """ 1589 cmd = "Change Bluetooth Controller discoverablility." 1590 try: 1591 result = self.test_dut.set_discoverable( 1592 self.str_to_bool(discoverable)) 1593 self.log.info(result) 1594 except Exception as err: 1595 self.log.error(FAILURE.format(cmd, err)) 1596 1597 def do_btc_set_name(self, name): 1598 """ 1599 Description: Change Bluetooth Controller local name. 1600 Input(s): 1601 name: The name to set the Bluetooth Controller name to. 1602 1603 Usage: 1604 Examples: 1605 btc_set_name fs_test 1606 """ 1607 cmd = "Change Bluetooth Controller local name." 
1608 try: 1609 result = self.test_dut.set_bluetooth_local_name(name) 1610 self.log.info(result) 1611 except Exception as err: 1612 self.log.error(FAILURE.format(cmd, err)) 1613 1614 def do_btc_request_discovery(self, discover): 1615 """ 1616 Description: Change whether the Bluetooth Controller is in active. 1617 discovery or not. 1618 Input(s): 1619 discover: true to start discovery 1620 false to end discovery 1621 Usage: 1622 Examples: 1623 btc_request_discovery true 1624 btc_request_discovery false 1625 """ 1626 cmd = "Change whether the Bluetooth Controller is in active." 1627 try: 1628 result = self.pri_dut.btc_lib.requestDiscovery( 1629 self.str_to_bool(discover)) 1630 self.log.info(result) 1631 except Exception as err: 1632 self.log.error(FAILURE.format(cmd, err)) 1633 1634 def do_btc_get_known_remote_devices(self, line): 1635 """ 1636 Description: Get a list of known devices. 1637 1638 Usage: 1639 Examples: 1640 btc_get_known_remote_devices 1641 """ 1642 cmd = "Get a list of known devices." 1643 self.bt_control_devices = [] 1644 try: 1645 device_list = self.pri_dut.btc_lib.getKnownRemoteDevices( 1646 )['result'] 1647 for id_dict in device_list: 1648 device = device_list[id_dict] 1649 self.bt_control_devices.append(device) 1650 self.log.info("Device found {}".format(device)) 1651 1652 except Exception as err: 1653 self.log.error(FAILURE.format(cmd, err)) 1654 1655 def do_btc_forget_all_known_devices(self, line): 1656 """ 1657 Description: Forget all known devices. 1658 1659 Usage: 1660 Examples: 1661 btc_forget_all_known_devices 1662 """ 1663 cmd = "Forget all known devices." 
1664 try: 1665 device_list = self.pri_dut.btc_lib.getKnownRemoteDevices( 1666 )['result'] 1667 for device in device_list: 1668 d = device_list[device] 1669 if d['bonded'] or d['connected']: 1670 self.log.info("Unbonding deivce: {}".format(d)) 1671 self.log.info( 1672 self.pri_dut.btc_lib.forgetDevice(d['id'])['result']) 1673 except Exception as err: 1674 self.log.error(FAILURE.format(cmd, err)) 1675 1676 def do_btc_connect_device(self, line): 1677 """ 1678 Description: Connect to device under test. 1679 Device under test is specified by either user params 1680 or 1681 tool_set_target_device_name <name> 1682 do_tool_refresh_unique_id_using_bt_control 1683 1684 Usage: 1685 Examples: 1686 btc_connect_device 1687 """ 1688 cmd = "Connect to device under test." 1689 try: 1690 result = self.pri_dut.btc_lib.connectDevice( 1691 self.unique_mac_addr_id) 1692 self.log.info(result) 1693 except Exception as err: 1694 self.log.error(FAILURE.format(cmd, err)) 1695 1696 def complete_btc_connect_device_by_id(self, text, line, begidx, endidx): 1697 if not text: 1698 completions = list(self.bt_control_ids)[:] 1699 else: 1700 completions = [ 1701 s for s in self.bt_control_ids if s.startswith(text) 1702 ] 1703 return completions 1704 1705 def do_btc_connect_device_by_id(self, device_id): 1706 """ 1707 Description: Connect to device id based on pre-defined inputs. 1708 Supports Tab Autocomplete. 1709 Input(s): 1710 device_id: The device id to connect to. 1711 1712 Usage: 1713 Examples: 1714 btc_connect_device_by_id <device_id> 1715 """ 1716 cmd = "Connect to device id based on pre-defined inputs." 
1717 try: 1718 result = self.pri_dut.btc_lib.connectDevice(device_id) 1719 self.log.info(result) 1720 except Exception as err: 1721 self.log.error(FAILURE.format(cmd, err)) 1722 1723 def complete_btc_connect_device_by_name(self, text, line, begidx, endidx): 1724 if not text: 1725 completions = list(self.bt_control_names)[:] 1726 else: 1727 completions = [ 1728 s for s in self.bt_control_names if s.startswith(text) 1729 ] 1730 return completions 1731 1732 def do_btc_connect_device_by_name(self, device_name): 1733 """ 1734 Description: Connect to device id based on pre-defined inputs. 1735 Supports Tab Autocomplete. 1736 Input(s): 1737 device_id: The device id to connect to. 1738 1739 Usage: 1740 Examples: 1741 btc_connect_device_by_name <device_id> 1742 """ 1743 cmd = "Connect to device name based on pre-defined inputs." 1744 try: 1745 for device in self.bt_control_devices: 1746 if device_name is device['name']: 1747 1748 result = self.pri_dut.btc_lib.connectDevice(device['id']) 1749 self.log.info(result) 1750 except Exception as err: 1751 self.log.error(FAILURE.format(cmd, err)) 1752 1753 def do_btc_disconnect_device(self, line): 1754 """ 1755 Description: Disconnect to device under test. 1756 Device under test is specified by either user params 1757 or 1758 tool_set_target_device_name <name> 1759 do_tool_refresh_unique_id_using_bt_control 1760 1761 Usage: 1762 Examples: 1763 btc_disconnect_device 1764 """ 1765 cmd = "Disconnect to device under test." 1766 try: 1767 result = self.pri_dut.btc_lib.disconnectDevice( 1768 self.unique_mac_addr_id) 1769 self.log.info(result) 1770 except Exception as err: 1771 self.log.error(FAILURE.format(cmd, err)) 1772 1773 def do_btc_init_bluetooth_control(self, line): 1774 """ 1775 Description: Initialize the Bluetooth Controller. 1776 1777 Usage: 1778 Examples: 1779 btc_init_bluetooth_control 1780 """ 1781 cmd = "Initialize the Bluetooth Controller." 
1782 try: 1783 result = self.test_dut.initialize_bluetooth_controller() 1784 self.log.info(result) 1785 except Exception as err: 1786 self.log.error(FAILURE.format(cmd, err)) 1787 1788 def do_btc_get_local_address(self, line): 1789 """ 1790 Description: Get the local BR/EDR address of the Bluetooth Controller. 1791 1792 Usage: 1793 Examples: 1794 btc_get_local_address 1795 """ 1796 cmd = "Get the local BR/EDR address of the Bluetooth Controller." 1797 try: 1798 result = self.test_dut.get_local_bluetooth_address() 1799 self.log.info(result) 1800 except Exception as err: 1801 self.log.error(FAILURE.format(cmd, err)) 1802 1803 def do_btc_input_pairing_pin(self, line): 1804 """ 1805 Description: Sends a pairing pin to SL4F's Bluetooth Control's 1806 Pairing Delegate. 1807 1808 Usage: 1809 Examples: 1810 btc_input_pairing_pin 123456 1811 """ 1812 cmd = "Input pairing pin to the Fuchsia device." 1813 try: 1814 result = self.pri_dut.btc_lib.inputPairingPin(line)['result'] 1815 self.log.info(result) 1816 except Exception as err: 1817 self.log.error(FAILURE.format(cmd, err)) 1818 1819 def do_btc_get_pairing_pin(self, line): 1820 """ 1821 Description: Gets the pairing pin from SL4F's Bluetooth Control's 1822 Pairing Delegate. 1823 1824 Usage: 1825 Examples: 1826 btc_get_pairing_pin 1827 """ 1828 cmd = "Get the pairing pin from the Fuchsia device." 1829 try: 1830 result = self.pri_dut.btc_lib.getPairingPin()['result'] 1831 self.log.info(result) 1832 except Exception as err: 1833 self.log.error(FAILURE.format(cmd, err)) 1834 1835 """End Bluetooth Control wrappers""" 1836 """Begin Profile Server wrappers""" 1837 1838 def do_sdp_pts_example(self, num_of_records): 1839 """ 1840 Description: An example of how to setup a generic SDP record 1841 and SDP search capabilities. This example will pass a few 1842 SDP tests. 1843 1844 Input(s): 1845 num_of_records: The number of records to add. 
1846 1847 Usage: 1848 Examples: 1849 sdp_pts_example 1 1850 sdp pts_example 10 1851 """ 1852 cmd = "Setup SDP for PTS testing." 1853 1854 attributes = [ 1855 bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'], 1856 bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'], 1857 bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'], 1858 bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'], 1859 ] 1860 1861 try: 1862 self.pri_dut.sdp_lib.addSearch( 1863 attributes, int(sig_uuid_constants['AudioSource'], 16)) 1864 self.pri_dut.sdp_lib.addSearch( 1865 attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16)) 1866 self.pri_dut.sdp_lib.addSearch(attributes, 1867 int(sig_uuid_constants['PANU'], 16)) 1868 self.pri_dut.sdp_lib.addSearch( 1869 attributes, int(sig_uuid_constants['SerialPort'], 16)) 1870 self.pri_dut.sdp_lib.addSearch( 1871 attributes, int(sig_uuid_constants['DialupNetworking'], 16)) 1872 self.pri_dut.sdp_lib.addSearch( 1873 attributes, int(sig_uuid_constants['OBEXObjectPush'], 16)) 1874 self.pri_dut.sdp_lib.addSearch( 1875 attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16)) 1876 self.pri_dut.sdp_lib.addSearch( 1877 attributes, int(sig_uuid_constants['Headset'], 16)) 1878 self.pri_dut.sdp_lib.addSearch( 1879 attributes, int(sig_uuid_constants['HandsfreeAudioGateway'], 1880 16)) 1881 self.pri_dut.sdp_lib.addSearch( 1882 attributes, int(sig_uuid_constants['Handsfree'], 16)) 1883 self.pri_dut.sdp_lib.addSearch( 1884 attributes, int(sig_uuid_constants['SIM_Access'], 16)) 1885 for i in range(int(num_of_records)): 1886 result = self.pri_dut.sdp_lib.addService( 1887 sdp_pts_record_list[i]) 1888 self.log.info(result) 1889 except Exception as err: 1890 self.log.error(FAILURE.format(cmd, err)) 1891 1892 def do_sdp_cleanup(self, line): 1893 """ 1894 Description: Cleanup any existing SDP records 1895 1896 Usage: 1897 Examples: 1898 sdp_cleanup 1899 """ 1900 cmd = "Cleanup SDP objects." 
1901 try: 1902 result = self.pri_dut.sdp_lib.cleanUp() 1903 self.log.info(result) 1904 except Exception as err: 1905 self.log.error(FAILURE.format(cmd, err)) 1906 1907 def do_sdp_init(self, line): 1908 """ 1909 Description: Init the profile proxy for setting up SDP records 1910 1911 Usage: 1912 Examples: 1913 sdp_init 1914 """ 1915 cmd = "Initialize profile proxy objects for adding SDP records" 1916 try: 1917 result = self.pri_dut.sdp_lib.init() 1918 self.log.info(result) 1919 except Exception as err: 1920 self.log.error(FAILURE.format(cmd, err)) 1921 1922 def do_sdp_connect_l2cap(self, line): 1923 """ 1924 Description: Send an l2cap connection request over an input psm value. 1925 1926 Note: Must be already connected to a peer. 1927 1928 Input(s): 1929 psm: The int hex value to connect over. Available PSMs: 1930 SDP 0x0001 See Bluetooth Service Discovery Protocol (SDP) 1931 RFCOMM 0x0003 See RFCOMM with TS 07.10 1932 TCS-BIN 0x0005 See Bluetooth Telephony Control Specification / 1933 TCS Binary 1934 TCS-BIN-CORDLESS 0x0007 See Bluetooth Telephony Control 1935 Specification / TCS Binary 1936 BNEP 0x000F See Bluetooth Network Encapsulation Protocol 1937 HID_Control 0x0011 See Human Interface Device 1938 HID_Interrupt 0x0013 See Human Interface Device 1939 UPnP 0x0015 See [ESDP] 1940 AVCTP 0x0017 See Audio/Video Control Transport Protocol 1941 AVDTP 0x0019 See Audio/Video Distribution Transport Protocol 1942 AVCTP_Browsing 0x001B See Audio/Video Remote Control Profile 1943 UDI_C-Plane 0x001D See the Unrestricted Digital Information 1944 Profile [UDI] 1945 ATT 0x001F See Bluetooth Core Specification 1946 3DSP 0x0021 See 3D Synchronization Profile. 1947 LE_PSM_IPSP 0x0023 See Internet Protocol Support Profile 1948 (IPSP) 1949 OTS 0x0025 See Object Transfer Service (OTS) 1950 EATT 0x0027 See Bluetooth Core Specification 1951 mode: String - The channel mode to connect to. 
Available values: 1952 Basic mode: BASIC 1953 Enhanced Retransmission mode: ERTM 1954 1955 Usage: 1956 Examples: 1957 sdp_connect_l2cap 0001 BASIC 1958 sdp_connect_l2cap 0019 ERTM 1959 """ 1960 cmd = "Connect l2cap" 1961 try: 1962 info = line.split() 1963 result = self.pri_dut.sdp_lib.connectL2cap(self.unique_mac_addr_id, 1964 int(info[0], 16), 1965 info[1]) 1966 self.log.info(result) 1967 except Exception as err: 1968 self.log.error(FAILURE.format(cmd, err)) 1969 1970 """End Profile Server wrappers""" 1971 """Begin AVDTP wrappers""" 1972 1973 def complete_avdtp_init(self, text, line, begidx, endidx): 1974 roles = ["sink", "source"] 1975 if not text: 1976 completions = roles 1977 else: 1978 completions = [s for s in roles if s.startswith(text)] 1979 return completions 1980 1981 def do_avdtp_init(self, role): 1982 """ 1983 Description: Init the AVDTP and A2DP service corresponding to the input 1984 role. 1985 1986 Input(s): 1987 role: The specified role. Either 'source' or 'sink'. 1988 1989 Usage: 1990 Examples: 1991 avdtp_init source 1992 avdtp_init sink 1993 """ 1994 cmd = "Initialize AVDTP proxy" 1995 try: 1996 result = self.pri_dut.avdtp_lib.init(role) 1997 self.log.info(result) 1998 except Exception as err: 1999 self.log.error(FAILURE.format(cmd, err)) 2000 2001 def do_avdtp_kill_a2dp_sink(self, line): 2002 """ 2003 Description: Quickly kill any A2DP sink service currently running on the 2004 device. 2005 2006 Usage: 2007 Examples: 2008 avdtp_kill_a2dp_sink 2009 """ 2010 cmd = "Killing A2DP sink" 2011 try: 2012 result = self.pri_dut.control_daemon("bt-a2dp-sink.cmx", "stop") 2013 self.log.info(result) 2014 except Exception as err: 2015 self.log.error(FAILURE.format(cmd, err)) 2016 2017 def do_avdtp_kill_a2dp_source(self, line): 2018 """ 2019 Description: Quickly kill any A2DP source service currently running on 2020 the device. 
2021 2022 Usage: 2023 Examples: 2024 avdtp_kill_a2dp_source 2025 """ 2026 cmd = "Killing A2DP source" 2027 try: 2028 result = self.pri_dut.control_daemon("bt-a2dp-source.cmx", "stop") 2029 self.log.info(result) 2030 except Exception as err: 2031 self.log.error(FAILURE.format(cmd, err)) 2032 2033 def do_avdtp_get_connected_peers(self, line): 2034 """ 2035 Description: Get the connected peers for the AVDTP service 2036 2037 Usage: 2038 Examples: 2039 avdtp_get_connected_peers 2040 """ 2041 cmd = "AVDTP get connected peers" 2042 try: 2043 result = self.pri_dut.avdtp_lib.getConnectedPeers() 2044 self.log.info(result) 2045 except Exception as err: 2046 self.log.error(FAILURE.format(cmd, err)) 2047 2048 def do_avdtp_set_configuration(self, peer_id): 2049 """ 2050 Description: Send AVDTP command to connected peer: set configuration 2051 2052 Input(s): 2053 peer_id: The specified peer_id. 2054 2055 Usage: 2056 Examples: 2057 avdtp_set_configuration <peer_id> 2058 """ 2059 cmd = "Send AVDTP set configuration to connected peer" 2060 try: 2061 result = self.pri_dut.avdtp_lib.setConfiguration(int(peer_id)) 2062 self.log.info(result) 2063 except Exception as err: 2064 self.log.error(FAILURE.format(cmd, err)) 2065 2066 def do_avdtp_get_configuration(self, peer_id): 2067 """ 2068 Description: Send AVDTP command to connected peer: get configuration 2069 2070 Input(s): 2071 peer_id: The specified peer_id. 2072 2073 Usage: 2074 Examples: 2075 avdtp_get_configuration <peer_id> 2076 """ 2077 cmd = "Send AVDTP get configuration to connected peer" 2078 try: 2079 result = self.pri_dut.avdtp_lib.getConfiguration(int(peer_id)) 2080 self.log.info(result) 2081 except Exception as err: 2082 self.log.error(FAILURE.format(cmd, err)) 2083 2084 def do_avdtp_get_capabilities(self, peer_id): 2085 """ 2086 Description: Send AVDTP command to connected peer: get capabilities 2087 2088 Input(s): 2089 peer_id: The specified peer_id. 
2090 2091 Usage: 2092 Examples: 2093 avdtp_get_capabilities <peer_id> 2094 """ 2095 cmd = "Send AVDTP get capabilities to connected peer" 2096 try: 2097 result = self.pri_dut.avdtp_lib.getCapabilities(int(peer_id)) 2098 self.log.info(result) 2099 except Exception as err: 2100 self.log.error(FAILURE.format(cmd, err)) 2101 2102 def do_avdtp_get_all_capabilities(self, peer_id): 2103 """ 2104 Description: Send AVDTP command to connected peer: get all capabilities 2105 2106 Input(s): 2107 peer_id: The specified peer_id. 2108 2109 Usage: 2110 Examples: 2111 avdtp_get_all_capabilities <peer_id> 2112 """ 2113 cmd = "Send AVDTP get all capabilities to connected peer" 2114 try: 2115 result = self.pri_dut.avdtp_lib.getAllCapabilities(int(peer_id)) 2116 self.log.info(result) 2117 except Exception as err: 2118 self.log.error(FAILURE.format(cmd, err)) 2119 2120 def do_avdtp_reconfigure_stream(self, peer_id): 2121 """ 2122 Description: Send AVDTP command to connected peer: reconfigure stream 2123 2124 Input(s): 2125 peer_id: The specified peer_id. 2126 2127 Usage: 2128 Examples: 2129 avdtp_reconfigure_stream <peer_id> 2130 """ 2131 cmd = "Send AVDTP reconfigure stream to connected peer" 2132 try: 2133 result = self.pri_dut.avdtp_lib.reconfigureStream(int(peer_id)) 2134 self.log.info(result) 2135 except Exception as err: 2136 self.log.error(FAILURE.format(cmd, err)) 2137 2138 def do_avdtp_suspend_stream(self, peer_id): 2139 """ 2140 Description: Send AVDTP command to connected peer: suspend stream 2141 2142 Input(s): 2143 peer_id: The specified peer_id. 
2144 2145 Usage: 2146 Examples: 2147 avdtp_suspend_stream <peer_id> 2148 """ 2149 cmd = "Send AVDTP suspend stream to connected peer" 2150 try: 2151 result = self.pri_dut.avdtp_lib.suspendStream(int(peer_id)) 2152 self.log.info(result) 2153 except Exception as err: 2154 self.log.error(FAILURE.format(cmd, err)) 2155 2156 def do_avdtp_suspend_reconfigure(self, peer_id): 2157 """ 2158 Description: Send AVDTP command to connected peer: suspend reconfigure 2159 2160 Input(s): 2161 peer_id: The specified peer_id. 2162 2163 Usage: 2164 Examples: 2165 avdtp_suspend_reconfigure <peer_id> 2166 """ 2167 cmd = "Send AVDTP suspend reconfigure to connected peer" 2168 try: 2169 result = self.pri_dut.avdtp_lib.suspendAndReconfigure(int(peer_id)) 2170 self.log.info(result) 2171 except Exception as err: 2172 self.log.error(FAILURE.format(cmd, err)) 2173 2174 def do_avdtp_release_stream(self, peer_id): 2175 """ 2176 Description: Send AVDTP command to connected peer: release stream 2177 2178 Input(s): 2179 peer_id: The specified peer_id. 2180 2181 Usage: 2182 Examples: 2183 avdtp_release_stream <peer_id> 2184 """ 2185 cmd = "Send AVDTP release stream to connected peer" 2186 try: 2187 result = self.pri_dut.avdtp_lib.releaseStream(int(peer_id)) 2188 self.log.info(result) 2189 except Exception as err: 2190 self.log.error(FAILURE.format(cmd, err)) 2191 2192 def do_avdtp_establish_stream(self, peer_id): 2193 """ 2194 Description: Send AVDTP command to connected peer: establish stream 2195 2196 Input(s): 2197 peer_id: The specified peer_id. 
2198 2199 Usage: 2200 Examples: 2201 avdtp_establish_stream <peer_id> 2202 """ 2203 cmd = "Send AVDTP establish stream to connected peer" 2204 try: 2205 result = self.pri_dut.avdtp_lib.establishStream(int(peer_id)) 2206 self.log.info(result) 2207 except Exception as err: 2208 self.log.error(FAILURE.format(cmd, err)) 2209 2210 def do_avdtp_start_stream(self, peer_id): 2211 """ 2212 Description: Send AVDTP command to connected peer: start stream 2213 2214 Input(s): 2215 peer_id: The specified peer_id. 2216 2217 Usage: 2218 Examples: 2219 avdtp_start_stream <peer_id> 2220 """ 2221 cmd = "Send AVDTP start stream to connected peer" 2222 try: 2223 result = self.pri_dut.avdtp_lib.startStream(int(peer_id)) 2224 self.log.info(result) 2225 except Exception as err: 2226 self.log.error(FAILURE.format(cmd, err)) 2227 2228 def do_avdtp_abort_stream(self, peer_id): 2229 """ 2230 Description: Send AVDTP command to connected peer: abort stream 2231 2232 Input(s): 2233 peer_id: The specified peer_id. 2234 2235 Usage: 2236 Examples: 2237 avdtp_abort_stream <peer_id> 2238 """ 2239 cmd = "Send AVDTP abort stream to connected peer" 2240 try: 2241 result = self.pri_dut.avdtp_lib.abortStream(int(peer_id)) 2242 self.log.info(result) 2243 except Exception as err: 2244 self.log.error(FAILURE.format(cmd, err)) 2245 2246 def do_avdtp_remove_service(self, line): 2247 """ 2248 Description: Removes the AVDTP service in use. 2249 2250 Usage: 2251 Examples: 2252 avdtp_establish_stream <peer_id> 2253 """ 2254 cmd = "Remove AVDTP service" 2255 try: 2256 result = self.pri_dut.avdtp_lib.removeService() 2257 self.log.info(result) 2258 except Exception as err: 2259 self.log.error(FAILURE.format(cmd, err)) 2260 2261 """End AVDTP wrappers""" 2262