1#!/usr/bin/env python3
2#
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script exercises set PHY and read PHY procedures.
18"""
19
20from queue import Empty
21
22from acts.test_decorators import test_tracker_info
23from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
24from acts.test_utils.bt.GattConnectedBaseTest import GattConnectedBaseTest
25from acts.test_utils.bt.bt_constants import gatt_connection_priority
26from acts.test_utils.bt.bt_constants import gatt_event
27from acts.test_utils.bt.bt_constants import gatt_phy
28from acts import signals
29
30CONNECTION_PRIORITY_HIGH = gatt_connection_priority['high']
31PHY_LE_1M = gatt_phy['1m']
32PHY_LE_2M = gatt_phy['2m']
33
34
35def lfmt(txPhy, rxPhy):
36    return '(' + list(gatt_phy.keys())[list(gatt_phy.values()).index(
37        txPhy)] + ', ' + list(gatt_phy.keys())[list(gatt_phy.values()).index(
38            rxPhy)] + ')'
39
40
41class PhyTest(GattConnectedBaseTest):
42    def setup_class(self):
43        super(PhyTest, self).setup_class()
44        if not self.cen_ad.droid.bluetoothIsLe2MPhySupported():
45            raise signals.TestAbortClass(
46                "Central device does not support LE 2M PHY")
47
48        if not self.per_ad.droid.bluetoothIsLe2MPhySupported():
49            raise signals.TestAbortClass(
50                "Peripheral device does not support LE 2M PHY")
51
52    # Some controllers auto-update PHY to 2M, and both client and server
53    # might receive PHY Update event right after connection, if the
54    # connection was established over 1M PHY. We will ignore this event, but
55    # must pop it from queue.
56    def pop_initial_phy_update(self):
57        try:
58            maybe_event = gatt_event['phy_update']['evt'].format(
59                self.gatt_callback)
60            self.cen_ad.ed.pop_event(maybe_event, 0)
61        except Empty:
62            pass
63
64        try:
65            maybe_event = gatt_event['serv_phy_update']['evt'].format(
66                self.gatt_server_callback)
67            self.per_ad.ed.pop_event(maybe_event, 0)
68        except Empty:
69            pass
70
71    # this helper method checks wether both client and server received PHY
72    # update event with proper txPhy and rxPhy
73    def ensure_both_updated_phy(self, clientTxPhy, clientRxPhy):
74        event = self._client_wait(gatt_event['phy_update'])
75        txPhy = event['data']['TxPhy']
76        rxPhy = event['data']['RxPhy']
77        self.log.info("\tClient PHY updated: " + lfmt(txPhy, rxPhy))
78        self.assertEqual(0, event['data']['Status'], "Status should be 0")
79        self.assertEqual(clientTxPhy, event['data']['TxPhy'])
80        self.assertEqual(clientRxPhy, event['data']['RxPhy'])
81
82        bt_device_id = 0
83        event = self._server_wait(gatt_event['serv_phy_update'])
84        txPhy = event['data']['TxPhy']
85        rxPhy = event['data']['RxPhy']
86        self.log.info("\tServer PHY updated: " + lfmt(txPhy, rxPhy))
87        self.assertEqual(0, event['data']['Status'], "Status should be 0")
88        self.assertEqual(clientRxPhy, event['data']['TxPhy'])
89        self.assertEqual(clientTxPhy, event['data']['RxPhy'])
90
91    # read the client phy, return (txPhy, rxPhy)
92    def read_client_phy(self):
93        self.cen_ad.droid.gattClientReadPhy(self.bluetooth_gatt)
94        event = self._client_wait(gatt_event['phy_read'])
95        self.assertEqual(0, event['data']['Status'], "Status should be 0")
96        return (event['data']['TxPhy'], event['data']['RxPhy'])
97
98    # read the server phy, return (txPhy, rxPhy)
99    def read_server_phy(self):
100        bt_device_id = 0
101        self.per_ad.droid.gattServerReadPhy(self.gatt_server, bt_device_id)
102        event = self._server_wait(gatt_event['serv_phy_read'])
103        self.assertEqual(0, event['data']['Status'], "Status should be 0")
104        return (event['data']['TxPhy'], event['data']['RxPhy'])
105
106    @BluetoothBaseTest.bt_test_wrap
107    @test_tracker_info(uuid='edb95ae1-97e5-4337-9a60-1e113aa43a4d')
108    def test_phy_read(self):
109        """Test LE read PHY.
110
111        Test LE read PHY.
112
113        Steps:
114        1. Central, Peripheral : read PHY, make sure values are same.
115        2. Central: update PHY.
116        3. Ensure both Central and Peripheral received PHY update event.
117        4. Central, Peripheral: read PHY, make sure values are same.
118
119        Expected Result:
120        Verify that read PHY works properly.
121
122        Returns:
123          Pass if True
124          Fail if False
125
126        TAGS: LE, PHY
127        Priority: 0
128        """
129        self.cen_ad.droid.gattClientRequestConnectionPriority(
130            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
131        self.pop_initial_phy_update()
132
133        # read phy from client and server, make sure they're same
134        cTxPhy, cRxPhy = self.read_client_phy()
135        sTxPhy, sRxPhy = self.read_server_phy()
136        self.assertEqual(cTxPhy, sTxPhy)
137        self.assertEqual(cRxPhy, sRxPhy)
138
139        self.log.info("Initial connection PHY was: " + lfmt(cTxPhy, cRxPhy))
140
141        nextTxPhy = (cTxPhy == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
142        nextRxPhy = (cRxPhy == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
143
144        # try to update PHY from Client
145        self.log.info("Will try to set PHY to: " + lfmt(nextTxPhy, nextRxPhy))
146        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
147                                                    nextTxPhy, nextRxPhy, 0)
148        self.ensure_both_updated_phy(nextTxPhy, nextRxPhy)
149
150        # read phy on client and server, make sure values are same and equal
151        # the newly set value
152        cTxPhy, cRxPhy = self.read_client_phy()
153        sTxPhy, sRxPhy = self.read_server_phy()
154        self.assertEqual(cTxPhy, sTxPhy)
155        self.assertEqual(cRxPhy, sRxPhy)
156
157        self.assertEqual(nextTxPhy, cTxPhy)
158        self.assertEqual(nextRxPhy, cRxPhy)
159        return True
160
161    @BluetoothBaseTest.bt_test_wrap
162    @test_tracker_info(uuid='6b66af0a-35eb-42af-acd5-9634684f275d')
163    def test_phy_change_20_times(self):
164        """Test PHY update.
165
166        Test LE PHY update.
167
168        Steps:
169        1. Central: read PHY.
170        2. Central: update PHY to 1M, 2M, 1M... 20 times, each time ensuring
171                    both client and server received PHY update event.
172
173        Expected Result:
174        Verify that read update PHY worked properly each time.
175
176        Returns:
177          Pass if True
178          Fail if False
179
180        TAGS: LE, PHY
181        Priority: 0
182        """
183        self.cen_ad.droid.gattClientRequestConnectionPriority(
184            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
185        self.pop_initial_phy_update()
186
187        txPhyB, rxPhyB = self.read_client_phy()
188        txPhyA = (txPhyB == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
189        rxPhyA = (rxPhyB == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
190
191        self.log.info("Initial connection PHY was: " + lfmt(txPhyB, rxPhyB))
192
193        for i in range(20):
194            #swap values between iterations
195            txPhy = (i & 1) and txPhyB or txPhyA
196            rxPhy = (i & 1) and rxPhyB or rxPhyA
197
198            self.log.info("Will try to set PHY to: " + lfmt(txPhy, rxPhy))
199            self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
200                                                        txPhy, rxPhy, 0)
201            self.ensure_both_updated_phy(txPhy, rxPhy)
202        return True
203
204    @BluetoothBaseTest.bt_test_wrap
205    @test_tracker_info(uuid='13f28de4-07f4-458c-a3e5-3ba95318616f')
206    def test_phy_change_asym(self):
207        """Test PHY update with asymetric rx and tx PHY.
208
209        Test PHY update with asymetric rx and tx PHY.
210
211        Steps:
212        1. Central: read PHY.
213        2. Central: update PHY to tx: 1M, rx: 2M, ensure both devices received
214                    the asymetric update.
215        3. Central: update PHY to tx: 2M, rx: 1M, ensure both devices received
216                    the asymetric update.
217
218        Expected Result:
219        Verify that read update PHY worked properly each time.
220
221        Returns:
222          Pass if True
223          Fail if False
224
225        TAGS: LE, PHY
226        Priority: 0
227        """
228        self.cen_ad.droid.gattClientRequestConnectionPriority(
229            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
230        self.pop_initial_phy_update()
231
232        txPhy, rxPhy = self.read_client_phy()
233
234        self.log.info("Initial connection PHY was: " + lfmt(txPhy, rxPhy))
235        self.log.info("will try to set PHY to: PHY_LE_1M, PHY_LE_2M")
236
237        #try to update PHY to tx 1M, rx 2M from Client
238        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
239                                                    PHY_LE_1M, PHY_LE_2M, 0)
240        self.ensure_both_updated_phy(PHY_LE_1M, PHY_LE_2M)
241
242        #try to update PHY to TX 2M, RX 1M from Client
243        self.log.info("will try to set PHY to: PHY_LE_2M, PHY_LE_1M")
244        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
245                                                    PHY_LE_2M, PHY_LE_1M, 0)
246        self.ensure_both_updated_phy(PHY_LE_2M, PHY_LE_1M)
247
248        return True
249