1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This is base class for tests that exercises different GATT procedures between two connected devices.
18Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery.
19"""
20
21from queue import Empty
22
23from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
24from acts.test_utils.bt.bt_constants import gatt_characteristic
25from acts.test_utils.bt.bt_constants import gatt_descriptor
26from acts.test_utils.bt.bt_constants import gatt_service_types
27from acts.test_utils.bt.bt_constants import gatt_event
28from acts.test_utils.bt.bt_constants import gatt_cb_err
29from acts.test_utils.bt.bt_constants import gatt_cb_strings
30from acts.test_utils.bt.bt_constants import gatt_mtu_size
31from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
32from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection
33from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics
34from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors
35from acts.test_utils.bt.bt_constants import gatt_char_desc_uuids
36from acts.test_utils.bt.bt_constants import bt_default_timeout
37
38
39class GattConnectedBaseTest(BluetoothBaseTest):
40
41    TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
42    READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
43    READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
44    WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
45    WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
46    NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"
47
48    def setup_class(self):
49        super().setup_class()
50        self.cen_ad = self.android_devices[0]
51        self.per_ad = self.android_devices[1]
52
53    def setup_test(self):
54        super(GattConnectedBaseTest, self).setup_test()
55
56        self.gatt_server_callback, self.gatt_server = self._setup_multiple_services(
57        )
58        if not self.gatt_server_callback or not self.gatt_server:
59            raise AssertionError('Service setup failed')
60
61        self.bluetooth_gatt, self.gatt_callback, self.adv_callback = (
62            orchestrate_gatt_connection(self.cen_ad, self.per_ad))
63        self.per_ad.droid.bleStopBleAdvertising(self.adv_callback)
64
65        self.mtu = gatt_mtu_size['min']
66
67        if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt):
68            event = self._client_wait(gatt_event['gatt_serv_disc'])
69            self.discovered_services_index = event['data']['ServicesIndex']
70        services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
71            self.discovered_services_index)
72        self.test_service_index = None
73        for i in range(services_count):
74            disc_service_uuid = (
75                self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
76                    self.discovered_services_index, i).upper())
77            if disc_service_uuid == self.TEST_SERVICE_UUID:
78                self.test_service_index = i
79                break
80
81        if not self.test_service_index:
82            print("Service not found")
83            return False
84
85        connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices(
86            self.gatt_server)
87        if len(connected_device_list) == 0:
88            self.log.info("No devices connected from peripheral.")
89            return False
90
91        return True
92
93    def teardown_test(self):
94        self.per_ad.droid.gattServerClearServices(self.gatt_server)
95        self.per_ad.droid.gattServerClose(self.gatt_server)
96
97        del self.gatt_server_callback
98        del self.gatt_server
99
100        self._orchestrate_gatt_disconnection(self.bluetooth_gatt,
101                                             self.gatt_callback)
102
103        return super(GattConnectedBaseTest, self).teardown_test()
104
105    def _server_wait(self, gatt_event):
106        return self._timed_pop(gatt_event, self.per_ad,
107                               self.gatt_server_callback)
108
109    def _client_wait(self, gatt_event):
110        return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback)
111
112    def _timed_pop(self, gatt_event, droid, gatt_callback):
113        expected_event = gatt_event["evt"].format(gatt_callback)
114        try:
115            return droid.ed.pop_event(expected_event, bt_default_timeout)
116        except Empty as emp:
117            raise AssertionError(gatt_event["err"].format(expected_event))
118
119    def _setup_characteristics_and_descriptors(self, droid):
120        characteristic_input = [
121            {
122                'uuid': self.WRITABLE_CHAR_UUID,
123                'property': gatt_characteristic['property_write'] |
124                gatt_characteristic['property_write_no_response'],
125                'permission': gatt_characteristic['permission_write']
126            },
127            {
128                'uuid': self.READABLE_CHAR_UUID,
129                'property': gatt_characteristic['property_read'],
130                'permission': gatt_characteristic['permission_read']
131            },
132            {
133                'uuid': self.NOTIFIABLE_CHAR_UUID,
134                'property': gatt_characteristic['property_notify'] |
135                gatt_characteristic['property_indicate'],
136                'permission': gatt_characteristic['permission_read']
137            },
138        ]
139        descriptor_input = [{
140            'uuid': self.WRITABLE_DESC_UUID,
141            'property': gatt_descriptor['permission_read'] |
142            gatt_characteristic['permission_write'],
143        }, {
144            'uuid': self.READABLE_DESC_UUID,
145            'property': gatt_descriptor['permission_read'] |
146            gatt_descriptor['permission_write'],
147        }, {
148            'uuid': gatt_char_desc_uuids['client_char_cfg'],
149            'property': gatt_descriptor['permission_read'] |
150            gatt_descriptor['permission_write'],
151        }]
152        characteristic_list = setup_gatt_characteristics(droid,
153                                                         characteristic_input)
154        self.notifiable_char_index = characteristic_list[2]
155        descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
156        return characteristic_list, descriptor_list
157
158    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
159        self.log.info("Disconnecting from peripheral device.")
160        try:
161            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
162                                       gatt_callback)
163        except GattTestUtilsError as err:
164            log.error(err)
165            return False
166        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
167        return True
168
169    def _find_service_added_event(self, gatt_server_callback, uuid):
170        expected_event = gatt_cb_strings['serv_added'].format(
171            gatt_server_callback)
172        try:
173            event = self.per_ad.ed.pop_event(expected_event,
174                                             bt_default_timeout)
175        except Empty:
176            self.log.error(gatt_cb_err['serv_added_err'].format(
177                expected_event))
178            return False
179        if event['data']['serviceUuid'].lower() != uuid.lower():
180            self.log.error("Uuid mismatch. Found: {}, Expected {}.".format(
181                event['data']['serviceUuid'], uuid))
182            return False
183        return True
184
185    def _setup_multiple_services(self):
186        gatt_server_callback = (
187            self.per_ad.droid.gattServerCreateGattServerCallback())
188        gatt_server = self.per_ad.droid.gattServerOpenGattServer(
189            gatt_server_callback)
190        characteristic_list, descriptor_list = (
191            self._setup_characteristics_and_descriptors(self.per_ad.droid))
192        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
193            characteristic_list[0], descriptor_list[0])
194        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
195            characteristic_list[1], descriptor_list[1])
196        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
197            characteristic_list[2], descriptor_list[2])
198        gatt_service3 = self.per_ad.droid.gattServerCreateService(
199            self.TEST_SERVICE_UUID, gatt_service_types['primary'])
200        for characteristic in characteristic_list:
201            self.per_ad.droid.gattServerAddCharacteristicToService(
202                gatt_service3, characteristic)
203        self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3)
204        result = self._find_service_added_event(gatt_server_callback,
205                                                self.TEST_SERVICE_UUID)
206        if not result:
207            return False, False
208        return gatt_server_callback, gatt_server
209
210    def assertEqual(self, first, second, msg=None):
211        if not first == second:
212            if not msg:
213                raise AssertionError('%r != %r' % (first, second))
214            else:
215                raise AssertionError(msg + ' %r != %r' % (first, second))
216