1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Python script for wrappers to various libraries.
18
Class CommandInput inherits from cmd.Cmd in the cmd library.
20
21Functions that start with "do_" have a method
22signature that doesn't match the actual command
23line command and that is intended. This is so the
24"help" command knows what to display (in this case
25the documentation of the command itself).
26
27For example:
28Looking at the function "do_tool_set_target_device_name"
29has the inputs self and line which is expected of this type
30of method signature. When the "help" command is done on the
31method name you get the function documentation as such:
32
33(Cmd) help tool_set_target_device_name
34
35        Description: Reset the target device name.
36        Input(s):
37            device_name: Required. The advertising name to connect to.
        Usage: tool_set_target_device_name new_target_device_name
39          Examples:
40            tool_set_target_device_name le_watch
41
42This is all to say this documentation pattern is expected.
43
44"""
45
46from acts.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
47from acts.test_utils.bt.bt_constants import bt_attribute_values
48from acts.test_utils.bt.bt_constants import sig_appearance_constants
49from acts.test_utils.bt.bt_constants import sig_uuid_constants
50from acts.test_utils.fuchsia.sdp_records import sdp_pts_record_list
51
52import acts.test_utils.bt.gatt_test_database as gatt_test_database
53
54import cmd
55import pprint
56import time
57"""Various Global Strings"""
# Looked up from the 'BASE_UUID' entry of the SIG UUID constants table.
BASE_UUID = sig_uuid_constants['BASE_UUID']
# Log template with two slots: the command description and its result.
CMD_LOG = "CMD {} result: {}"
# Log template used by the do_* commands when a command raises: the command
# description followed by the exception.
FAILURE = "CMD {} threw exception: {}"
# NOTE(review): same literal as the CommandInput.ble_adv_name default;
# presumably the default advertising name -- confirm against callers.
BASIC_ADV_NAME = "fs_test"
62
63
class CommandInput(cmd.Cmd):
    """Interactive command shell wrapping Bluetooth calls on a test device.

    Each "do_<name>" method is exposed by cmd.Cmd as the shell command
    "<name>", and its docstring is what the "help <name>" command prints.
    setup_vars() must be called before any command that touches the device.
    """

    # --- BLE advertisement settings consumed by start_advertisement() ---
    # Passed straight through to bleStartBleAdvertising.
    # NOTE(review): presumably milliseconds -- confirm against the facade.
    ble_adv_interval = 1000
    ble_adv_appearance = None
    # When True, start_advertisement() sends a placeholder tx power level.
    ble_adv_data_include_tx_power_level = False
    # When False, the name is left out of the advertisement data.
    ble_adv_include_name = True
    # When True, the advertisement data is reused as the scan response.
    ble_adv_include_scan_response = False
    ble_adv_name = "fs_test"
    ble_adv_data_manufacturer_data = None
    ble_adv_data_service_data = None
    ble_adv_data_service_uuid_list = None
    ble_adv_data_uris = None

    # --- Discovery state ---
    # NOTE: these lists are class attributes that are mutated in place
    # (see _find_unique_id_over_bt_control), so they are shared by every
    # CommandInput instance unless rebound on the instance.
    bt_control_ids = []
    bt_control_names = []
    bt_control_devices = []
    # Seconds to sleep between polls of the discovered-device list.
    bt_scan_poll_timer = 0.5
    # Substring of the advertised name used to pick the target device.
    target_device_name = ""
    # Read by complete_gattc_connect_by_id for tab completion.
    le_ids = []
    # Id of the target device, filled in by the _find_unique_id_* helpers
    # or set manually with tool_set_unique_mac_addr_id.
    unique_mac_addr_id = None
83
84    def setup_vars(self, dut, target_device_name, log):
85        self.pri_dut = dut
86        # Note: test_dut is the start of a slow conversion from a Fuchsia specific
87        # Tool to an abstract_device tool. Only commands that use test_dut will work
88        # Otherwise this tool is primarially targeted at Fuchsia devices.
89        self.test_dut = create_bluetooth_device(self.pri_dut)
90        self.test_dut.initialize_bluetooth_controller()
91        self.target_device_name = target_device_name
92        self.log = log
93
94    def emptyline(self):
95        pass
96
97    def do_EOF(self, line):
98        "End Script"
99        return True
100
101    """ Useful Helper functions and cmd line tooling """
102
103    def str_to_bool(self, s):
104        if s.lower() == 'true':
105            return True
106        elif s.lower() == 'false':
107            return False
108
109    def _find_unique_id_over_le(self):
110        scan_filter = {"name_substring": self.target_device_name}
111        self.unique_mac_addr_id = None
112        self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
113        tries = 10
114        for i in range(tries):
115            time.sleep(self.bt_scan_poll_timer)
116            scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices(
117            )['result']
118            for device in scan_res:
119                name, did, connectable = device["name"], device["id"], device[
120                    "connectable"]
121                if (self.target_device_name in name):
122                    self.unique_mac_addr_id = did
123                    self.log.info(
124                        "Successfully found device: name, id: {}, {}".format(
125                            name, did))
126                    break
127            if self.unique_mac_addr_id:
128                break
129        self.pri_dut.gattc_lib.bleStopBleScan()
130
131    def _find_unique_id_over_bt_control(self):
132        self.unique_mac_addr_id = None
133        self.bt_control_devices = []
134        self.pri_dut.btc_lib.requestDiscovery(True)
135        tries = 10
136        for i in range(tries):
137            if self.unique_mac_addr_id:
138                break
139            time.sleep(self.bt_scan_poll_timer)
140            device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
141            )['result']
142            for id_dict in device_list:
143                device = device_list[id_dict]
144                self.bt_control_devices.append(device)
145                name = None
146                if device['name'] is not None:
147                    name = device['name']
148                did, address = device['id'], device['address']
149
150                self.bt_control_ids.append(did)
151                if name is not None:
152                    self.bt_control_names.append(name)
153                    if self.target_device_name in name:
154                        self.unique_mac_addr_id = did
155                        self.log.info(
156                            "Successfully found device: name, id, address: {}, {}, {}"
157                            .format(name, did, address))
158                        break
159        self.pri_dut.btc_lib.requestDiscovery(False)
160
161    def do_tool_take_bt_snoop_log(self, custom_name):
162        """
163        Description: Takes the bt snoop log from the Fuchsia device.
164        Logs will show up in your config files' logpath directory.
165
166        Input(s):
167            custom_name: Optional. Override the default pcap file name.
168
169        Usage: tool_set_target_device_name new_target_device name
170          Examples:
171            tool_take_bt_snoop_log connection_error
172            tool_take_bt_snoop_log
173        """
174        self.pri_dut.take_bt_snoop_log(custom_name)
175
176    def do_tool_refresh_unique_id(self, line):
177        """
178        Description: Refresh command line tool mac unique id.
179        Usage:
180          Examples:
181            tool_refresh_unique_id
182        """
183        try:
184            self._find_unique_id_over_le()
185        except Exception as err:
186            self.log.error(
187                "Failed to scan or find scan result: {}".format(err))
188
189    def do_tool_refresh_unique_id_using_bt_control(self, line):
190        """
191        Description: Refresh command line tool mac unique id.
192        Usage:
193          Examples:
194            tool_refresh_unique_id_using_bt_control
195        """
196        try:
197            self._find_unique_id_over_bt_control()
198        except Exception as err:
199            self.log.error(
200                "Failed to scan or find scan result: {}".format(err))
201
202    def do_tool_set_target_device_name(self, line):
203        """
204        Description: Reset the target device name.
205        Input(s):
206            device_name: Required. The advertising name to connect to.
207        Usage: tool_set_target_device_name new_target_device name
208          Examples:
209            tool_set_target_device_name le_watch
210        """
211        self.log.info("Setting target_device_name to: {}".format(line))
212        self.target_device_name = line
213
214    def do_tool_set_unique_mac_addr_id(self, line):
215        """
216        Description: Sets the unique mac address id (Specific to Fuchsia)
217        Input(s):
218            device_id: Required. The id to set the unique mac address id to
219        Usage: tool_set_unique_mac_addr_id device_id
220          Examples:
221            tool_set_unique_mac_addr_id 7fb2cae53aad9e0d
222        """
223        self.unique_mac_addr_id = line
224
225    """Begin BLE advertise wrappers"""
226
227    def complete_ble_adv_data_include_name(self, text, line, begidx, endidx):
228        roles = ["true", "false"]
229        if not text:
230            completions = roles
231        else:
232            completions = [s for s in roles if s.startswith(text)]
233        return completions
234
235    def do_ble_adv_data_include_name(self, line):
236        cmd = "Include name in the advertisement."
237        try:
238            self.ble_adv_include_name = self.str_to_bool(line)
239        except Exception as err:
240            self.log.error(FAILURE.format(cmd, err))
241
242    def do_ble_adv_data_set_name(self, line):
243        cmd = "Set the name to be included in the advertisement."
244        try:
245            self.ble_adv_name = line
246        except Exception as err:
247            self.log.error(FAILURE.format(cmd, err))
248
249    def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx):
250        if not text:
251            completions = list(sig_appearance_constants.keys())
252        else:
253            completions = [
254                s for s in sig_appearance_constants.keys()
255                if s.startswith(text)
256            ]
257        return completions
258
259    def do_ble_adv_data_set_appearance(self, line):
260        cmd = "Set the appearance to known SIG values."
261        try:
262            self.ble_adv_appearance = line
263        except Exception as err:
264            self.log.error(FAILURE.format(cmd, err))
265
266    def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx,
267                                                     endidx):
268        options = ['true', 'false']
269        if not text:
270            completions = list(options)[:]
271        else:
272            completions = [s for s in options if s.startswith(text)]
273        return completions
274
275    def do_ble_adv_data_include_tx_power_level(self, line):
276        """Include the tx_power_level in the advertising data.
277        Description: Adds tx_power_level to the advertisement data to the BLE
278            advertisement.
279        Input(s):
280            value: Required. True or False
281        Usage: ble_adv_data_include_tx_power_level bool_value
282          Examples:
283            ble_adv_data_include_tx_power_level true
284            ble_adv_data_include_tx_power_level false
285        """
286        cmd = "Include tx_power_level in advertisement."
287        try:
288            self.ble_adv_data_include_tx_power_level = self.str_to_bool(line)
289        except Exception as err:
290            self.log.info(FAILURE.format(cmd, err))
291
292    def complete_ble_adv_include_scan_response(self, text, line, begidx,
293                                               endidx):
294        options = ['true', 'false']
295        if not text:
296            completions = list(options)[:]
297        else:
298            completions = [s for s in options if s.startswith(text)]
299        return completions
300
301    def do_ble_adv_include_scan_response(self, line):
302        """Include scan response in advertisement. inputs: [true|false]
303            Note: Currently just sets the scan response data to the
304                Advertisement data.
305        """
306        cmd = "Include tx_power_level in advertisement."
307        try:
308            self.ble_adv_include_scan_response = self.str_to_bool(line)
309        except Exception as err:
310            self.log.info(FAILURE.format(cmd, err))
311
312    def do_ble_adv_data_add_manufacturer_data(self, line):
313        """Include manufacturer id and data to the advertisment
314        Description: Adds manufacturer data to the BLE advertisement.
315        Input(s):
316            id: Required. The int representing the manufacturer id.
317            data: Required. The string representing the data.
318        Usage: ble_adv_data_add_manufacturer_data id data
319          Examples:
320            ble_adv_data_add_manufacturer_data 1 test
321        """
322        cmd = "Include manufacturer id and data to the advertisment."
323        try:
324
325            info = line.split()
326            if self.ble_adv_data_manufacturer_data is None:
327                self.ble_adv_data_manufacturer_data = []
328            self.ble_adv_data_manufacturer_data.append({
329                "id": int(info[0]),
330                "data": info[1]
331            })
332        except Exception as err:
333            self.log.info(FAILURE.format(cmd, err))
334
335    def do_ble_adv_data_add_service_data(self, line):
336        """Include service data to the advertisment
337        Description: Adds service data to the BLE advertisement.
338        Input(s):
339            uuid: Required. The string representing the uuid.
340            data: Required. The string representing the data.
341        Usage: ble_adv_data_add_service_data uuid data
342          Examples:
343            ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test
344        """
345        cmd = "Include manufacturer id and data to the advertisment."
346        try:
347            info = line.split()
348            if self.ble_adv_data_service_data is None:
349                self.ble_adv_data_service_data = []
350            self.ble_adv_data_service_data.append({
351                "uuid": info[0],
352                "data": info[1]
353            })
354        except Exception as err:
355            self.log.info(FAILURE.format(cmd, err))
356
357    def do_ble_adv_add_service_uuid_list(self, line):
358        """Include a list of service uuids to the advertisment:
359        Description: Adds service uuid list to the BLE advertisement.
360        Input(s):
361            uuid: Required. A list of N string UUIDs to add.
362        Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN
363          Examples:
364            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb
365            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb
366        """
367        cmd = "Include service uuid list to the advertisment data."
368        try:
369            self.ble_adv_data_service_uuid_list = line
370        except Exception as err:
371            self.log.info(FAILURE.format(cmd, err))
372
373    def do_ble_adv_data_set_uris(self, uris):
374        """Set the URIs of the LE advertisement data:
375        Description: Adds list of String UIRs
376          See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986)
377          Valid URI examples:
378            ftp://ftp.is.co.za/rfc/rfc1808.txt
379            http://www.ietf.org/rfc/rfc2396.txt
380            ldap://[2001:db8::7]/c=GB?objectClass?one
381            mailto:John.Doe@example.com
382            news:comp.infosystems.www.servers.unix
383            tel:+1-816-555-1212
384            telnet://192.0.2.16:80/
385            urn:oasis:names:specification:docbook:dtd:xml:4.1.2
386        Input(s):
387            uris: Required. A list of URIs to add.
388        Usage: ble_adv_data_set_uris uri0 uri1 ... uriN
389          Examples:
390            ble_adv_data_set_uris telnet://192.0.2.16:80/
391            ble_adv_data_set_uris tel:+1-816-555-1212
392        """
393        cmd = "Set the appearance to known SIG values."
394        try:
395            self.ble_adv_data_uris = uris.split()
396        except Exception as err:
397            self.log.error(FAILURE.format(cmd, err))
398
399    def start_advertisement(self, connectable):
400        """ Handle setting advertising data and the advertisement
401            Note: After advertisement is successful, clears values set for
402                * Manufacturer data
403                * Appearance information
404                * Scan Response
405                * Service UUIDs
406                * URI list
407            Args:
408                connectable: Bool of whether to start a connectable
409                    advertisement or not.
410        """
411        adv_data_name = self.ble_adv_name
412        if not self.ble_adv_include_name:
413            adv_data_name = None
414
415        manufacturer_data = self.ble_adv_data_manufacturer_data
416
417        tx_power_level = None
418        if self.ble_adv_data_include_tx_power_level:
419            tx_power_level = 1  # Not yet implemented so set to 1
420
421        scan_response = self.ble_adv_include_scan_response
422
423        adv_data = {
424            "name": adv_data_name,
425            "appearance": self.ble_adv_appearance,
426            "service_data": self.ble_adv_data_service_data,
427            "tx_power_level": tx_power_level,
428            "service_uuids": self.ble_adv_data_service_uuid_list,
429            "manufacturer_data": manufacturer_data,
430            "uris": self.ble_adv_data_uris,
431        }
432
433        if not self.ble_adv_include_scan_response:
434            scan_response = None
435        else:
436            scan_response = adv_data
437
438        result = self.pri_dut.ble_lib.bleStartBleAdvertising(
439            adv_data, scan_response, self.ble_adv_interval, connectable)
440        self.log.info("Result of starting advertisement: {}".format(result))
441        self.ble_adv_data_manufacturer_data = None
442        self.ble_adv_appearance = None
443        self.ble_adv_include_scan_response = False
444        self.ble_adv_data_service_uuid_list = None
445        self.ble_adv_data_uris = None
446        self.ble_adv_data_service_data = None
447
448    def do_ble_start_generic_connectable_advertisement(self, line):
449        """
450        Description: Start a connectable LE advertisement
451
452        Usage: ble_start_generic_connectable_advertisement
453        """
454        cmd = "Start a connectable LE advertisement"
455        try:
456            connectable = True
457            self.start_advertisement(connectable)
458        except Exception as err:
459            self.log.error(FAILURE.format(cmd, err))
460
461    def do_ble_start_generic_nonconnectable_advertisement(self, line):
462        """
463        Description: Start a non-connectable LE advertisement
464
465        Usage: ble_start_generic_nonconnectable_advertisement
466        """
467        cmd = "Start a nonconnectable LE advertisement"
468        try:
469            connectable = False
470            self.start_advertisement(connectable)
471        except Exception as err:
472            self.log.error(FAILURE.format(cmd, err))
473
474    def do_ble_stop_advertisement(self, line):
475        """
476        Description: Stop a BLE advertisement.
477        Usage: ble_stop_advertisement
478        """
479        cmd = "Stop a connectable LE advertisement"
480        try:
481            self.pri_dut.ble_lib.bleStopBleAdvertising()
482        except Exception as err:
483            self.log.error(FAILURE.format(cmd, err))
484
485    """End BLE advertise wrappers"""
486    """Begin GATT client wrappers"""
487
488    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
489        if not text:
490            completions = list(self.le_ids)[:]
491        else:
492            completions = [s for s in self.le_ids if s.startswith(text)]
493        return completions
494
495    def do_gattc_connect_by_id(self, line):
496        """
497        Description: Connect to a LE peripheral.
498        Input(s):
499            device_id: Required. The unique device ID from Fuchsia
500                discovered devices.
501        Usage:
502          Examples:
503            gattc_connect device_id
504        """
505        cmd = "Connect to a LE peripheral by input ID."
506        try:
507
508            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
509                line)
510            self.log.info("Connection status: {}".format(
511                pprint.pformat(connection_status)))
512        except Exception as err:
513            self.log.error(FAILURE.format(cmd, err))
514
515    def do_gattc_connect(self, line):
516        """
517        Description: Connect to a LE peripheral.
518        Optional input: device_name
519        Input(s):
520            device_name: Optional. The peripheral ID to connect to.
521        Usage:
522          Examples:
523            gattc_connect
524            gattc_connect eddystone_123
525        """
526        cmd = "Connect to a LE peripheral."
527        try:
528            if len(line) > 0:
529                self.target_device_name = line
530                self.unique_mac_addr_id = None
531            if not self.unique_mac_addr_id:
532                try:
533                    self._find_unique_id()
534                except Exception as err:
535                    self.log.info("Failed to scan or find device.")
536                    return
537            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
538                self.unique_mac_addr_id)
539            self.log.info("Connection status: {}".format(
540                pprint.pformat(connection_status)))
541        except Exception as err:
542            self.log.error(FAILURE.format(cmd, err))
543
544    def do_gattc_connect_disconnect_iterations(self, line):
545        """
546        Description: Connect then disconnect to a LE peripheral multiple times.
547        Input(s):
548            iterations: Required. The number of iterations to run.
549        Usage:
550          Examples:
551            gattc_connect_disconnect_iterations 10
552        """
553        cmd = "Connect to a LE peripheral."
554        try:
555            if not self.unique_mac_addr_id:
556                try:
557                    self._find_unique_id()
558                except Exception as err:
559                    self.log.info("Failed to scan or find device.")
560                    return
561            for i in range(int(line)):
562                self.log.info("Running iteration {}".format(i + 1))
563                connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
564                    self.unique_mac_addr_id)
565                self.log.info("Connection status: {}".format(
566                    pprint.pformat(connection_status)))
567                time.sleep(4)
568                disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
569                    self.unique_mac_addr_id)
570                self.log.info("Disconnect status: {}".format(disc_status))
571                time.sleep(3)
572        except Exception as err:
573            self.log.error(FAILURE.format(cmd, err))
574
575    def do_gattc_disconnect(self, line):
576        """
577        Description: Disconnect from LE peripheral.
578        Assumptions: Already connected to a peripheral.
579        Usage:
580          Examples:
581            gattc_disconnect
582        """
583        cmd = "Disconenct from LE peripheral."
584        try:
585            disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
586                self.unique_mac_addr_id)
587            self.log.info("Disconnect status: {}".format(disconnect_status))
588        except Exception as err:
589            self.log.error(FAILURE.format(cmd, err))
590
591    def do_gattc_list_services(self, discover_chars):
592        """
593        Description: List services from LE peripheral.
594        Assumptions: Already connected to a peripheral.
595        Input(s):
596            discover_chars: Optional. An optional input to discover all
597                characteristics on the service.
598        Usage:
599          Examples:
600            gattc_list_services
601            gattc_list_services true
602        """
603        cmd = "List services from LE peripheral."
604        try:
605
606            services = self.pri_dut.gattc_lib.listServices(
607                self.unique_mac_addr_id)
608            self.log.info("Discovered Services: \n{}".format(
609                pprint.pformat(services)))
610            discover_characteristics = self.str_to_bool(discover_chars)
611            if discover_chars:
612                for service in services.get('result'):
613                    self.pri_dut.gattc_lib.connectToService(
614                        self.unique_mac_addr_id, service.get('id'))
615                    chars = self.pri_dut.gattc_lib.discoverCharacteristics()
616                    self.log.info("Discovered chars:\n{}".format(
617                        pprint.pformat(chars)))
618
619        except Exception as err:
620            self.log.error(FAILURE.format(cmd, err))
621
622    def do_gattc_connect_to_service(self, line):
623        """
624        Description: Connect to Peripheral GATT server service.
625        Assumptions: Already connected to peripheral.
626        Input(s):
627            service_id: Required. The service id reference on the GATT server.
628        Usage:
629          Examples:
630            gattc_connect_to_service service_id
631        """
632        cmd = "GATT client connect to GATT server service."
633        try:
634            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
635                                                    int(line))
636        except Exception as err:
637            self.log.error(FAILURE.format(cmd, err))
638
639    def do_gattc_discover_characteristics(self, line):
640        """
641        Description: Discover characteristics from a connected service.
642        Assumptions: Already connected to a GATT server service.
643        Usage:
644          Examples:
645            gattc_discover_characteristics
646        """
647        cmd = "Discover and list characteristics from a GATT server."
648        try:
649            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
650            self.log.info("Discovered chars:\n{}".format(
651                pprint.pformat(chars)))
652        except Exception as err:
653            self.log.error(FAILURE.format(cmd, err))
654
655    def do_gattc_notify_all_chars(self, line):
656        """
657        Description: Enable all notifications on all Characteristics on
658            a GATT server.
659        Assumptions: Basic GATT connection made.
660        Usage:
661          Examples:
662            gattc_notify_all_chars
663        """
664        cmd = "Read all characteristics from the GATT service."
665        try:
666            services = self.pri_dut.gattc_lib.listServices(
667                self.unique_mac_addr_id)
668            for service in services['result']:
669                service_id = service['id']
670                service_uuid = service['uuid_type']
671                self.pri_dut.gattc_lib.connectToService(
672                    self.unique_mac_addr_id, service_id)
673                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
674                print("Reading chars in service uuid: {}".format(service_uuid))
675
676                for char in chars['result']:
677                    char_id = char['id']
678                    char_uuid = char['uuid_type']
679                    # quick char filter for apple-4 test... remove later
680                    print("found uuid {}".format(char_uuid))
681                    try:
682                        self.pri_dut.gattc_lib.enableNotifyCharacteristic(
683                            char_id)
684                    except Exception as err:
685                        print("error enabling notification")
686        except Exception as err:
687            self.log.error(FAILURE.format(cmd, err))
688
689    def do_gattc_read_all_chars(self, line):
690        """
691        Description: Read all Characteristic values from a GATT server across
692            all services.
693        Assumptions: Basic GATT connection made.
694        Usage:
695          Examples:
696            gattc_read_all_chars
697        """
698        cmd = "Read all characteristics from the GATT service."
699        try:
700            services = self.pri_dut.gattc_lib.listServices(
701                self.unique_mac_addr_id)
702            for service in services['result']:
703                service_id = service['id']
704                service_uuid = service['uuid_type']
705                self.pri_dut.gattc_lib.connectToService(
706                    self.unique_mac_addr_id, service_id)
707                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
708                print("Reading chars in service uuid: {}".format(service_uuid))
709
710                for char in chars['result']:
711                    char_id = char['id']
712                    char_uuid = char['uuid_type']
713                    try:
714                        read_val =  \
715                            self.pri_dut.gattc_lib.readCharacteristicById(
716                                char_id)
717                        print("  Characteristic uuid / Value: {} / {}".format(
718                            char_uuid, read_val['result']))
719                        str_value = ""
720                        for val in read_val['result']:
721                            str_value += chr(val)
722                        print("    str val: {}".format(str_value))
723                    except Exception as err:
724                        print(err)
725                        pass
726        except Exception as err:
727            self.log.error(FAILURE.format(cmd, err))
728
729    def do_gattc_read_all_desc(self, line):
730        """
731        Description: Read all Descriptors values from a GATT server across
732            all services.
733        Assumptions: Basic GATT connection made.
734        Usage:
735          Examples:
736            gattc_read_all_chars
737        """
738        cmd = "Read all descriptors from the GATT service."
739        try:
740            services = self.pri_dut.gattc_lib.listServices(
741                self.unique_mac_addr_id)
742            for service in services['result']:
743                service_id = service['id']
744                service_uuid = service['uuid_type']
745                self.pri_dut.gattc_lib.connectToService(
746                    self.unique_mac_addr_id, service_id)
747                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
748                print("Reading descs in service uuid: {}".format(service_uuid))
749
750                for char in chars['result']:
751                    char_id = char['id']
752                    char_uuid = char['uuid_type']
753                    descriptors = char['descriptors']
754                    print("  Reading descs in char uuid: {}".format(char_uuid))
755                    for desc in descriptors:
756                        desc_id = desc["id"]
757                        desc_uuid = desc["uuid_type"]
758                    try:
759                        read_val = self.pri_dut.gattc_lib.readDescriptorById(
760                            desc_id)
761                        print("    Descriptor uuid / Value: {} / {}".format(
762                            desc_uuid, read_val['result']))
763                    except Exception as err:
764                        pass
765        except Exception as err:
766            self.log.error(FAILURE.format(cmd, err))
767
768    def do_gattc_write_all_desc(self, line):
769        """
770        Description: Write a value to all Descriptors on the GATT server.
771        Assumptions: Basic GATT connection made.
772        Input(s):
773            offset: Required. The offset to start writing to.
774            size: Required. The size of bytes to write (value will be generated).
775                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
776        Usage:
777          Examples:
778            gattc_write_all_desc 0 100
779            gattc_write_all_desc 10 2
780        """
781        cmd = "Read all descriptors from the GATT service."
782        try:
783            args = line.split()
784            if len(args) != 2:
785                self.log.info("2 Arguments required: [Offset] [Size]")
786                return
787            offset = int(args[0])
788            size = args[1]
789            write_value = []
790            for i in range(int(size)):
791                write_value.append(i % 256)
792            services = self.pri_dut.gattc_lib.listServices(
793                self.unique_mac_addr_id)
794            for service in services['result']:
795                service_id = service['id']
796                service_uuid = service['uuid_type']
797                self.pri_dut.gattc_lib.connectToService(
798                    self.unique_mac_addr_id, service_id)
799                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
800                print("Writing descs in service uuid: {}".format(service_uuid))
801
802                for char in chars['result']:
803                    char_id = char['id']
804                    char_uuid = char['uuid_type']
805                    descriptors = char['descriptors']
806                    print("  Reading descs in char uuid: {}".format(char_uuid))
807                    for desc in descriptors:
808                        desc_id = desc["id"]
809                        desc_uuid = desc["uuid_type"]
810                    try:
811                        write_val = self.pri_dut.gattc_lib.writeDescriptorById(
812                            desc_id, offset, write_value)
813                        print("    Descriptor uuid / Result: {} / {}".format(
814                            desc_uuid, write_val['result']))
815                    except Exception as err:
816                        pass
817        except Exception as err:
818            self.log.error(FAILURE.format(cmd, err))
819
820    def do_gattc_read_all_long_desc(self, line):
821        """
822        Description: Read all long Characteristic Descriptors
823        Assumptions: Basic GATT connection made.
824        Input(s):
825            offset: Required. The offset to start reading from.
826            max_bytes: Required. The max size of bytes to return.
827        Usage:
828          Examples:
829            gattc_read_all_long_desc 0 100
830            gattc_read_all_long_desc 10 20
831        """
832        cmd = "Read all long descriptors from the GATT service."
833        try:
834            args = line.split()
835            if len(args) != 2:
836                self.log.info("2 Arguments required: [Offset] [Size]")
837                return
838            offset = int(args[0])
839            max_bytes = int(args[1])
840            services = self.pri_dut.ble_lib.bleListServices(
841                self.unique_mac_addr_id)
842            for service in services['result']:
843                service_id = service['id']
844                service_uuid = service['uuid_type']
845                self.pri_dut.gattc_lib.connectToService(
846                    self.unique_mac_addr_id, service_id)
847                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
848                print("Reading descs in service uuid: {}".format(service_uuid))
849
850                for char in chars['result']:
851                    char_id = char['id']
852                    char_uuid = char['uuid_type']
853                    descriptors = char['descriptors']
854                    print("  Reading descs in char uuid: {}".format(char_uuid))
855                    for desc in descriptors:
856                        desc_id = desc["id"]
857                        desc_uuid = desc["uuid_type"]
858                    try:
859                        read_val = self.pri_dut.gattc_lib.readLongDescriptorById(
860                            desc_id, offset, max_bytes)
861                        print("    Descriptor uuid / Result: {} / {}".format(
862                            desc_uuid, read_val['result']))
863                    except Exception as err:
864                        pass
865        except Exception as err:
866            self.log.error(FAILURE.format(cmd, err))
867
868    def do_gattc_read_all_long_char(self, line):
869        """
870        Description: Read all long Characteristic
871        Assumptions: Basic GATT connection made.
872        Input(s):
873            offset: Required. The offset to start reading from.
874            max_bytes: Required. The max size of bytes to return.
875        Usage:
876          Examples:
877            gattc_read_all_long_char 0 100
878            gattc_read_all_long_char 10 20
879        """
880        cmd = "Read all long Characteristics from the GATT service."
881        try:
882            args = line.split()
883            if len(args) != 2:
884                self.log.info("2 Arguments required: [Offset] [Size]")
885                return
886            offset = int(args[0])
887            max_bytes = int(args[1])
888            services = self.pri_dut.ble_lib.bleListServices(
889                self.unique_mac_addr_id)
890            for service in services['result']:
891                service_id = service['id']
892                service_uuid = service['uuid_type']
893                self.pri_dut.gattc_lib.connectToService(
894                    self.unique_mac_addr_id, service_id)
895                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
896                print("Reading chars in service uuid: {}".format(service_uuid))
897
898                for char in chars['result']:
899                    char_id = char['id']
900                    char_uuid = char['uuid_type']
901                    try:
902                        read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
903                            char_id, offset, max_bytes)
904                        print("    Char uuid / Result: {} / {}".format(
905                            char_uuid, read_val['result']))
906                    except Exception as err:
907                        pass
908        except Exception as err:
909            self.log.error(FAILURE.format(cmd, err))
910
911    def do_gattc_write_all_chars(self, line):
912        """
913        Description: Write all characteristic values from a GATT server across
914            all services.
915        Assumptions: Basic GATT connection made.
916        Input(s):
917            offset: Required. The offset to start writing on.
918            size: The write value size (value will be generated)
919                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
920        Usage:
921          Examples:
922            gattc_write_all_chars 0 10
923            gattc_write_all_chars 10 1
924        """
925        cmd = "Read all characteristics from the GATT service."
926        try:
927            args = line.split()
928            if len(args) != 2:
929                self.log.info("2 Arguments required: [Offset] [Size]")
930                return
931            offset = int(args[0])
932            size = int(args[1])
933            write_value = []
934            for i in range(size):
935                write_value.append(i % 256)
936            services = self.pri_dut.gattc_lib.listServices(
937                self.unique_mac_addr_id)
938            for service in services['result']:
939                service_id = service['id']
940                service_uuid = service['uuid_type']
941                self.pri_dut.gattc_lib.connectToService(
942                    self.unique_mac_addr_id, service_id)
943                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
944                print("Writing chars in service uuid: {}".format(service_uuid))
945
946                for char in chars['result']:
947                    char_id = char['id']
948                    char_uuid = char['uuid_type']
949                    try:
950                        write_result = self.pri_dut.gattc_lib.writeCharById(
951                            char_id, offset, write_value)
952                        print("  Characteristic uuid write result: {} / {}".
953                              format(char_uuid, write_result['result']))
954                    except Exception as err:
955                        print("error writing char {}".format(err))
956                        pass
957        except Exception as err:
958            self.log.error(FAILURE.format(cmd, err))
959
960    def do_gattc_write_all_chars_without_response(self, line):
961        """
962        Description: Write all characteristic values from a GATT server across
963            all services.
964        Assumptions: Basic GATT connection made.
965        Input(s):
966            size: The write value size (value will be generated).
967                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
968        Usage:
969          Examples:
970            gattc_write_all_chars_without_response 100
971        """
972        cmd = "Read all characteristics from the GATT service."
973        try:
974            args = line.split()
975            if len(args) != 1:
976                self.log.info("1 Arguments required: [Size]")
977                return
978            size = int(args[0])
979            write_value = []
980            for i in range(size):
981                write_value.append(i % 256)
982            services = self.pri_dut.gattc_lib.listServices(
983                self.unique_mac_addr_id)
984            for service in services['result']:
985                service_id = service['id']
986                service_uuid = service['uuid_type']
987                self.pri_dut.gattc_lib.connectToService(
988                    self.unique_mac_addr_id, service_id)
989                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
990                print("Reading chars in service uuid: {}".format(service_uuid))
991
992                for char in chars['result']:
993                    char_id = char['id']
994                    char_uuid = char['uuid_type']
995                    try:
996                        write_result = \
997                            self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
998                                char_id, write_value)
999                        print("  Characteristic uuid write result: {} / {}".
1000                              format(char_uuid, write_result['result']))
1001                    except Exception as err:
1002                        pass
1003        except Exception as err:
1004            self.log.error(FAILURE.format(cmd, err))
1005
1006    def do_gattc_write_char_by_id(self, line):
1007        """
1008        Description: Write char by characteristic id reference.
1009        Assumptions: Already connected to a GATT server service.
1010        Input(s):
1011            characteristic_id: The characteristic id reference on the GATT
1012                service
1013            offset: The offset value to use
1014            size: Function will generate random bytes by input size.
1015                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1016        Usage:
1017          Examples:
1018            gattc_write_char_by_id char_id 0 5
1019            gattc_write_char_by_id char_id 20 1
1020        """
1021        cmd = "Write to GATT server characteristic ."
1022        try:
1023            args = line.split()
1024            if len(args) != 3:
1025                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1026                return
1027            id = int(args[0], 16)
1028            offset = int(args[1])
1029            size = int(args[2])
1030            write_value = []
1031            for i in range(size):
1032                write_value.append(i % 256)
1033            self.test_dut.gatt_client_write_characteristic_by_handle(
1034                self.unique_mac_addr_id, id, offset, write_value)
1035        except Exception as err:
1036            self.log.error(FAILURE.format(cmd, err))
1037
1038    def do_gattc_write_long_char_by_id(self, line):
1039        """
1040        Description: Write long char by characteristic id reference.
1041        Assumptions: Already connected to a GATT server service.
1042        Input(s):
1043            characteristic_id: The characteristic id reference on the GATT
1044                service
1045            offset: The offset value to use
1046            size: Function will generate random bytes by input size.
1047                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1048            reliable_mode: Optional: Reliable writes represented as bool
1049        Usage:
1050          Examples:
1051            gattc_write_long_char_by_id char_id 0 5
1052            gattc_write_long_char_by_id char_id 20 1
1053            gattc_write_long_char_by_id char_id 20 1 true
1054            gattc_write_long_char_by_id char_id 20 1 false
1055        """
1056        cmd = "Long Write to GATT server characteristic ."
1057        try:
1058            args = line.split()
1059            if len(args) < 3:
1060                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1061                return
1062            id = int(args[0], 16)
1063            offset = int(args[1])
1064            size = int(args[2])
1065            reliable_mode = False
1066            if len(args) > 3:
1067                reliable_mode = self.str_to_bool(args[3])
1068            write_value = []
1069            for i in range(size):
1070                write_value.append(i % 256)
1071            self.test_dut.gatt_client_write_long_characteristic_by_handle(
1072                self.unique_mac_addr_id, id, offset, write_value,
1073                reliable_mode)
1074        except Exception as err:
1075            self.log.error(FAILURE.format(cmd, err))
1076
1077    def do_gattc_write_long_desc_by_id(self, line):
1078        """
1079        Description: Write long char by descrioptor id reference.
1080        Assumptions: Already connected to a GATT server service.
1081        Input(s):
1082            characteristic_id: The characteristic id reference on the GATT
1083                service
1084            offset: The offset value to use
1085            size: Function will generate random bytes by input size.
1086                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1087        Usage:
1088          Examples:
1089            gattc_write_long_desc_by_id char_id 0 5
1090            gattc_write_long_desc_by_id char_id 20 1
1091        """
1092        cmd = "Long Write to GATT server descriptor ."
1093        try:
1094            args = line.split()
1095            if len(args) != 3:
1096                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1097                return
1098            id = int(args[0], 16)
1099            offset = int(args[1])
1100            size = int(args[2])
1101            write_value = []
1102            for i in range(size):
1103                write_value.append(i % 256)
1104            self.test_dut.gatt_client_write_long_descriptor_by_handle(
1105                self.unique_mac_addr_id, id, offset, write_value)
1106        except Exception as err:
1107            self.log.error(FAILURE.format(cmd, err))
1108
1109    def do_gattc_write_char_by_id_without_response(self, line):
1110        """
1111        Description: Write char by characteristic id reference without response.
1112        Assumptions: Already connected to a GATT server service.
1113        Input(s):
1114            characteristic_id: The characteristic id reference on the GATT
1115                service
1116            size: Function will generate random bytes by input size.
1117                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1118        Usage:
1119          Examples:
1120            gattc_write_char_by_id_without_response char_id 5
1121        """
1122        cmd = "Write characteristic by id without response."
1123        try:
1124            args = line.split()
1125            if len(args) != 2:
1126                self.log.info("2 Arguments required: [Id] [Size]")
1127                return
1128            id = int(args[0], 16)
1129            size = args[1]
1130            write_value = []
1131            for i in range(int(size)):
1132                write_value.append(i % 256)
1133            self.test_dut.gatt_client_write_characteristic_without_response_by_handle(
1134                self.unique_mac_addr_id, id, write_value)
1135        except Exception as err:
1136            self.log.error(FAILURE.format(cmd, err))
1137
1138    def do_gattc_enable_notify_char_by_id(self, line):
1139        """
1140        Description: Enable Characteristic notification on Characteristic ID.
1141        Assumptions: Already connected to a GATT server service.
1142        Input(s):
1143            characteristic_id: The characteristic id reference on the GATT
1144                service
1145        Usage:
1146          Examples:
1147            gattc_enable_notify_char_by_id char_id
1148        """
1149        cmd = "Enable notifications by Characteristic id."
1150        try:
1151            id = int(line, 16)
1152            self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle(
1153                self.unique_mac_addr_id, id)
1154        except Exception as err:
1155            self.log.error(FAILURE.format(cmd, err))
1156
1157    def do_gattc_disable_notify_char_by_id(self, line):
1158        """
1159        Description: Disable Characteristic notification on Characteristic ID.
1160        Assumptions: Already connected to a GATT server service.
1161        Input(s):
1162            characteristic_id: The characteristic id reference on the GATT
1163                service
1164        Usage:
1165          Examples:
1166            gattc_disable_notify_char_by_id char_id
1167        """
1168        cmd = "Disable notify Characteristic by id."
1169        try:
1170            id = int(line, 16)
1171            self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle(
1172                self.unique_mac_addr_id, id)
1173        except Exception as err:
1174            self.log.error(FAILURE.format(cmd, err))
1175
1176    def do_gattc_read_char_by_id(self, line):
1177        """
1178        Description: Read Characteristic by ID.
1179        Assumptions: Already connected to a GATT server service.
1180        Input(s):
1181            characteristic_id: The characteristic id reference on the GATT
1182                service
1183        Usage:
1184          Examples:
1185            gattc_read_char_by_id char_id
1186        """
1187        cmd = "Read Characteristic value by ID."
1188        try:
1189            id = int(line, 16)
1190            read_val = self.test_dut.gatt_client_read_characteristic_by_handle(
1191                self.unique_mac_addr_id, id)
1192            self.log.info("Characteristic Value with id {}: {}".format(
1193                id, read_val))
1194        except Exception as err:
1195            self.log.error(FAILURE.format(cmd, err))
1196
1197    def do_gattc_read_char_by_uuid(self, characteristic_uuid):
1198        """
1199        Description: Read Characteristic by UUID (read by type).
1200        Assumptions: Already connected to a GATT server service.
1201        Input(s):
1202            characteristic_uuid: The characteristic id reference on the GATT
1203                service
1204        Usage:
1205          Examples:
1206            gattc_read_char_by_id char_id
1207        """
1208        cmd = "Read Characteristic value by ID."
1209        try:
1210            short_uuid_len = 4
1211            if len(characteristic_uuid) == short_uuid_len:
1212                characteristic_uuid = BASE_UUID.format(characteristic_uuid)
1213
1214            read_val = self.test_dut.gatt_client_read_characteristic_by_uuid(
1215                self.unique_mac_addr_id, characteristic_uuid)
1216            self.log.info("Characteristic Value with id {}: {}".format(
1217                id, read_val))
1218        except Exception as err:
1219            self.log.error(FAILURE.format(cmd, err))
1220
1221    def do_gattc_write_desc_by_id(self, line):
1222        """
1223        Description: Write Descriptor by characteristic id reference.
1224        Assumptions: Already connected to a GATT server service.
1225        Input(s):
1226            descriptor_id: The Descriptor id reference on the GATT service
1227            offset: The offset value to use
1228            size: Function will generate random bytes by input size.
1229                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1230        Usage:
1231          Examples:
1232            gattc_write_desc_by_id desc_id 0 5
1233            gattc_write_desc_by_id desc_id 20 1
1234        """
1235        cmd = "Write Descriptor by id."
1236        try:
1237            args = line.split()
1238            id = int(args[0], 16)
1239            offset = int(args[1])
1240            size = args[2]
1241            write_value = []
1242            for i in range(int(size)):
1243                write_value.append(i % 256)
1244            write_result = self.test_dut.gatt_client_write_descriptor_by_handle(
1245                self.unique_mac_addr_id, id, offset, write_value)
1246            self.log.info("Descriptor Write result {}: {}".format(
1247                id, write_result))
1248        except Exception as err:
1249            self.log.error(FAILURE.format(cmd, err))
1250
1251    def do_gattc_read_desc_by_id(self, line):
1252        """
1253        Description: Read Descriptor by ID.
1254        Assumptions: Already connected to a GATT server service.
1255        Input(s):
1256            descriptor_id: The Descriptor id reference on the GATT service
1257        Usage:
1258          Examples:
1259            gattc_read_desc_by_id desc_id
1260        """
1261        cmd = "Read Descriptor by ID."
1262        try:
1263            id = int(line, 16)
1264            read_val = self.test_dut.gatt_client_read_descriptor_by_handle(
1265                self.unique_mac_addr_id, id)
1266            self.log.info("Descriptor Value with id {}: {}".format(
1267                id, read_val))
1268        except Exception as err:
1269            self.log.error(FAILURE.format(cmd, err))
1270
1271    def do_gattc_read_long_char_by_id(self, line):
1272        """
1273        Description: Read long Characteristic value by id.
1274        Assumptions: Already connected to a GATT server service.
1275        Input(s):
1276            characteristic_id: The characteristic id reference on the GATT
1277                service
1278            offset: The offset value to use.
1279            max_bytes: The max bytes size to return.
1280        Usage:
1281          Examples:
1282            gattc_read_long_char_by_id char_id 0 10
1283            gattc_read_long_char_by_id char_id 20 1
1284        """
1285        cmd = "Read long Characteristic value by id."
1286        try:
1287            args = line.split()
1288            if len(args) != 3:
1289                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1290                return
1291            id = int(args[0], 16)
1292            offset = int(args[1])
1293            max_bytes = int(args[2])
1294            read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle(
1295                self.unique_mac_addr_id, id, offset, max_bytes)
1296            self.log.info("Characteristic Value with id {}: {}".format(
1297                id, read_val['result']))
1298
1299        except Exception as err:
1300            self.log.error(FAILURE.format(cmd, err))
1301
1302    """End GATT client wrappers"""
1303    """Begin LE scan wrappers"""
1304
1305    def _update_scan_results(self, scan_results):
1306        self.le_ids = []
1307        for scan in scan_results['result']:
1308            self.le_ids.append(scan['id'])
1309
1310    def do_ble_start_scan(self, line):
1311        """
1312        Description: Perform a BLE scan.
1313        Default filter name: ""
1314        Optional input: filter_device_name
1315        Usage:
1316          Examples:
1317            ble_start_scan
1318            ble_start_scan eddystone
1319        """
1320        cmd = "Perform a BLE scan and list discovered devices."
1321        try:
1322            scan_filter = {"name_substring": ""}
1323            if line:
1324                scan_filter = {"name_substring": line}
1325            self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
1326        except Exception as err:
1327            self.log.error(FAILURE.format(cmd, err))
1328
1329    def do_ble_stop_scan(self, line):
1330        """
1331        Description: Stops a BLE scan and returns discovered devices.
1332        Usage:
1333          Examples:
1334            ble_stop_scan
1335        """
1336        cmd = "Stops a BLE scan and returns discovered devices."
1337        try:
1338            scan_results = self.pri_dut.gattc_lib.bleStopBleScan()
1339            self._update_scan_results(scan_results)
1340            self.log.info(pprint.pformat(scan_results))
1341        except Exception as err:
1342            self.log.error(FAILURE.format(cmd, err))
1343
1344    def do_ble_get_discovered_devices(self, line):
1345        """
1346        Description: Get discovered LE devices of an active scan.
1347        Usage:
1348          Examples:
1349            ble_stop_scan
1350        """
1351        cmd = "Get discovered LE devices of an active scan."
1352        try:
1353            scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()
1354            self._update_scan_results(scan_results)
1355            self.log.info(pprint.pformat(scan_results))
1356        except Exception as err:
1357            self.log.error(FAILURE.format(cmd, err))
1358
1359    """End LE scan wrappers"""
1360    """Begin GATT Server wrappers"""
1361
1362    def do_gatts_close(self, line):
1363        """
1364        Description: Close active GATT server.
1365
1366        Usage:
1367          Examples:
1368            gatts_close
1369        """
1370        cmd = "Close active GATT server."
1371        try:
1372            result = self.pri_dut.gatts_lib.closeServer()
1373            self.log.info(result)
1374        except Exception as err:
1375            self.log.error(FAILURE.format(cmd, err))
1376
1377    def complete_gatts_setup_database(self, text, line, begidx, endidx):
1378        if not text:
1379            completions = list(
1380                gatt_test_database.GATT_SERVER_DB_MAPPING.keys())
1381        else:
1382            completions = [
1383                s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys()
1384                if s.startswith(text)
1385            ]
1386        return completions
1387
1388    def do_gatts_setup_database(self, line):
1389        """
1390        Description: Setup a Gatt server database based on pre-defined inputs.
1391            Supports Tab Autocomplete.
1392        Input(s):
1393            descriptor_db_name: The descriptor db name that matches one in
1394                acts.test_utils.bt.gatt_test_database
1395        Usage:
1396          Examples:
1397            gatts_setup_database LARGE_DB_1
1398        """
1399        cmd = "Setup GATT Server Database Based of pre-defined dictionaries"
1400        try:
1401            scan_results = self.pri_dut.gatts_lib.publishServer(
1402                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))
1403            self.log.info(scan_results)
1404        except Exception as err:
1405            self.log.error(FAILURE.format(cmd, err))
1406
1407    """End GATT Server wrappers"""
1408    """Begin Bluetooth Controller wrappers"""
1409
1410    def complete_btc_pair(self, text, line, begidx, endidx):
1411        """ Provides auto-complete for btc_pair cmd.
1412
1413        See Cmd module for full description.
1414        """
1415        arg_completion = len(line.split(" ")) - 1
1416        pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE']
1417        non_bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE']
1418        transport_options = ['BREDR', 'LE']
1419        if arg_completion == 1:
1420            if not text:
1421                completions = pairing_security_level_options
1422            else:
1423                completions = [
1424                    s for s in pairing_security_level_options
1425                    if s.startswith(text)
1426                ]
1427            return completions
1428        if arg_completion == 2:
1429            if not text:
1430                completions = non_bondable_options
1431            else:
1432                completions = [
1433                    s for s in non_bondable_options if s.startswith(text)
1434                ]
1435            return completions
1436        if arg_completion == 3:
1437            if not text:
1438                completions = transport_options
1439            else:
1440                completions = [
1441                    s for s in transport_options if s.startswith(text)
1442                ]
1443            return completions
1444
1445    def do_btc_pair(self, line):
1446        """
1447        Description: Sends an outgoing pairing request.
1448
1449        Input(s):
1450            pairing security level: ENCRYPTED, AUTHENTICATED, or NONE
1451            non_bondable: BONDABLE, NON_BONDABLE, or NONE
1452            transport: BREDR or LE
1453
1454        Usage:
1455          Examples:
1456            btc_pair NONE NONE BREDR
1457            btc_pair ENCRYPTED NONE LE
1458            btc_pair AUTHENTICATED NONE LE
1459            btc_pair NONE NON_BONDABLE BREDR
1460        """
1461        cmd = "Send an outgoing pairing request."
1462        pairing_security_level_mapping = {
1463            "ENCRYPTED": 1,
1464            "AUTHENTICATED": 2,
1465            "NONE": None,
1466        }
1467
1468        non_bondable_mapping = {
1469            "BONDABLE": False,  # Note: Reversed on purpose
1470            "NON_BONDABLE": True,  # Note: Reversed on purpose
1471            "NONE": None,
1472        }
1473
1474        transport_mapping = {
1475            "BREDR": 1,
1476            "LE": 2,
1477        }
1478
1479        try:
1480            options = line.split(" ")
1481            result = self.test_dut.init_pair(
1482                self.unique_mac_addr_id,
1483                pairing_security_level_mapping.get(options[0]),
1484                non_bondable_mapping.get(options[1]),
1485                transport_mapping.get(options[2]),
1486            )
1487            self.log.info(result)
1488        except Exception as err:
1489            self.log.error(FAILURE.format(cmd, err))
1490
1491    def complete_btc_set_io_capabilities(self, text, line, begidx, endidx):
1492        """ Provides auto-complete for btc_set_io_capabilities cmd.
1493
1494        See Cmd module for full description.
1495        """
1496        arg_completion = len(line.split(" ")) - 1
1497        input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD']
1498        output_options = ['NONE', 'DISPLAY']
1499        if arg_completion == 1:
1500            if not text:
1501                completions = input_options
1502            else:
1503                completions = [s for s in input_options if s.startswith(text)]
1504            return completions
1505        if arg_completion == 2:
1506            if not text:
1507                completions = output_options
1508            else:
1509                completions = [s for s in output_options if s.startswith(text)]
1510            return completions
1511
1512    def do_btc_set_io_capabilities(self, line):
1513        """
1514        Description: Sets the IO capabilities during pairing
1515
1516        Input(s):
1517            input: String - The input I/O capabilities to use
1518                Available Values:
1519                NONE - Input capability type None
1520                CONFIRMATION - Input capability type confirmation
1521                KEYBOARD - Input capability type Keyboard
1522            output: String - The output I/O Capabilities to use
1523                Available Values:
1524                NONE - Output capability type None
1525                DISPLAY - output capability type Display
1526
1527        Usage:
1528          Examples:
1529            btc_set_io_capabilities NONE DISPLAY
1530            btc_set_io_capabilities NONE NONE
1531            btc_set_io_capabilities KEYBOARD DISPLAY
1532        """
1533        cmd = "Send an outgoing pairing request."
1534
1535        try:
1536            options = line.split(" ")
1537            result = self.pri_dut.btc_lib.setIOCapabilities(
1538                options[0], options[1])
1539            self.log.info(result)
1540        except Exception as err:
1541            self.log.error(FAILURE.format(cmd, err))
1542
1543    def do_btc_accept_pairing(self, line):
1544        """
1545        Description: Accept all incoming pairing requests.
1546
1547        Usage:
1548          Examples:
1549            btc_accept_pairing
1550        """
1551        cmd = "Accept incoming pairing requests"
1552        try:
1553            result = self.pri_dut.btc_lib.acceptPairing()
1554            self.log.info(result)
1555        except Exception as err:
1556            self.log.error(FAILURE.format(cmd, err))
1557
1558    def do_btc_forget_device(self, line):
1559        """
1560        Description: Forget pairing of the current device under test.
1561            Current device under test is the device found by
1562            tool_refresh_unique_id from custom user param. This function
1563            will also perform a clean disconnect if actively connected.
1564
1565        Usage:
1566          Examples:
1567            btc_forget_device
1568        """
1569        cmd = "For pairing of the current device under test."
1570        try:
1571            self.log.info("Forgetting device id: {}".format(
1572                self.unique_mac_addr_id))
1573            result = self.pri_dut.btc_lib.forgetDevice(self.unique_mac_addr_id)
1574            self.log.info(result)
1575        except Exception as err:
1576            self.log.error(FAILURE.format(cmd, err))
1577
1578    def do_btc_set_discoverable(self, discoverable):
1579        """
1580        Description: Change Bluetooth Controller discoverablility.
1581        Input(s):
1582            discoverable: true to set discoverable
1583                          false to set non-discoverable
1584        Usage:
1585          Examples:
1586            btc_set_discoverable true
1587            btc_set_discoverable false
1588        """
1589        cmd = "Change Bluetooth Controller discoverablility."
1590        try:
1591            result = self.test_dut.set_discoverable(
1592                self.str_to_bool(discoverable))
1593            self.log.info(result)
1594        except Exception as err:
1595            self.log.error(FAILURE.format(cmd, err))
1596
1597    def do_btc_set_name(self, name):
1598        """
1599        Description: Change Bluetooth Controller local name.
1600        Input(s):
1601            name: The name to set the Bluetooth Controller name to.
1602
1603        Usage:
1604          Examples:
1605            btc_set_name fs_test
1606        """
1607        cmd = "Change Bluetooth Controller local name."
1608        try:
1609            result = self.test_dut.set_bluetooth_local_name(name)
1610            self.log.info(result)
1611        except Exception as err:
1612            self.log.error(FAILURE.format(cmd, err))
1613
1614    def do_btc_request_discovery(self, discover):
1615        """
1616        Description: Change whether the Bluetooth Controller is in active.
1617            discovery or not.
1618        Input(s):
1619            discover: true to start discovery
1620                      false to end discovery
1621        Usage:
1622          Examples:
1623            btc_request_discovery true
1624            btc_request_discovery false
1625        """
1626        cmd = "Change whether the Bluetooth Controller is in active."
1627        try:
1628            result = self.pri_dut.btc_lib.requestDiscovery(
1629                self.str_to_bool(discover))
1630            self.log.info(result)
1631        except Exception as err:
1632            self.log.error(FAILURE.format(cmd, err))
1633
1634    def do_btc_get_known_remote_devices(self, line):
1635        """
1636        Description: Get a list of known devices.
1637
1638        Usage:
1639          Examples:
1640            btc_get_known_remote_devices
1641        """
1642        cmd = "Get a list of known devices."
1643        self.bt_control_devices = []
1644        try:
1645            device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
1646            )['result']
1647            for id_dict in device_list:
1648                device = device_list[id_dict]
1649                self.bt_control_devices.append(device)
1650                self.log.info("Device found {}".format(device))
1651
1652        except Exception as err:
1653            self.log.error(FAILURE.format(cmd, err))
1654
1655    def do_btc_forget_all_known_devices(self, line):
1656        """
1657        Description: Forget all known devices.
1658
1659        Usage:
1660          Examples:
1661            btc_forget_all_known_devices
1662        """
1663        cmd = "Forget all known devices."
1664        try:
1665            device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
1666            )['result']
1667            for device in device_list:
1668                d = device_list[device]
1669                if d['bonded'] or d['connected']:
1670                    self.log.info("Unbonding deivce: {}".format(d))
1671                    self.log.info(
1672                        self.pri_dut.btc_lib.forgetDevice(d['id'])['result'])
1673        except Exception as err:
1674            self.log.error(FAILURE.format(cmd, err))
1675
1676    def do_btc_connect_device(self, line):
1677        """
1678        Description: Connect to device under test.
1679            Device under test is specified by either user params
1680            or
1681                tool_set_target_device_name <name>
1682                do_tool_refresh_unique_id_using_bt_control
1683
1684        Usage:
1685          Examples:
1686            btc_connect_device
1687        """
1688        cmd = "Connect to device under test."
1689        try:
1690            result = self.pri_dut.btc_lib.connectDevice(
1691                self.unique_mac_addr_id)
1692            self.log.info(result)
1693        except Exception as err:
1694            self.log.error(FAILURE.format(cmd, err))
1695
1696    def complete_btc_connect_device_by_id(self, text, line, begidx, endidx):
1697        if not text:
1698            completions = list(self.bt_control_ids)[:]
1699        else:
1700            completions = [
1701                s for s in self.bt_control_ids if s.startswith(text)
1702            ]
1703        return completions
1704
1705    def do_btc_connect_device_by_id(self, device_id):
1706        """
1707        Description: Connect to device id based on pre-defined inputs.
1708            Supports Tab Autocomplete.
1709        Input(s):
1710            device_id: The device id to connect to.
1711
1712        Usage:
1713          Examples:
1714            btc_connect_device_by_id <device_id>
1715        """
1716        cmd = "Connect to device id based on pre-defined inputs."
1717        try:
1718            result = self.pri_dut.btc_lib.connectDevice(device_id)
1719            self.log.info(result)
1720        except Exception as err:
1721            self.log.error(FAILURE.format(cmd, err))
1722
1723    def complete_btc_connect_device_by_name(self, text, line, begidx, endidx):
1724        if not text:
1725            completions = list(self.bt_control_names)[:]
1726        else:
1727            completions = [
1728                s for s in self.bt_control_names if s.startswith(text)
1729            ]
1730        return completions
1731
1732    def do_btc_connect_device_by_name(self, device_name):
1733        """
1734        Description: Connect to device id based on pre-defined inputs.
1735            Supports Tab Autocomplete.
1736        Input(s):
1737            device_id: The device id to connect to.
1738
1739        Usage:
1740          Examples:
1741            btc_connect_device_by_name <device_id>
1742        """
1743        cmd = "Connect to device name based on pre-defined inputs."
1744        try:
1745            for device in self.bt_control_devices:
1746                if device_name is device['name']:
1747
1748                    result = self.pri_dut.btc_lib.connectDevice(device['id'])
1749                    self.log.info(result)
1750        except Exception as err:
1751            self.log.error(FAILURE.format(cmd, err))
1752
1753    def do_btc_disconnect_device(self, line):
1754        """
1755        Description: Disconnect to device under test.
1756            Device under test is specified by either user params
1757            or
1758                tool_set_target_device_name <name>
1759                do_tool_refresh_unique_id_using_bt_control
1760
1761        Usage:
1762          Examples:
1763            btc_disconnect_device
1764        """
1765        cmd = "Disconnect to device under test."
1766        try:
1767            result = self.pri_dut.btc_lib.disconnectDevice(
1768                self.unique_mac_addr_id)
1769            self.log.info(result)
1770        except Exception as err:
1771            self.log.error(FAILURE.format(cmd, err))
1772
1773    def do_btc_init_bluetooth_control(self, line):
1774        """
1775        Description: Initialize the Bluetooth Controller.
1776
1777        Usage:
1778          Examples:
1779            btc_init_bluetooth_control
1780        """
1781        cmd = "Initialize the Bluetooth Controller."
1782        try:
1783            result = self.test_dut.initialize_bluetooth_controller()
1784            self.log.info(result)
1785        except Exception as err:
1786            self.log.error(FAILURE.format(cmd, err))
1787
1788    def do_btc_get_local_address(self, line):
1789        """
1790        Description: Get the local BR/EDR address of the Bluetooth Controller.
1791
1792        Usage:
1793          Examples:
1794            btc_get_local_address
1795        """
1796        cmd = "Get the local BR/EDR address of the Bluetooth Controller."
1797        try:
1798            result = self.test_dut.get_local_bluetooth_address()
1799            self.log.info(result)
1800        except Exception as err:
1801            self.log.error(FAILURE.format(cmd, err))
1802
1803    def do_btc_input_pairing_pin(self, line):
1804        """
1805        Description: Sends a pairing pin to SL4F's Bluetooth Control's
1806        Pairing Delegate.
1807
1808        Usage:
1809          Examples:
1810            btc_input_pairing_pin 123456
1811        """
1812        cmd = "Input pairing pin to the Fuchsia device."
1813        try:
1814            result = self.pri_dut.btc_lib.inputPairingPin(line)['result']
1815            self.log.info(result)
1816        except Exception as err:
1817            self.log.error(FAILURE.format(cmd, err))
1818
1819    def do_btc_get_pairing_pin(self, line):
1820        """
1821        Description: Gets the pairing pin from SL4F's Bluetooth Control's
1822        Pairing Delegate.
1823
1824        Usage:
1825          Examples:
1826            btc_get_pairing_pin
1827        """
1828        cmd = "Get the pairing pin from the Fuchsia device."
1829        try:
1830            result = self.pri_dut.btc_lib.getPairingPin()['result']
1831            self.log.info(result)
1832        except Exception as err:
1833            self.log.error(FAILURE.format(cmd, err))
1834
1835    """End Bluetooth Control wrappers"""
1836    """Begin Profile Server wrappers"""
1837
1838    def do_sdp_pts_example(self, num_of_records):
1839        """
1840        Description: An example of how to setup a generic SDP record
1841            and SDP search capabilities. This example will pass a few
1842            SDP tests.
1843
1844        Input(s):
1845            num_of_records: The number of records to add.
1846
1847        Usage:
1848          Examples:
1849            sdp_pts_example 1
1850            sdp pts_example 10
1851        """
1852        cmd = "Setup SDP for PTS testing."
1853
1854        attributes = [
1855            bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'],
1856            bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'],
1857            bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'],
1858            bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'],
1859        ]
1860
1861        try:
1862            self.pri_dut.sdp_lib.addSearch(
1863                attributes, int(sig_uuid_constants['AudioSource'], 16))
1864            self.pri_dut.sdp_lib.addSearch(
1865                attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16))
1866            self.pri_dut.sdp_lib.addSearch(attributes,
1867                                           int(sig_uuid_constants['PANU'], 16))
1868            self.pri_dut.sdp_lib.addSearch(
1869                attributes, int(sig_uuid_constants['SerialPort'], 16))
1870            self.pri_dut.sdp_lib.addSearch(
1871                attributes, int(sig_uuid_constants['DialupNetworking'], 16))
1872            self.pri_dut.sdp_lib.addSearch(
1873                attributes, int(sig_uuid_constants['OBEXObjectPush'], 16))
1874            self.pri_dut.sdp_lib.addSearch(
1875                attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16))
1876            self.pri_dut.sdp_lib.addSearch(
1877                attributes, int(sig_uuid_constants['Headset'], 16))
1878            self.pri_dut.sdp_lib.addSearch(
1879                attributes, int(sig_uuid_constants['HandsfreeAudioGateway'],
1880                                16))
1881            self.pri_dut.sdp_lib.addSearch(
1882                attributes, int(sig_uuid_constants['Handsfree'], 16))
1883            self.pri_dut.sdp_lib.addSearch(
1884                attributes, int(sig_uuid_constants['SIM_Access'], 16))
1885            for i in range(int(num_of_records)):
1886                result = self.pri_dut.sdp_lib.addService(
1887                    sdp_pts_record_list[i])
1888                self.log.info(result)
1889        except Exception as err:
1890            self.log.error(FAILURE.format(cmd, err))
1891
1892    def do_sdp_cleanup(self, line):
1893        """
1894        Description: Cleanup any existing SDP records
1895
1896        Usage:
1897          Examples:
1898            sdp_cleanup
1899        """
1900        cmd = "Cleanup SDP objects."
1901        try:
1902            result = self.pri_dut.sdp_lib.cleanUp()
1903            self.log.info(result)
1904        except Exception as err:
1905            self.log.error(FAILURE.format(cmd, err))
1906
1907    def do_sdp_init(self, line):
1908        """
1909        Description: Init the profile proxy for setting up SDP records
1910
1911        Usage:
1912          Examples:
1913            sdp_init
1914        """
1915        cmd = "Initialize profile proxy objects for adding SDP records"
1916        try:
1917            result = self.pri_dut.sdp_lib.init()
1918            self.log.info(result)
1919        except Exception as err:
1920            self.log.error(FAILURE.format(cmd, err))
1921
1922    def do_sdp_connect_l2cap(self, line):
1923        """
1924        Description: Send an l2cap connection request over an input psm value.
1925
1926        Note: Must be already connected to a peer.
1927
1928        Input(s):
1929            psm: The int hex value to connect over. Available PSMs:
1930                SDP 0x0001  See Bluetooth Service Discovery Protocol (SDP)
1931                RFCOMM  0x0003  See RFCOMM with TS 07.10
1932                TCS-BIN 0x0005  See Bluetooth Telephony Control Specification /
1933                    TCS Binary
1934                TCS-BIN-CORDLESS    0x0007  See Bluetooth Telephony Control
1935                    Specification / TCS Binary
1936                BNEP    0x000F  See Bluetooth Network Encapsulation Protocol
1937                HID_Control 0x0011  See Human Interface Device
1938                HID_Interrupt   0x0013  See Human Interface Device
1939                UPnP    0x0015  See [ESDP]
1940                AVCTP   0x0017  See Audio/Video Control Transport Protocol
1941                AVDTP   0x0019  See Audio/Video Distribution Transport Protocol
1942                AVCTP_Browsing  0x001B  See Audio/Video Remote Control Profile
1943                UDI_C-Plane 0x001D  See the Unrestricted Digital Information
1944                    Profile [UDI]
1945                ATT 0x001F  See Bluetooth Core Specification​
1946                ​3DSP   0x0021​ ​​See 3D Synchronization Profile.
1947                ​LE_PSM_IPSP    ​0x0023 ​See Internet Protocol Support Profile
1948                    (IPSP)
1949                OTS 0x0025  See Object Transfer Service (OTS)
1950                EATT    0x0027  See Bluetooth Core Specification
1951            mode: String - The channel mode to connect to. Available values:
1952                Basic mode: BASIC
1953                Enhanced Retransmission mode: ERTM
1954
1955        Usage:
1956          Examples:
1957            sdp_connect_l2cap 0001 BASIC
1958            sdp_connect_l2cap 0019 ERTM
1959        """
1960        cmd = "Connect l2cap"
1961        try:
1962            info = line.split()
1963            result = self.pri_dut.sdp_lib.connectL2cap(self.unique_mac_addr_id,
1964                                                       int(info[0], 16),
1965                                                       info[1])
1966            self.log.info(result)
1967        except Exception as err:
1968            self.log.error(FAILURE.format(cmd, err))
1969
1970    """End Profile Server wrappers"""
1971    """Begin AVDTP wrappers"""
1972
1973    def complete_avdtp_init(self, text, line, begidx, endidx):
1974        roles = ["sink", "source"]
1975        if not text:
1976            completions = roles
1977        else:
1978            completions = [s for s in roles if s.startswith(text)]
1979        return completions
1980
1981    def do_avdtp_init(self, role):
1982        """
1983        Description: Init the AVDTP and A2DP service corresponding to the input
1984        role.
1985
1986        Input(s):
1987            role: The specified role. Either 'source' or 'sink'.
1988
1989        Usage:
1990          Examples:
1991            avdtp_init source
1992            avdtp_init sink
1993        """
1994        cmd = "Initialize AVDTP proxy"
1995        try:
1996            result = self.pri_dut.avdtp_lib.init(role)
1997            self.log.info(result)
1998        except Exception as err:
1999            self.log.error(FAILURE.format(cmd, err))
2000
2001    def do_avdtp_kill_a2dp_sink(self, line):
2002        """
2003        Description: Quickly kill any A2DP sink service currently running on the
2004        device.
2005
2006        Usage:
2007          Examples:
2008            avdtp_kill_a2dp_sink
2009        """
2010        cmd = "Killing A2DP sink"
2011        try:
2012            result = self.pri_dut.control_daemon("bt-a2dp-sink.cmx", "stop")
2013            self.log.info(result)
2014        except Exception as err:
2015            self.log.error(FAILURE.format(cmd, err))
2016
2017    def do_avdtp_kill_a2dp_source(self, line):
2018        """
2019        Description: Quickly kill any A2DP source service currently running on
2020        the device.
2021
2022        Usage:
2023          Examples:
2024            avdtp_kill_a2dp_source
2025        """
2026        cmd = "Killing A2DP source"
2027        try:
2028            result = self.pri_dut.control_daemon("bt-a2dp-source.cmx", "stop")
2029            self.log.info(result)
2030        except Exception as err:
2031            self.log.error(FAILURE.format(cmd, err))
2032
2033    def do_avdtp_get_connected_peers(self, line):
2034        """
2035        Description: Get the connected peers for the AVDTP service
2036
2037        Usage:
2038          Examples:
2039            avdtp_get_connected_peers
2040        """
2041        cmd = "AVDTP get connected peers"
2042        try:
2043            result = self.pri_dut.avdtp_lib.getConnectedPeers()
2044            self.log.info(result)
2045        except Exception as err:
2046            self.log.error(FAILURE.format(cmd, err))
2047
2048    def do_avdtp_set_configuration(self, peer_id):
2049        """
2050        Description: Send AVDTP command to connected peer: set configuration
2051
2052        Input(s):
2053            peer_id: The specified peer_id.
2054
2055        Usage:
2056          Examples:
2057            avdtp_set_configuration <peer_id>
2058        """
2059        cmd = "Send AVDTP set configuration to connected peer"
2060        try:
2061            result = self.pri_dut.avdtp_lib.setConfiguration(int(peer_id))
2062            self.log.info(result)
2063        except Exception as err:
2064            self.log.error(FAILURE.format(cmd, err))
2065
2066    def do_avdtp_get_configuration(self, peer_id):
2067        """
2068        Description: Send AVDTP command to connected peer: get configuration
2069
2070        Input(s):
2071            peer_id: The specified peer_id.
2072
2073        Usage:
2074          Examples:
2075            avdtp_get_configuration <peer_id>
2076        """
2077        cmd = "Send AVDTP get configuration to connected peer"
2078        try:
2079            result = self.pri_dut.avdtp_lib.getConfiguration(int(peer_id))
2080            self.log.info(result)
2081        except Exception as err:
2082            self.log.error(FAILURE.format(cmd, err))
2083
2084    def do_avdtp_get_capabilities(self, peer_id):
2085        """
2086        Description: Send AVDTP command to connected peer: get capabilities
2087
2088        Input(s):
2089            peer_id: The specified peer_id.
2090
2091        Usage:
2092          Examples:
2093            avdtp_get_capabilities <peer_id>
2094        """
2095        cmd = "Send AVDTP get capabilities to connected peer"
2096        try:
2097            result = self.pri_dut.avdtp_lib.getCapabilities(int(peer_id))
2098            self.log.info(result)
2099        except Exception as err:
2100            self.log.error(FAILURE.format(cmd, err))
2101
2102    def do_avdtp_get_all_capabilities(self, peer_id):
2103        """
2104        Description: Send AVDTP command to connected peer: get all capabilities
2105
2106        Input(s):
2107            peer_id: The specified peer_id.
2108
2109        Usage:
2110          Examples:
2111            avdtp_get_all_capabilities <peer_id>
2112        """
2113        cmd = "Send AVDTP get all capabilities to connected peer"
2114        try:
2115            result = self.pri_dut.avdtp_lib.getAllCapabilities(int(peer_id))
2116            self.log.info(result)
2117        except Exception as err:
2118            self.log.error(FAILURE.format(cmd, err))
2119
2120    def do_avdtp_reconfigure_stream(self, peer_id):
2121        """
2122        Description: Send AVDTP command to connected peer: reconfigure stream
2123
2124        Input(s):
2125            peer_id: The specified peer_id.
2126
2127        Usage:
2128          Examples:
2129            avdtp_reconfigure_stream <peer_id>
2130        """
2131        cmd = "Send AVDTP reconfigure stream to connected peer"
2132        try:
2133            result = self.pri_dut.avdtp_lib.reconfigureStream(int(peer_id))
2134            self.log.info(result)
2135        except Exception as err:
2136            self.log.error(FAILURE.format(cmd, err))
2137
2138    def do_avdtp_suspend_stream(self, peer_id):
2139        """
2140        Description: Send AVDTP command to connected peer: suspend stream
2141
2142        Input(s):
2143            peer_id: The specified peer_id.
2144
2145        Usage:
2146          Examples:
2147            avdtp_suspend_stream <peer_id>
2148        """
2149        cmd = "Send AVDTP suspend stream to connected peer"
2150        try:
2151            result = self.pri_dut.avdtp_lib.suspendStream(int(peer_id))
2152            self.log.info(result)
2153        except Exception as err:
2154            self.log.error(FAILURE.format(cmd, err))
2155
2156    def do_avdtp_suspend_reconfigure(self, peer_id):
2157        """
2158        Description: Send AVDTP command to connected peer: suspend reconfigure
2159
2160        Input(s):
2161            peer_id: The specified peer_id.
2162
2163        Usage:
2164          Examples:
2165            avdtp_suspend_reconfigure <peer_id>
2166        """
2167        cmd = "Send AVDTP suspend reconfigure to connected peer"
2168        try:
2169            result = self.pri_dut.avdtp_lib.suspendAndReconfigure(int(peer_id))
2170            self.log.info(result)
2171        except Exception as err:
2172            self.log.error(FAILURE.format(cmd, err))
2173
2174    def do_avdtp_release_stream(self, peer_id):
2175        """
2176        Description: Send AVDTP command to connected peer: release stream
2177
2178        Input(s):
2179            peer_id: The specified peer_id.
2180
2181        Usage:
2182          Examples:
2183            avdtp_release_stream <peer_id>
2184        """
2185        cmd = "Send AVDTP release stream to connected peer"
2186        try:
2187            result = self.pri_dut.avdtp_lib.releaseStream(int(peer_id))
2188            self.log.info(result)
2189        except Exception as err:
2190            self.log.error(FAILURE.format(cmd, err))
2191
2192    def do_avdtp_establish_stream(self, peer_id):
2193        """
2194        Description: Send AVDTP command to connected peer: establish stream
2195
2196        Input(s):
2197            peer_id: The specified peer_id.
2198
2199        Usage:
2200          Examples:
2201            avdtp_establish_stream <peer_id>
2202        """
2203        cmd = "Send AVDTP establish stream to connected peer"
2204        try:
2205            result = self.pri_dut.avdtp_lib.establishStream(int(peer_id))
2206            self.log.info(result)
2207        except Exception as err:
2208            self.log.error(FAILURE.format(cmd, err))
2209
2210    def do_avdtp_start_stream(self, peer_id):
2211        """
2212        Description: Send AVDTP command to connected peer: start stream
2213
2214        Input(s):
2215            peer_id: The specified peer_id.
2216
2217        Usage:
2218          Examples:
2219            avdtp_start_stream <peer_id>
2220        """
2221        cmd = "Send AVDTP start stream to connected peer"
2222        try:
2223            result = self.pri_dut.avdtp_lib.startStream(int(peer_id))
2224            self.log.info(result)
2225        except Exception as err:
2226            self.log.error(FAILURE.format(cmd, err))
2227
2228    def do_avdtp_abort_stream(self, peer_id):
2229        """
2230        Description: Send AVDTP command to connected peer: abort stream
2231
2232        Input(s):
2233            peer_id: The specified peer_id.
2234
2235        Usage:
2236          Examples:
2237            avdtp_abort_stream <peer_id>
2238        """
2239        cmd = "Send AVDTP abort stream to connected peer"
2240        try:
2241            result = self.pri_dut.avdtp_lib.abortStream(int(peer_id))
2242            self.log.info(result)
2243        except Exception as err:
2244            self.log.error(FAILURE.format(cmd, err))
2245
2246    def do_avdtp_remove_service(self, line):
2247        """
2248        Description: Removes the AVDTP service in use.
2249
2250        Usage:
2251          Examples:
2252            avdtp_establish_stream <peer_id>
2253        """
2254        cmd = "Remove AVDTP service"
2255        try:
2256            result = self.pri_dut.avdtp_lib.removeService()
2257            self.log.info(result)
2258        except Exception as err:
2259            self.log.error(FAILURE.format(cmd, err))
2260
2261    """End AVDTP wrappers"""
2262