1# 2# Copyright 2020 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from cert.gd_base_test import GdBaseTestClass 17from cert.truth import assertThat 18from cert.py_l2cap import PyLeL2cap, PyL2cap 19from cert.matchers import L2capMatchers 20from cert.metadata import metadata 21from facade import common_pb2 as common 22from google.protobuf import empty_pb2 as empty_proto 23from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade 24from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade 25from hci.facade import le_initiator_address_facade_pb2 as le_initiator_address_facade 26import bluetooth_packets_python3 as bt_packets 27from bluetooth_packets_python3 import hci_packets, l2cap_packets 28from l2cap.classic.cert.cert_l2cap import CertL2cap 29from l2cap.le.cert.cert_le_l2cap import CertLeL2cap 30from neighbor.facade import facade_pb2 as neighbor_facade 31 32# Assemble a sample packet. 33SAMPLE_PACKET = bt_packets.RawBuilder([0x19, 0x26, 0x08, 0x17]) 34 35 36class DualL2capTest(GdBaseTestClass): 37 38 def setup_class(self): 39 super().setup_class(dut_module='L2CAP', cert_module='HCI_INTERFACES') 40 41 def setup_test(self): 42 super().setup_test() 43 44 self.dut_address = self.dut.hci_controller.GetMacAddressSimple() 45 cert_address = common.BluetoothAddress( 46 address=self.cert.controller_read_only_property.ReadLocalAddress(empty_proto.Empty()).address) 47 48 self.dut_l2cap = PyL2cap(self.dut, cert_address) 49 self.cert_l2cap = CertL2cap(self.cert) 50 self.dut_le_l2cap = PyLeL2cap(self.dut) 51 self.cert_le_l2cap = CertLeL2cap(self.cert) 52 self.dut_le_address = common.BluetoothAddressWithType( 53 address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) 54 self.cert_address = common.BluetoothAddressWithType( 55 address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS) 56 dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy( 57 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 58 address_with_type=self.dut_le_address, 59 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 60 minimum_rotation_time=0, 61 maximum_rotation_time=0) 62 self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy) 63 privacy_policy = le_initiator_address_facade.PrivacyPolicy( 64 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 65 address_with_type=self.cert_address, 66 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 67 minimum_rotation_time=0, 68 maximum_rotation_time=0) 69 self.cert_le_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) 70 71 def teardown_test(self): 72 self.cert_le_l2cap.close() 73 self.dut_le_l2cap.close() 74 self.cert_l2cap.close() 75 self.dut_l2cap.close() 76 super().teardown_test() 77 78 def _setup_acl_link_from_cert(self): 79 self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True)) 80 self.cert_l2cap.connect_acl(self.dut_address) 81 82 def _setup_le_link_from_cert(self): 83 # DUT Advertises 84 gap_name = hci_packets.GapData() 85 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 86 gap_name.data = list(bytes(b'Im_The_DUT')) 87 gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize())) 88 config = le_advertising_facade.AdvertisingConfig( 89 advertisement=[gap_data], 90 interval_min=512, 91 interval_max=768, 92 event_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 93 address_type=common.RANDOM_DEVICE_ADDRESS, 94 channel_map=7, 95 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 96 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 97 create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) 98 self.cert_le_l2cap.connect_le_acl(self.dut_le_address) 99 100 def _open_le_coc_from_dut(self, psm=0x33, our_scid=None): 101 response_future = self.dut_le_l2cap.connect_coc_to_cert(self.cert_address, psm) 102 cert_channel = self.cert_le_l2cap.verify_and_respond_open_channel_from_remote(psm=psm, our_scid=our_scid) 103 dut_channel = response_future.get_channel() 104 return (dut_channel, cert_channel) 105 106 def _open_channel_from_dut(self, psm=0x33, our_scid=None): 107 dut_channel_future = self.dut_l2cap.connect_dynamic_channel_to_cert(psm) 108 cert_channel = self.cert_l2cap.verify_and_respond_open_channel_from_remote(psm=psm, scid=our_scid) 109 dut_channel = dut_channel_future.get_channel() 110 111 cert_channel.verify_configuration_request_and_respond() 112 cert_channel.send_configure_request([]) 113 cert_channel.verify_configuration_response() 114 115 return (dut_channel, cert_channel) 116 117 def _open_unconfigured_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33): 118 119 dut_channel = self.dut_l2cap.register_dynamic_channel(psm) 120 cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid) 121 122 return (dut_channel, cert_channel) 123 124 def _open_channel_from_cert(self, signal_id=1, scid=0x0101, psm=0x33): 125 (dut_channel, cert_channel) = self._open_unconfigured_channel_from_cert(signal_id, scid, psm) 126 cert_channel.verify_configuration_request_and_respond() 127 cert_channel.send_configure_request([]) 128 cert_channel.verify_configuration_response() 129 130 return (dut_channel, cert_channel) 131 132 def _open_le_coc_from_cert(self, signal_id=1, scid=0x0101, psm=0x35, mtu=1000, mps=100, initial_credit=6): 133 134 dut_channel = self.dut_le_l2cap.register_coc(self.cert_address, psm) 135 cert_channel = self.cert_le_l2cap.open_channel(signal_id, psm, scid, mtu, mps, initial_credit) 136 137 return (dut_channel, cert_channel) 138 139 @metadata(pts_test_id="L2CAP/LE/CID/BV-01-C", pts_test_name="Receiving DCID over BR/EDR and LE") 140 def test_receiving_dcid_over_bredr_and_le(self): 141 """ 142 Test that the L2CAP entity can receive the same DCID in L2CAP connect responses on both the 143 BR/EDR and LE links. 144 """ 145 self._setup_acl_link_from_cert() 146 # TODO: We should let LE use public address, same as classic link. 147 # TODO: Update AclManager::impl::create_le_connection 148 self._setup_le_link_from_cert() 149 (dut_channel, cert_channel) = self._open_channel_from_dut(0x33, 0x70) 150 (le_dut_channel, le_cert_channel) = self._open_le_coc_from_dut(0x35, 0x70) 151 152 dut_channel.send(b'abc') 153 assertThat(cert_channel).emits(L2capMatchers.Data(b'abc')) 154 155 le_dut_channel.send(b'hello') 156 assertThat(le_cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) 157 158 le_cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) 159 assertThat(le_dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) 160 161 cert_channel.disconnect_and_verify() 162 le_cert_channel.disconnect_and_verify() 163 164 @metadata(pts_test_id="L2CAP/LE/CID/BV-02-C", pts_test_name="Receiving SCID over BR/EDR and LE") 165 def test_receiving_scid_over_bredr_and_le(self): 166 """ 167 Test that the L2CAP entity can receive the same SCID in L2CAP connect requests on both the 168 BR/EDR and LE links. 169 """ 170 self._setup_acl_link_from_cert() 171 # TODO: We should let LE use public address, same as classic link. 172 # TODO: Update AclManager::impl::create_le_connection 173 self._setup_le_link_from_cert() 174 (dut_channel, cert_channel) = self._open_channel_from_cert(0x33, 0x70) 175 (le_dut_channel, le_cert_channel) = self._open_le_coc_from_cert(0x35, 0x70) 176 177 dut_channel.send(b'abc') 178 assertThat(cert_channel).emits(L2capMatchers.Data(b'abc')) 179 180 le_dut_channel.send(b'hello') 181 assertThat(le_cert_channel).emits(L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5)) 182 183 le_cert_channel.send_first_le_i_frame(4, SAMPLE_PACKET) 184 assertThat(le_dut_channel).emits(L2capMatchers.PacketPayloadRawData(b'\x19\x26\x08\x17')) 185 186 cert_channel.disconnect_and_verify() 187 le_cert_channel.disconnect_and_verify() 188