#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from datetime import timedelta
import logging

from cert.captures import HalCaptures, HciCaptures
from cert.gd_base_test import GdBaseTestClass
from cert.matchers import HciMatchers
from cert.py_hal import PyHal
from cert.py_hci import PyHci
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade
from bluetooth_packets_python3 import hci_packets


class DirectHciTest(GdBaseTestClass):
    """Cert tests that drive the DUT through its HCI module and the CERT through its HAL module.

    The DUT side uses the `PyHci` wrapper (`self.dut_hci`); the CERT side sends
    serialized HCI packets through the `PyHal` wrapper (`self.cert_hal`).
    """

    def setup_class(self):
        """Configure the base test class with the HCI module on the DUT and the HAL module on the CERT."""
        super().setup_class(dut_module='HCI', cert_module='HAL')

    def setup_test(self):
        """Create the per-test HCI/HAL wrappers and send an HCI Reset through the CERT HAL."""
        super().setup_test()
        self.dut_hci = PyHci(self.dut, acl_streaming=True)
        self.cert_hal = PyHal(self.cert)
        self.cert_hal.send_hci_command(hci_packets.ResetBuilder().Serialize())

    def teardown_test(self):
        """Close both wrappers before the base class tears the test down."""
        self.dut_hci.close()
        self.cert_hal.close()
        super().teardown_test()

    def send_hal_hci_command(self, command):
        """Serialize an HCI command builder and send it through the CERT HAL.

        Args:
            command: a packet builder exposing `Serialize()`.
        """
        self.cert_hal.send_hci_command(bytes(command.Serialize()))

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        """Send ACL data from the DUT through the HCI facade.

        Args:
            handle: connection handle, converted with `int()`.
            pb_flag: packet boundary flag, converted with `int()`.
            b_flag: broadcast flag, converted with `int()`.
            acl: payload placed in the `data` field of the `AclMsg`.
        """
        acl_msg = hci_facade.AclMsg(
            handle=int(handle), packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=acl)
        self.dut.hci.SendAclData(acl_msg)

    def send_hal_acl_data(self, handle, pb_flag, b_flag, acl):
        """Build a raw HCI ACL data packet by hand and send it through the CERT HAL.

        The 4-byte header is two little-endian 16-bit fields: the first holds
        the 12-bit connection handle in bits 0-11, the packet boundary flag in
        bits 12-13 and the broadcast flag in bits 14-15; the second holds the
        payload length.

        Args:
            handle: connection handle (only the low 12 bits are used).
            pb_flag: packet boundary flag (only the low 2 bits are used).
            b_flag: broadcast flag (only the low 2 bits are used).
            acl: payload, an iterable of byte values.
        """
        lower = handle & 0xff
        upper = (handle >> 8) & 0xf
        # The flags live above the handle's top nibble: PB in bits 4-5 and BC in
        # bits 6-7 of this byte. Unshifted they would overwrite handle bits 8-11.
        upper = upper | ((int(pb_flag) & 0x3) << 4)
        upper = upper | ((int(b_flag) & 0x3) << 6)
        lower_length = len(acl) & 0xff
        upper_length = (len(acl) & 0xff00) >> 8
        concatenated = bytes([lower, upper, lower_length, upper_length] + list(acl))
        self.cert_hal.send_acl(concatenated)

    def test_local_hci_cmd_and_event(self):
        """Enable local loopback on the DUT and expect a sent command to come back inside an event."""
        # Loopback mode responds with ACL and SCO connection complete
        self.dut_hci.register_for_events(hci_packets.EventCode.LOOPBACK_COMMAND)

        self.dut_hci.send_command_with_complete(
            hci_packets.WriteLoopbackModeBuilder(hci_packets.LoopbackMode.ENABLE_LOCAL))

        cmd2loop = hci_packets.ReadLocalNameBuilder()
        self.dut_hci.send_command_with_complete(cmd2loop)

        looped_bytes = bytes(cmd2loop.Serialize())
        assertThat(self.dut_hci.get_event_stream()).emits(lambda packet: looped_bytes in packet.event)

    def test_inquiry_from_dut(self):
        """CERT enables inquiry and page scan; DUT starts an inquiry and expects an INQUIRY_RESULT event."""
        self.dut_hci.register_for_events(hci_packets.EventCode.INQUIRY_RESULT)

        self.send_hal_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        lap = hci_packets.Lap()
        lap.lap = 0x33
        self.dut_hci.send_command_with_status(hci_packets.InquiryBuilder(lap, 0x30, 0xff))
        assertThat(self.dut_hci.get_event_stream()).emits(
            HciMatchers.EventWithCode(hci_packets.EventCode.INQUIRY_RESULT))

    def test_le_ad_scan_cert_advertises(self):
        """DUT runs an extended LE scan while the CERT advertises; expect the CERT's name in an LE event."""
        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT)
        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.ADVERTISING_REPORT)

        # DUT Scans
        self.dut_hci.send_command_with_complete(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))

        # CERT Advertises
        advertising_handle = 0
        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
                512,
                768,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                'A6:A5:A4:A3:A2:A1',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0xF7,
                1,  # SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = hci_packets.GapData()
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_A_C'))

        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = hci_packets.EnabledSet()
        # Enable the same advertising set that was configured above.
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.event)

        # Stop advertising on the CERT and scanning on the DUT.
        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))
        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))

    def _verify_le_connection_complete(self):
        """Wait for an LE connection complete on both sides.

        Returns:
            Tuple `(dut_handle, cert_handle)` of the captured connection handles.
        """
        cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture()
        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_conn_complete_capture)
        cert_handle = cert_conn_complete_capture.get().GetConnectionHandle()

        dut_conn_complete_capture = HciCaptures.LeConnectionCompleteCapture()
        assertThat(self.dut_hci.get_le_event_stream()).emits(dut_conn_complete_capture)
        dut_handle = dut_conn_complete_capture.get().GetConnectionHandle()

        return (dut_handle, cert_handle)

    @staticmethod
    def _create_phy_scan_params():
        """Return the fixed `LeCreateConnPhyScanParameters` used by the LE connection tests."""
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        return phy_scan_params

    def test_le_connection_dut_advertises(self):
        """CERT initiates an LE connection to the advertising DUT, then ACL data is exchanged both ways."""
        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        # Cert Connects
        self.send_hal_hci_command(hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
        phy_scan_params = DirectHciTest._create_phy_scan_params()
        self.send_hal_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, '0D:05:04:03:02:01', 1, [phy_scan_params]))

        # DUT Advertises
        advertising_handle = 0
        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
                400,
                450,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  # SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_The_DUT!'))  # TODO: Fix and remove !

        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))

        gap_short_name = hci_packets.GapData()
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_The_D'))

        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))

        enabled_set = hci_packets.EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.dut_hci.send_command_with_complete(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

        # Check for success of Enable
        assertThat(self.dut_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))

        (dut_handle, cert_handle) = self._verify_le_connection_complete()

        # Send ACL Data
        self.enqueue_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
        self.send_hal_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))

        # logging.debug() returns None, so `or` falls through to the actual match
        # while still logging every packet that is inspected.
        assertThat(self.cert_hal.get_acl_stream()).emits(
            lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(
            lambda packet: logging.debug(packet.data) or b'SomeMoreAclData' in packet.data)

    def test_le_connect_list_connection_cert_advertises(self):
        """DUT initiates an LE connection via its connect list while the CERT advertises."""
        self.dut_hci.register_for_le_events(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        # DUT Connects
        self.dut_hci.send_command_with_complete(hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
        self.dut_hci.send_command_with_complete(
            hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, '0C:05:04:03:02:01'))
        phy_scan_params = DirectHciTest._create_phy_scan_params()
        self.dut_hci.send_command_with_status(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))

        # CERT Advertises
        advertising_handle = 1
        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
                512,
                768,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                'A6:A5:A4:A3:A2:A1',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0x7F,
                0,  # SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ))

        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
        enabled_set = hci_packets.EnabledSet()
        # Enable the same advertising set that was configured above.
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.send_hal_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))

        # LeConnectionComplete
        self._verify_le_connection_complete()

    def _verify_connection_complete(self):
        """Wait for a connection complete event on both sides.

        Returns:
            Tuple `(dut_handle, cert_handle)` of the captured connection handles.
        """
        cert_connection_complete_capture = HalCaptures.ConnectionCompleteCapture()
        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_connection_complete_capture)
        cert_handle = cert_connection_complete_capture.get().GetConnectionHandle()

        dut_connection_complete_capture = HciCaptures.ConnectionCompleteCapture()
        assertThat(self.dut_hci.get_event_stream()).emits(dut_connection_complete_capture)
        dut_handle = dut_connection_complete_capture.get().GetConnectionHandle()

        return (dut_handle, cert_handle)

    def test_connection_dut_connects(self):
        """DUT creates a connection to the scanning CERT, the CERT accepts, then ACL data flows both ways."""
        self.dut_hci.send_command_with_complete(hci_packets.WritePageTimeoutBuilder(0x4000))

        # CERT Enables scans and gets its address
        self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder())

        cert_read_bd_addr_capture = HalCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_read_bd_addr_capture)
        address = cert_read_bd_addr_capture.get().GetBdAddr()

        self.send_hal_hci_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

        # DUT Connects
        self.dut_hci.send_command_with_status(
            hci_packets.CreateConnectionBuilder(
                address,
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R0,
                0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

        # Cert Accepts
        connect_request_capture = HalCaptures.ConnectionRequestCapture()
        assertThat(self.cert_hal.get_hci_event_stream()).emits(connect_request_capture, timeout=timedelta(seconds=20))
        connection_request = connect_request_capture.get()
        self.send_hal_hci_command(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

        (dut_handle, cert_handle) = self._verify_connection_complete()

        # Send ACL Data
        self.enqueue_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
        self.send_hal_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeMoreAclData'))

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data)

    def test_connection_cert_connects(self):
        """CERT creates a connection to the scanning DUT, the DUT accepts, then ACL data flows both ways."""
        self.send_hal_hci_command(hci_packets.WritePageTimeoutBuilder(0x4000))

        # DUT Enables scans and gets its address
        self.dut_hci.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
        self.dut_hci.send_command_with_complete(hci_packets.ReadBdAddrBuilder())

        read_bd_addr_capture = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.dut_hci.get_event_stream()).emits(read_bd_addr_capture)
        address = read_bd_addr_capture.get().GetBdAddr()

        # Cert Connects
        self.send_hal_hci_command(
            hci_packets.CreateConnectionBuilder(
                address,
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R0,
                0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

        # DUT Accepts
        connection_request_capture = HciCaptures.ConnectionRequestCapture()
        assertThat(self.dut_hci.get_event_stream()).emits(connection_request_capture, timeout=timedelta(seconds=20))
        connection_request = connection_request_capture.get()
        self.dut_hci.send_command_with_status(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))

        (dut_handle, cert_handle) = self._verify_connection_complete()

        # Send ACL Data
        self.enqueue_acl_data(dut_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeAclData'))
        self.send_hal_acl_data(cert_handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                               hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'This is just SomeMoreAclData'))

        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.data)