1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script is for partial automation of LE devices
18
19This script requires these custom parameters in the config file:
20
21"ble_mac_address"
22"service_uuid"
23"notifiable_char_uuid"
24"""
25
26import pprint
27from queue import Empty
28import time
29
30from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
31from acts.test_utils.bt.bt_constants import ble_scan_settings_modes
32from acts.test_utils.bt.bt_constants import gatt_cb_err
33from acts.test_utils.bt.bt_constants import gatt_cb_strings
34from acts.test_utils.bt.bt_constants import gatt_descriptor
35from acts.test_utils.bt.bt_constants import gatt_transport
36from acts.test_utils.bt.bt_constants import scan_result
37from acts.test_utils.bt.bt_gatt_utils import GattTestUtilsError
38from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
39from acts.test_utils.bt.bt_test_utils import generate_ble_scan_objects
40from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection
41from acts.test_utils.bt.bt_gatt_utils import log_gatt_server_uuids
42from acts.test_utils.bt.bt_test_utils import reset_bluetooth
43
44
45class GattToolTest(BluetoothBaseTest):
46    AUTOCONNECT = False
47    DEFAULT_TIMEOUT = 10
48    PAIRING_TIMEOUT = 20
49    adv_instances = []
50    timer_list = []
51
52    def setup_class(self):
53        super().setup_class()
54        # Central role Android device
55        self.cen_ad = self.android_devices[0]
56        self.ble_mac_address = self.user_params['ble_mac_address']
57        self.SERVICE_UUID = self.user_params['service_uuid']
58        self.NOTIFIABLE_CHAR_UUID = self.user_params['notifiable_char_uuid']
59        # CCC == Client Characteristic Configuration
60        self.CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb"
61
62    def setup_test(self):
63        super(BluetoothBaseTest, self).setup_test()
64        if not self._is_peripheral_advertising():
65            input("Press enter when peripheral is advertising...")
66        return True
67
68    def teardown_test(self):
69        super(BluetoothBaseTest, self).teardown_test()
70        self.log_stats()
71        self.timer_list = []
72        return True
73
74    def _pair_with_peripheral(self):
75        self.cen_ad.droid.bluetoothDiscoverAndBond(self.ble_mac_address)
76        end_time = time.time() + self.PAIRING_TIMEOUT
77        self.log.info("Verifying devices are bonded")
78        while time.time() < end_time:
79            bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices()
80            if self.ble_mac_address in {d['address'] for d in bonded_devices}:
81                self.log.info("Successfully bonded to device")
82                return True
83        return False
84
85    def _is_peripheral_advertising(self):
86        self.cen_ad.droid.bleSetScanFilterDeviceAddress(self.ble_mac_address)
87        self.cen_ad.droid.bleSetScanSettingsScanMode(
88            ble_scan_settings_modes['low_latency'])
89        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
90            self.cen_ad.droid)
91        self.cen_ad.droid.bleBuildScanFilter(filter_list)
92
93        self.cen_ad.droid.bleStartBleScan(filter_list, scan_settings,
94                                          scan_callback)
95        expected_event_name = scan_result.format(scan_callback)
96        test_result = True
97        try:
98            self.cen_ad.ed.pop_event(expected_event_name, self.DEFAULT_TIMEOUT)
99            self.log.info(
100                "Peripheral found with event: {}".format(expected_event_name))
101        except Empty:
102            self.log.info("Peripheral not advertising or not found: {}".format(
103                self.ble_mac_address))
104            test_result = False
105        self.cen_ad.droid.bleStopBleScan(scan_callback)
106        return test_result
107
108    def _unbond_device(self):
109        self.cen_ad.droid.bluetoothUnbond(self.ble_mac_address)
110        time.sleep(2)  #Grace timeout for unbonding to finish
111        bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices()
112        if bonded_devices:
113            self.log.error(
114                "Failed to unbond device... found: {}".format(bonded_devices))
115            return False
116        return True
117
118    @BluetoothBaseTest.bt_test_wrap
119    def test_gatt_connect_without_scanning(self):
120        """Test the round trip speed of connecting to a peripheral
121
122        This test will prompt the user to press "Enter" when the
123        peripheral is in a connecable advertisement state. Once
124        the user presses enter, this script will measure the amount
125        of time it takes to establish a GATT connection to the
126        peripheral. The test will then disconnect
127
128        Steps:
129        1. Wait for user input to confirm peripheral is advertising.
130        2. Start timer
131        3. Perform GATT connection to peripheral
132        4. Upon successful connection, stop timer
133        5. Disconnect from peripheral
134
135        Expected Result:
136        Device should be connected successfully
137
138        Returns:
139          Pass if True
140          Fail if False
141
142        TAGS: LE, GATT
143        Priority: 1
144        """
145        self.AUTOCONNECT = False
146        start_time = self._get_time_in_milliseconds()
147        try:
148            bluetooth_gatt, gatt_callback = (setup_gatt_connection(
149                self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
150                gatt_transport['le']))
151        except GattTestUtilsError as err:
152            self.log.error(err)
153            return False
154        end_time = self._get_time_in_milliseconds()
155        self.log.info("Total time (ms): {}".format(end_time - start_time))
156        try:
157            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
158                                       gatt_callback)
159            self.cen_ad.droid.gattClientClose(bluetooth_gatt)
160        except GattTestUtilsError as err:
161            self.log.error(err)
162            return False
163        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
164
165    @BluetoothBaseTest.bt_test_wrap
166    def test_gatt_connect_stress(self):
167        """Test the round trip speed of connecting to a peripheral many times
168
169        This test will prompt the user to press "Enter" when the
170        peripheral is in a connecable advertisement state. Once
171        the user presses enter, this script will measure the amount
172        of time it takes to establish a GATT connection to the
173        peripheral. The test will then disconnect. It will attempt to
174        repeat this process multiple times.
175
176        Steps:
177        1. Wait for user input to confirm peripheral is advertising.
178        2. Start timer
179        3. Perform GATT connection to peripheral
180        4. Upon successful connection, stop timer
181        5. Disconnect from peripheral
182        6. Repeat steps 2-5 1000 times.
183
184        Expected Result:
185        Test should measure 1000 iterations of connect/disconnect cycles.
186
187        Returns:
188          Pass if True
189          Fail if False
190
191        TAGS: LE, GATT
192        Priority: 2
193        """
194        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
195            self.cen_ad.droid)
196        self.cen_ad.droid.bleStartBleScan(filter_list, scan_settings,
197                                          scan_callback)
198        self.AUTOCONNECT = False
199        iterations = 1000
200        n = 0
201        while n < iterations:
202            self.start_timer()
203            try:
204                bluetooth_gatt, gatt_callback = (setup_gatt_connection(
205                    self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
206                    gatt_transport['le']))
207            except GattTestUtilsError as err:
208                self.log.error(err)
209                return False
210            self.log.info("Total time (ms): {}".format(self.end_timer()))
211            try:
212                disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
213                                           gatt_callback)
214                self.cen_ad.droid.gattClientClose(bluetooth_gatt)
215            except GattTestUtilsError as err:
216                self.log.error(err)
217                return False
218            n += 1
219        return True
220
221    @BluetoothBaseTest.bt_test_wrap
222    def test_gatt_connect_iterate_uuids(self):
223        """Test the discovery of uuids of a peripheral
224
225        This test will prompt the user to press "Enter" when the
226        peripheral is in a connecable advertisement state. Once
227        the user presses enter, this script connects an Android device
228        to the periphal and attempt to discover all services,
229        characteristics, and descriptors.
230
231        Steps:
232        1. Wait for user input to confirm peripheral is advertising.
233        2. Perform GATT connection to peripheral
234        3. Upon successful connection, iterate through all services,
235        characteristics, and descriptors.
236        5. Disconnect from peripheral
237
238        Expected Result:
239        Device services, characteristics, and descriptors should all
240        be read.
241
242        Returns:
243          Pass if True
244          Fail if False
245
246        TAGS: LE, GATT
247        Priority: 2
248        """
249        try:
250            bluetooth_gatt, gatt_callback = (setup_gatt_connection(
251                self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
252                gatt_transport['le']))
253        except GattTestUtilsError as err:
254            self.log.error(err)
255            return False
256        if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt):
257            expected_event = gatt_cb_strings['gatt_serv_disc'].format(
258                gatt_callback)
259            try:
260                event = self.cen_ad.ed.pop_event(expected_event,
261                                                 self.DEFAULT_TIMEOUT)
262                discovered_services_index = event['data']['ServicesIndex']
263            except Empty:
264                self.log.error(
265                    gatt_cb_err['gatt_serv_disc'].format(expected_event))
266                return False
267            log_gatt_server_uuids(self.cen_ad, discovered_services_index)
268        try:
269            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
270                                       gatt_callback)
271            self.cen_ad.droid.gattClientClose(bluetooth_gatt)
272        except GattTestUtilsError as err:
273            self.log.error(err)
274            return False
275        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
276        return True
277
278    @BluetoothBaseTest.bt_test_wrap
279    def test_pairing(self):
280        """Test pairing to a GATT mac address
281
282        This test will prompt the user to press "Enter" when the
283        peripheral is in a connecable advertisement state. Once
284        the user presses enter, this script will bond the Android device
285        to the peripheral.
286
287        Steps:
288        1. Wait for user input to confirm peripheral is advertising.
289        2. Perform Bluetooth pairing to GATT mac address
290        3. Upon successful bonding.
291        4. Unbond from device
292
293        Expected Result:
294        Device services, characteristics, and descriptors should all
295        be read.
296
297        Returns:
298          Pass if True
299          Fail if False
300
301        TAGS: LE, GATT
302        Priority: 1
303        """
304        if not self._pair_with_peripheral():
305            return False
306        self.cen_ad.droid.bluetoothUnbond(self.ble_mac_address)
307        return self._unbond_device()
308
309    @BluetoothBaseTest.bt_test_wrap
310    def test_pairing_stress(self):
311        """Test the round trip speed of pairing to a peripheral many times
312
313        This test will prompt the user to press "Enter" when the
314        peripheral is in a connecable advertisement state. Once
315        the user presses enter, this script will measure the amount
316        of time it takes to establish a pairing with a BLE device.
317
318        Steps:
319        1. Wait for user input to confirm peripheral is advertising.
320        2. Start timer
321        3. Perform Bluetooth pairing to GATT mac address
322        4. Upon successful bonding, stop timer.
323        5. Unbond from device
324        6. Repeat steps 2-5 100 times.
325
326        Expected Result:
327        Test should measure 100 iterations of bonding.
328
329        Returns:
330          Pass if True
331          Fail if False
332
333        TAGS: LE, GATT
334        Priority: 3
335        """
336        iterations = 100
337        for _ in range(iterations):
338            start_time = self.start_timer()
339            if not self._pair_with_peripheral():
340                return False
341            self.log.info("Total time (ms): {}".format(self.end_timer()))
342            if not self._unbond_device():
343                return False
344        return True
345
346    @BluetoothBaseTest.bt_test_wrap
347    def test_gatt_notification_longev(self):
348        """Test GATT characterisitic notifications for long periods of time
349
350        This test will prompt the user to press "Enter" when the
351        peripheral is in a connecable advertisement state. Once
352        the user presses enter, this script aims to set characteristic
353        notification to true on the config file's SERVICE_UUID,
354        NOTIFIABLE_CHAR_UUID, and CCC_DESC_UUID. This test assumes
355        the peripheral will constantly write data to a notifiable
356        characteristic.
357
358        Steps:
359        1. Wait for user input to confirm peripheral is advertising.
360        2. Perform Bluetooth pairing to GATT mac address
361        3. Perform a GATT connection to the periheral
362        4. Get the discovered service uuid that matches the user's input
363        in the config file
364        4. Write to the CCC descriptor to enable notifications
365        5. Enable notifications on the user's input Characteristic UUID
366        6. Continuously wait for Characteristic Changed events which
367        equate to recieving notifications for 15 minutes.
368
369        Expected Result:
370        There should be no disconnects and we should constantly receive
371        Characteristic Changed information. Values should vary upon user
372        interaction with the peripheral.
373
374        Returns:
375          Pass if True
376          Fail if False
377
378        TAGS: LE, GATT
379        Priority: 1
380        """
381        #pair devices
382        if not self._pair_with_peripheral():
383            return False
384        try:
385            bluetooth_gatt, gatt_callback = (setup_gatt_connection(
386                self.cen_ad, self.ble_mac_address, self.AUTOCONNECT,
387                gatt_transport['le']))
388        except GattTestUtilsError as err:
389            self.log.error(err)
390            return False
391        if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt):
392            expected_event = gatt_cb_strings['gatt_serv_disc'].format(
393                gatt_callback)
394            try:
395                event = self.cen_ad.ed.pop_event(expected_event,
396                                                 self.DEFAULT_TIMEOUT)
397                discovered_services_index = event['data']['ServicesIndex']
398            except Empty:
399                self.log.error(
400                    gatt_cb_err['gatt_serv_disc'].format(expected_event))
401                return False
402        # TODO: in setup save service_cound and discovered_services_index
403        # programatically
404        services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
405            discovered_services_index)
406        test_service_index = None
407        for i in range(services_count):
408            disc_service_uuid = (
409                self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
410                    discovered_services_index, i))
411            if disc_service_uuid == self.SERVICE_UUID:
412                test_service_index = i
413                break
414        if not test_service_index:
415            self.log.error("Service not found.")
416            return False
417
418        self.cen_ad.droid.gattClientDescriptorSetValue(
419            bluetooth_gatt, discovered_services_index, test_service_index,
420            self.NOTIFIABLE_CHAR_UUID, self.CCC_DESC_UUID,
421            gatt_descriptor['enable_notification_value'])
422
423        self.cen_ad.droid.gattClientWriteDescriptor(
424            bluetooth_gatt, discovered_services_index, test_service_index,
425            self.NOTIFIABLE_CHAR_UUID, self.CCC_DESC_UUID)
426
427        self.cen_ad.droid.gattClientSetCharacteristicNotification(
428            bluetooth_gatt, discovered_services_index, test_service_index,
429            self.NOTIFIABLE_CHAR_UUID, True)
430
431        # set 15 minute notification test time
432        notification_test_time = 900
433        end_time = time.time() + notification_test_time
434        expected_event = gatt_cb_strings['char_change'].format(gatt_callback)
435        while time.time() < end_time:
436            try:
437                event = self.cen_ad.ed.pop_event(expected_event,
438                                                 self.DEFAULT_TIMEOUT)
439                self.log.info(event)
440            except Empty as err:
441                print(err)
442                self.log.error(
443                    gatt_cb_err['char_change_err'].format(expected_event))
444                return False
445        return True
446