1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage 20from cert.closable import Closable 21from cert.closable import safeClose 22from cert.py_acl_manager import PyAclManager 23from cert.truth import assertThat 24import bluetooth_packets_python3 as bt_packets 25from bluetooth_packets_python3 import l2cap_packets 26from bluetooth_packets_python3 import RawBuilder 27from bluetooth_packets_python3.l2cap_packets import CommandCode 28from bluetooth_packets_python3.l2cap_packets import Final 29from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly 30from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction 31from bluetooth_packets_python3.l2cap_packets import Poll 32from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType 33from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult 34from cert.event_stream import FilteringEventStream 35from cert.event_stream import IEventStream 36from cert.matchers import L2capMatchers 37from cert.captures import L2capCaptures 38 39 40class CertL2capChannel(IEventStream): 41 42 def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, fcs=None): 43 self._device = device 44 self._scid = scid 45 self._dcid = dcid 46 self._acl_stream = acl_stream 47 self._acl = acl 48 self._control_channel = control_channel 49 self._config_rsp_received = False 50 self._config_rsp_sent = False 51 if fcs == l2cap_packets.FcsType.DEFAULT: 52 self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid)) 53 else: 54 self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid)) 55 56 def get_event_queue(self): 57 return self._our_acl_view.get_event_queue() 58 59 def is_configured(self): 60 return self._config_rsp_received and self._config_rsp_sent 61 62 def send(self, packet): 63 frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) 64 self._acl.send(frame.Serialize()) 65 66 def send_i_frame(self, 67 tx_seq, 68 req_seq, 69 f=Final.NOT_SET, 70 sar=SegmentationAndReassembly.UNSEGMENTED, 71 payload=None, 72 fcs=False): 73 if fcs == l2cap_packets.FcsType.DEFAULT: 74 frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder(self._dcid, tx_seq, f, req_seq, sar, payload) 75 else: 76 frame = l2cap_packets.EnhancedInformationFrameBuilder(self._dcid, tx_seq, f, req_seq, sar, payload) 77 self._acl.send(frame.Serialize()) 78 79 def send_s_frame(self, req_seq, s=SupervisoryFunction.RECEIVER_READY, p=Poll.NOT_SET, f=Final.NOT_SET): 80 frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(self._dcid, s, p, f, req_seq) 81 self._acl.send(frame.Serialize()) 82 83 def config_request_for_me(self): 84 return L2capMatchers.ConfigurationRequest(self._scid) 85 86 def send_configure_request(self, options, sid=2, continuation=l2cap_packets.Continuation.END): 87 assertThat(self._scid).isNotEqualTo(1) 88 request = l2cap_packets.ConfigurationRequestBuilder(sid, self._dcid, continuation, options) 89 self._control_channel.send(request) 90 91 def _send_information_request(self, type): 92 assertThat(self._scid).isEqualTo(1) 93 signal_id = 3 94 information_request = l2cap_packets.InformationRequestBuilder(signal_id, type) 95 self.send(information_request) 96 97 def send_extended_features_request(self): 98 self._send_information_request(InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) 99 100 def verify_configuration_request_and_respond(self, result=ConfigurationResponseResult.SUCCESS, options=None): 101 request_capture = L2capCaptures.ConfigurationRequest(self._scid) 102 assertThat(self._control_channel).emits(request_capture) 103 request = request_capture.get() 104 sid = request.GetIdentifier() 105 if options is None: 106 options = [] 107 config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END, 108 result, options) 109 self._control_channel.send(config_response) 110 111 def send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None): 112 sid = request.GetIdentifier() 113 if options is None: 114 options = [] 115 config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END, 116 result, options) 117 self._control_channel.send(config_response) 118 self._config_rsp_sent = True 119 120 def verify_configuration_response(self, result=ConfigurationResponseResult.SUCCESS): 121 assertThat(self._control_channel).emits(L2capMatchers.ConfigurationResponse(result)) 122 123 def disconnect_and_verify(self): 124 assertThat(self._scid).isNotEqualTo(1) 125 self._control_channel.send(l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, self._scid)) 126 127 assertThat(self._control_channel).emits(L2capMatchers.DisconnectionResponse(self._scid, self._dcid)) 128 129 def verify_disconnect_request(self): 130 assertThat(self._control_channel).emits(L2capMatchers.DisconnectionRequest(self._dcid, self._scid)) 131 132 133class CertL2capControlChannelBehaviors(object): 134 135 def __init__(self, parent): 136 self.on_config_req_behavior = SingleArgumentBehavior( 137 lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent)) 138 139 def on_config_req(self, matcher): 140 return self.on_config_req_behavior.begin(matcher) 141 142 class CertReplyStage(ReplyStage): 143 144 def __init__(self, parent): 145 self.parent = parent 146 147 def send_configuration_response(self, result=ConfigurationResponseResult.SUCCESS, options=None): 148 self._commit(lambda request: self._send_configuration_response(request, result, options)) 149 return self 150 151 def _send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None): 152 dcid = request.GetDestinationCid() 153 if dcid not in self.parent.scid_to_channel: 154 logging.warning("Received config request with unknown dcid") 155 return 156 self.parent.scid_to_channel[dcid].send_configuration_response(request, result, options) 157 158 159class CertL2cap(Closable, IHasBehaviors): 160 161 def __init__(self, device): 162 self._device = device 163 self._acl_manager = PyAclManager(device) 164 self._acl = None 165 166 self.control_table = { 167 CommandCode.CONNECTION_RESPONSE: self._on_connection_response_default, 168 CommandCode.CONFIGURATION_REQUEST: self._on_configuration_request_default, 169 CommandCode.CONFIGURATION_RESPONSE: self._on_configuration_response_default, 170 CommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default, 171 CommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, 172 CommandCode.INFORMATION_REQUEST: self._on_information_request_default, 173 CommandCode.INFORMATION_RESPONSE: self._on_information_response_default 174 } 175 176 self.scid_to_channel = {} 177 178 self.support_ertm = True 179 self.support_fcs = True 180 181 self._control_behaviors = CertL2capControlChannelBehaviors(self) 182 self._control_behaviors.on_config_req_behavior.set_default(self._send_configuration_response_default) 183 184 def close(self): 185 self._acl_manager.close() 186 safeClose(self._acl) 187 188 def get_behaviors(self): 189 return self._control_behaviors 190 191 def connect_acl(self, remote_addr): 192 self._acl_manager.initiate_connection(remote_addr) 193 self._acl = self._acl_manager.complete_outgoing_connection() 194 self.control_channel = CertL2capChannel( 195 self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None) 196 self._acl.acl_stream.register_callback(self._handle_control_packet) 197 198 def open_channel(self, signal_id, psm, scid, fcs=None): 199 self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) 200 201 response = L2capCaptures.ConnectionResponse(scid) 202 assertThat(self.control_channel).emits(response) 203 channel = CertL2capChannel(self._device, scid, 204 response.get().GetDestinationCid(), self._acl.acl_stream, self._acl, 205 self.control_channel, fcs) 206 self.scid_to_channel[scid] = channel 207 208 return channel 209 210 def verify_and_respond_open_channel_from_remote(self, psm=0x33, scid=None, fcs=None): 211 212 request = L2capCaptures.ConnectionRequest(psm) 213 assertThat(self.control_channel).emits(request) 214 215 sid = request.get().GetIdentifier() 216 dcid = request.get().GetSourceCid() 217 if scid is None or scid in self.scid_to_channel: 218 scid = dcid 219 channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel, fcs) 220 self.scid_to_channel[scid] = channel 221 222 connection_response = l2cap_packets.ConnectionResponseBuilder( 223 sid, scid, dcid, l2cap_packets.ConnectionResponseResult.SUCCESS, 224 l2cap_packets.ConnectionResponseStatus.NO_FURTHER_INFORMATION_AVAILABLE) 225 self.control_channel.send(connection_response) 226 227 return channel 228 229 def verify_and_respond_open_channel_from_remote_and_send_config_req(self, psm=0x33): 230 """ 231 Verify a connection request, and send a combo packet of connection response and configuration request 232 """ 233 request = L2capCaptures.ConnectionRequest(psm) 234 assertThat(self.control_channel).emits(request) 235 236 sid = request.get().GetIdentifier() 237 dcid = request.get().GetSourceCid() 238 scid = dcid 239 channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel) 240 self.scid_to_channel[scid] = channel 241 242 # Connection response and config request combo packet 243 conn_rsp_and_config_req = RawBuilder([ 244 0x03, sid, 0x08, 0x00, dcid, 0x00, dcid, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, sid + 1, 0x04, 0x00, dcid, 245 0x00, 0x00, 0x00 246 ]) 247 self.control_channel.send(conn_rsp_and_config_req) 248 249 return channel 250 251 # prefer to use channel abstraction instead, if at all possible 252 def send_acl(self, packet): 253 self._acl.send(packet.Serialize()) 254 255 def get_control_channel(self): 256 return self.control_channel 257 258 # Disable ERTM when exchange extened feature 259 def claim_ertm_unsupported(self): 260 self.support_ertm = False 261 262 def _on_connection_response_default(self, l2cap_control_view): 263 pass 264 265 def _on_configuration_request_default(self, l2cap_control_view): 266 request = l2cap_packets.ConfigurationRequestView(l2cap_control_view) 267 dcid = request.GetDestinationCid() 268 if dcid not in self.scid_to_channel: 269 logging.warning("Received config request with unknown dcid") 270 return 271 self._control_behaviors.on_config_req_behavior.run(request) 272 273 def _send_configuration_response_default(self, captured_request_view): 274 dcid = captured_request_view.GetDestinationCid() 275 if dcid not in self.scid_to_channel: 276 return 277 self.scid_to_channel[dcid].send_configuration_response(captured_request_view) 278 279 @staticmethod 280 def config_option_basic_explicit(mtu=642): 281 mtu_opt = l2cap_packets.MtuConfigurationOption() 282 mtu_opt.mtu = mtu 283 rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption() 284 rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC 285 return [mtu_opt, rfc_opt] 286 287 @staticmethod 288 def config_option_mtu_explicit(mtu=642): 289 mtu_opt = l2cap_packets.MtuConfigurationOption() 290 mtu_opt.mtu = mtu 291 return [mtu_opt] 292 293 @staticmethod 294 def config_option_ertm(mtu=642, 295 fcs=None, 296 max_transmit=10, 297 mps=1010, 298 tx_window_size=10, 299 monitor_time_out=2000, 300 retransmission_time_out=1000): 301 result = [] 302 mtu_opt = l2cap_packets.MtuConfigurationOption() 303 mtu_opt.mtu = mtu 304 result.append(mtu_opt) 305 if fcs is not None: 306 fcs_opt = l2cap_packets.FrameCheckSequenceOption() 307 fcs_opt.fcs_type = fcs 308 result.append(fcs_opt) 309 rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption() 310 rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION 311 rfc_opt.tx_window_size = tx_window_size 312 rfc_opt.max_transmit = max_transmit 313 rfc_opt.retransmission_time_out = retransmission_time_out 314 rfc_opt.monitor_time_out = monitor_time_out 315 rfc_opt.maximum_pdu_size = mps 316 result.append(rfc_opt) 317 return result 318 319 @staticmethod 320 def config_option_ertm_with_max_transmit_one(): 321 return CertL2cap.config_option_ertm(max_transmit=1) 322 323 @staticmethod 324 def config_option_ertm_with_mps(mps=1010): 325 return CertL2cap.config_option_ertm(mps=mps) 326 327 def _on_configuration_response_default(self, l2cap_control_view): 328 response = l2cap_packets.ConfigurationResponseView(l2cap_control_view) 329 scid = response.GetSourceCid() 330 if scid not in self.scid_to_channel: 331 logging.warning("Received config request with unknown dcid") 332 return 333 result = response.GetResult() 334 if result == ConfigurationResponseResult.SUCCESS: 335 self.scid_to_channel[scid]._config_rsp_received = True 336 337 def _on_disconnection_request_default(self, l2cap_control_view): 338 disconnection_request = l2cap_packets.DisconnectionRequestView(l2cap_control_view) 339 sid = disconnection_request.GetIdentifier() 340 scid = disconnection_request.GetSourceCid() 341 dcid = disconnection_request.GetDestinationCid() 342 disconnection_response = l2cap_packets.DisconnectionResponseBuilder(sid, dcid, scid) 343 self.control_channel.send(disconnection_response) 344 345 def _on_disconnection_response_default(self, l2cap_control_view): 346 pass 347 348 def _on_information_request_default(self, l2cap_control_view): 349 information_request = l2cap_packets.InformationRequestView(l2cap_control_view) 350 sid = information_request.GetIdentifier() 351 information_type = information_request.GetInfoType() 352 if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: 353 response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( 354 sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) 355 self.control_channel.send(response) 356 return 357 if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: 358 response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( 359 sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, self.support_ertm, 0, self.support_fcs, 0, 360 0, 0, 0) 361 self.control_channel.send(response) 362 return 363 if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: 364 response = l2cap_packets.InformationResponseFixedChannelsBuilder( 365 sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) 366 self.control_channel.send(response) 367 return 368 369 def _on_information_response_default(self, l2cap_control_view): 370 pass 371 372 def _handle_control_packet(self, l2cap_packet): 373 packet_bytes = l2cap_packet.payload 374 l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes))) 375 if l2cap_view.GetChannelId() != 1: 376 return 377 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 378 fn = self.control_table.get(l2cap_control_view.GetCode()) 379 if fn is not None: 380 fn(l2cap_control_view) 381