1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import timedelta
18import logging
19
20from cert.captures import HalCaptures, HciCaptures
21from cert.gd_base_test import GdBaseTestClass
22from cert.matchers import HciMatchers
23from cert.py_hal import PyHal
24from cert.py_hci import PyHci
25from cert.truth import assertThat
26from hci.facade import facade_pb2 as hci_facade
27from bluetooth_packets_python3 import hci_packets
28
29
30class DirectHciTest(GdBaseTestClass):
31
32    def setup_class(self):
33        super().setup_class(dut_module='HCI', cert_module='HAL')
34
35    def setup_test(self):
36        super().setup_test()
37        self.dut_hci = PyHci(self.dut, acl_streaming=True)
38        self.cert_hal = PyHal(self.cert)
39        self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize())
40
41    def teardown_test(self):
42        self.dut_hci.close()
43        self.cert_hal.close()
44        super().teardown_test()
45
46    def send_hal_hci_command(self, command):
47        self.cert_hal.send_hci_command(bytes(command.Serialize()))
48
49    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
50        acl_msg = hci_facade.AclMsg(
51            handle=int(handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=acl)
52        self.dut.hci.SendAclData(acl_msg)
53
54    def send_hal_acl_data(self, handle, pb_flag, b_flag, acl):
55        lower = handle & 0xff
56        upper = (handle >> 8) & 0xf
57        upper = upper | int(pb_flag) & 0x3
58        upper = upper | ((int(b_flag) & 0x3) << 2)
59        lower_length = len(acl) & 0xff
60        upper_length = (len(acl) & 0xff00) >> 8
61        concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
62        self.cert_hal.send_acl(concatenated)
63
64    def test_local_hci_cmd_and_event(self):
65        # Loopback mode responds with ACL and SCO connection complete
66        self.dut_hci.register_for_events(hci_packets.EventCode.LOOPBACK_COMMAND)
67
68        self.dut_hci.send_command_with_complete(
69            hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))
70
71        cmd2loop = hci_packets.ReadLocalNameBuilder()
72        self.dut_hci.send_command_with_complete(cmd2loop)
73
74        looped_bytes = bytes(cmd2loop.Serialize())
75        assertThat(self.dut_hci.get_event_stream()).emits(lambda packet: looped_bytes in packet.event)
76
77    def test_inquiry_from_dut(self):
78        self.dut_hci.register_for_events(hci_packets.EventCode.INQUIRY_RESULT)
79
80        self.send_hal_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
81        lap = hci_packets.Lap()
82        lap.lap = 0x33
83        self.dut_hci.send_command_with_status(hci_packets.InquiryBuilder(lap, 0x30, 0xff))
84        assertThat(self.dut_hci.get_event_stream()).emits(
85            HciMatchers.EventWithCode(hci_packets.EventCode.INQUIRY_RESULT))
86
87    def test_le_ad_scan_cert_advertises(self):
88        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT)
89        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.ADVERTISING_REPORT)
90
91        # DUT Scans
92        self.dut_hci.send_command_with_complete(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
93        phy_scan_params = hci_packets.PhyScanParameters()
94        phy_scan_params.le_scan_interval = 6553
95        phy_scan_params.le_scan_window = 6553
96        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE
97
98        self.dut_hci.send_command_with_complete(
99            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
100                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
101                                                           [phy_scan_params]))
102        self.dut_hci.send_command_with_complete(
103            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
104                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
105
106        # CERT Advertises
107        advertising_handle = 0
108        self.send_hal_hci_command(
109            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
110                advertising_handle,
111                hci_packets.LegacyAdvertisingProperties.ADV_IND,
112                512,
113                768,
114                7,
115                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
116                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
117                'A6:A5:A4:A3:A2:A1',
118                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
119                0xF7,
120                1,  # SID
121                hci_packets.Enable.DISABLED  # Scan request notification
122            ))
123
124        self.send_hal_hci_command(
125            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
126        gap_name = hci_packets.GapData()
127        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
128        gap_name.data = list(bytes(b'Im_A_Cert'))
129
130        self.send_hal_hci_command(
131            hci_packets.LeSetExtendedAdvertisingDataBuilder(
132                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
133                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
134
135        gap_short_name = hci_packets.GapData()
136        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
137        gap_short_name.data = list(bytes(b'Im_A_C'))
138
139        self.send_hal_hci_command(
140            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
141                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
142                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
143
144        enabled_set = hci_packets.EnabledSet()
145        enabled_set.advertising_handle = 0
146        enabled_set.duration = 0
147        enabled_set.max_extended_advertising_events = 0
148        self.send_hal_hci_command(
149            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
150
151        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.event)
152
153        self.send_hal_hci_command(
154            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))
155        self.dut_hci.send_command_with_complete(
156            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
157                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
158
159    def _verify_le_connection_complete(self):
160        cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture()
161        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_conn_complete_capture)
162        cert_handle = cert_conn_complete_capture.get().GetConnectionHandle()
163
164        dut_conn_complete_capture = HciCaptures.LeConnectionCompleteCapture()
165        assertThat(self.dut_hci.get_le_event_stream()).emits(dut_conn_complete_capture)
166        dut_handle = dut_conn_complete_capture.get().GetConnectionHandle()
167
168        return (dut_handle, cert_handle)
169
170    @staticmethod
171    def _create_phy_scan_params():
172        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
173        phy_scan_params.scan_interval = 0x60
174        phy_scan_params.scan_window = 0x30
175        phy_scan_params.conn_interval_min = 0x18
176        phy_scan_params.conn_interval_max = 0x28
177        phy_scan_params.conn_latency = 0
178        phy_scan_params.supervision_timeout = 0x1f4
179        phy_scan_params.min_ce_length = 0
180        phy_scan_params.max_ce_length = 0
181        return phy_scan_params
182
183    def test_le_connection_dut_advertises(self):
184        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.CONNECTION_COMPLETE)
185        # Cert Connects
186        self.send_hal_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
187        phy_scan_params = DirectHciTest._create_phy_scan_params()
188        self.send_hal_hci_command(
189            hci_packets.LeExtendedCreateConnectionBuilder(
190                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
191                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params]))
192
193        # DUT Advertises
194        advertising_handle = 0
195        self.dut_hci.send_command_with_complete(
196            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
197                advertising_handle,
198                hci_packets.LegacyAdvertisingProperties.ADV_IND,
199                400,
200                450,
201                7,
202                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
203                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
204                '00:00:00:00:00:00',
205                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
206                0xF8,
207                1,  #SID
208                hci_packets.Enable.DISABLED  # Scan request notification
209            ))
210
211        self.dut_hci.send_command_with_complete(
212            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))
213
214        gap_name = hci_packets.GapData()
215        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
216        gap_name.data = list(bytes(b'Im_The_DUT!'))  # TODO: Fix and remove !
217
218        self.dut_hci.send_command_with_complete(
219            hci_packets.LeSetExtendedAdvertisingDataBuilder(
220                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
221                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
222
223        gap_short_name = hci_packets.GapData()
224        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
225        gap_short_name.data = list(bytes(b'Im_The_D'))
226
227        self.dut_hci.send_command_with_complete(
228            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
229                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
230                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
231
232        enabled_set = hci_packets.EnabledSet()
233        enabled_set.advertising_handle = advertising_handle
234        enabled_set.duration = 0
235        enabled_set.max_extended_advertising_events = 0
236        self.dut_hci.send_command_with_complete(
237            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
238
239        # Check for success of Enable
240        assertThat(self.dut_hci.get_event_stream()).emits(
241            HciMatchers.CommandComplete(hci_packets.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
242
243        (dut_handle, cert_handle) = self._verify_le_connection_complete()
244
245        # Send ACL Data
246        self.enqueue_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
247                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
248        self.send_hal_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
249                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))
250
251        assertThat(self.cert_hal.get_acl_stream()).emits(
252            lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload)
253        assertThat(self.dut_hci.get_raw_acl_stream()).emits(
254            lambda packet: logging.debug(packet.data) or b'SomeMoreAclData' in packet.data)
255
256    def test_le_connect_list_connection_cert_advertises(self):
257        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.CONNECTION_COMPLETE)
258        # DUT Connects
259        self.dut_hci.send_command_with_complete(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
260        self.dut_hci.send_command_with_complete(
261            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
262        phy_scan_params = DirectHciTest._create_phy_scan_params()
263        self.dut_hci.send_command_with_status(
264            hci_packets.LeExtendedCreateConnectionBuilder(
265                hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
266                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))
267
268        # CERT Advertises
269        advertising_handle = 1
270        self.send_hal_hci_command(
271            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
272                advertising_handle,
273                hci_packets.LegacyAdvertisingProperties.ADV_IND,
274                512,
275                768,
276                7,
277                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
278                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
279                'A6:A5:A4:A3:A2:A1',
280                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
281                0x7F,
282                0,  # SID
283                hci_packets.Enable.DISABLED  # Scan request notification
284            ))
285
286        self.send_hal_hci_command(
287            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
288
289        gap_name = hci_packets.GapData()
290        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
291        gap_name.data = list(bytes(b'Im_A_Cert'))
292
293        self.send_hal_hci_command(
294            hci_packets.LeSetExtendedAdvertisingDataBuilder(
295                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
296                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
297        enabled_set = hci_packets.EnabledSet()
298        enabled_set.advertising_handle = 1
299        enabled_set.duration = 0
300        enabled_set.max_extended_advertising_events = 0
301        self.send_hal_hci_command(
302            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
303
304        # LeConnectionComplete
305        self._verify_le_connection_complete()
306
307    def _verify_connection_complete(self):
308        cert_connection_complete_capture = HalCaptures.ConnectionCompleteCapture()
309        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_connection_complete_capture)
310        cert_handle = cert_connection_complete_capture.get().GetConnectionHandle()
311
312        dut_connection_complete_capture = HciCaptures.ConnectionCompleteCapture()
313        assertThat(self.dut_hci.get_event_stream()).emits(dut_connection_complete_capture)
314        dut_handle = dut_connection_complete_capture.get().GetConnectionHandle()
315
316        return (dut_handle, cert_handle)
317
318    def test_connection_dut_connects(self):
319        self.dut_hci.send_command_with_complete(hci_packets.WritePageTimeoutBuilder(0x4000))
320
321        # CERT Enables scans and gets its address
322        self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder())
323
324        cert_read_bd_addr_capture = HalCaptures.ReadBdAddrCompleteCapture()
325        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_read_bd_addr_capture)
326        address = cert_read_bd_addr_capture.get().GetBdAddr()
327
328        self.send_hal_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
329
330        # DUT Connects
331        self.dut_hci.send_command_with_status(
332            hci_packets.CreateConnectionBuilder(
333                address,
334                0xcc18,  # Packet Type
335                hci_packets.PageScanRepetitionMode.R0,
336                0,
337                hci_packets.ClockOffsetValid.INVALID,
338                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
339
340        # Cert Accepts
341        connect_request_capture = HalCaptures.ConnectionRequestCapture()
342        assertThat(self.cert_hal.get_hci_event_stream()).emits(connect_request_capture, timeout=timedelta(seconds=20))
343        connection_request = connect_request_capture.get()
344        self.send_hal_hci_command(
345            hci_packets.AcceptConnectionRequestBuilder(connection_request.GetBdAddr(),
346                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
347
348        (dut_handle, cert_handle) = self._verify_connection_complete()
349
350        # Send ACL Data
351        self.enqueue_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
352                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
353        self.send_hal_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
354                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))
355
356        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
357        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data)
358
359    def test_connection_cert_connects(self):
360        self.send_hal_hci_command(hci_packets.WritePageTimeoutBuilder(0x4000))
361
362        # DUT Enables scans and gets its address
363        self.dut_hci.send_command_with_complete(
364            hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
365        self.dut_hci.send_command_with_complete(hci_packets.ReadBdAddrBuilder())
366
367        read_bd_addr_capture = HciCaptures.ReadBdAddrCompleteCapture()
368        assertThat(self.dut_hci.get_event_stream()).emits(read_bd_addr_capture)
369        address = read_bd_addr_capture.get().GetBdAddr()
370
371        # Cert Connects
372        self.send_hal_hci_command(
373            hci_packets.CreateConnectionBuilder(
374                address,
375                0xcc18,  # Packet Type
376                hci_packets.PageScanRepetitionMode.R0,
377                0,
378                hci_packets.ClockOffsetValid.INVALID,
379                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
380
381        # DUT Accepts
382        connection_request_capture = HciCaptures.ConnectionRequestCapture()
383        assertThat(self.dut_hci.get_event_stream()).emits(connection_request_capture, timeout=timedelta(seconds=20))
384        connection_request = connection_request_capture.get()
385        self.dut_hci.send_command_with_status(
386            hci_packets.AcceptConnectionRequestBuilder(connection_request.GetBdAddr(),
387                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
388
389        (dut_handle, cert_handle) = self._verify_connection_complete()
390
391        # Send ACL Data
392        self.enqueue_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
393                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData'))
394        self.send_hal_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
395                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData'))
396
397        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
398        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data)
399