1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage
20from cert.closable import Closable
21from cert.closable import safeClose
22from cert.py_acl_manager import PyAclManager
23from cert.truth import assertThat
24import bluetooth_packets_python3 as bt_packets
25from bluetooth_packets_python3 import l2cap_packets
26from bluetooth_packets_python3 import RawBuilder
27from bluetooth_packets_python3.l2cap_packets import CommandCode
28from bluetooth_packets_python3.l2cap_packets import Final
29from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
30from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
31from bluetooth_packets_python3.l2cap_packets import Poll
32from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
33from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
34from cert.event_stream import FilteringEventStream
35from cert.event_stream import IEventStream
36from cert.matchers import L2capMatchers
37from cert.captures import L2capCaptures
38
39
40class CertL2capChannel(IEventStream):
41
42    def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, fcs=None):
43        self._device = device
44        self._scid = scid
45        self._dcid = dcid
46        self._acl_stream = acl_stream
47        self._acl = acl
48        self._control_channel = control_channel
49        self._config_rsp_received = False
50        self._config_rsp_sent = False
51        if fcs == l2cap_packets.FcsType.DEFAULT:
52            self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid))
53        else:
54            self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid))
55
56    def get_event_queue(self):
57        return self._our_acl_view.get_event_queue()
58
59    def is_configured(self):
60        return self._config_rsp_received and self._config_rsp_sent
61
62    def send(self, packet):
63        frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
64        self._acl.send(frame.Serialize())
65
66    def send_i_frame(self,
67                     tx_seq,
68                     req_seq,
69                     f=Final.NOT_SET,
70                     sar=SegmentationAndReassembly.UNSEGMENTED,
71                     payload=None,
72                     fcs=False):
73        if fcs == l2cap_packets.FcsType.DEFAULT:
74            frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder(self._dcid, tx_seq, f, req_seq, sar, payload)
75        else:
76            frame = l2cap_packets.EnhancedInformationFrameBuilder(self._dcid, tx_seq, f, req_seq, sar, payload)
77        self._acl.send(frame.Serialize())
78
79    def send_s_frame(self, req_seq, s=SupervisoryFunction.RECEIVER_READY, p=Poll.NOT_SET, f=Final.NOT_SET):
80        frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(self._dcid, s, p, f, req_seq)
81        self._acl.send(frame.Serialize())
82
83    def config_request_for_me(self):
84        return L2capMatchers.ConfigurationRequest(self._scid)
85
86    def send_configure_request(self, options, sid=2, continuation=l2cap_packets.Continuation.END):
87        assertThat(self._scid).isNotEqualTo(1)
88        request = l2cap_packets.ConfigurationRequestBuilder(sid, self._dcid, continuation, options)
89        self._control_channel.send(request)
90
91    def _send_information_request(self, type):
92        assertThat(self._scid).isEqualTo(1)
93        signal_id = 3
94        information_request = l2cap_packets.InformationRequestBuilder(signal_id, type)
95        self.send(information_request)
96
97    def send_extended_features_request(self):
98        self._send_information_request(InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
99
100    def verify_configuration_request_and_respond(self, result=ConfigurationResponseResult.SUCCESS, options=None):
101        request_capture = L2capCaptures.ConfigurationRequest(self._scid)
102        assertThat(self._control_channel).emits(request_capture)
103        request = request_capture.get()
104        sid = request.GetIdentifier()
105        if options is None:
106            options = []
107        config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END,
108                                                                     result, options)
109        self._control_channel.send(config_response)
110
111    def send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None):
112        sid = request.GetIdentifier()
113        if options is None:
114            options = []
115        config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END,
116                                                                     result, options)
117        self._control_channel.send(config_response)
118        self._config_rsp_sent = True
119
120    def verify_configuration_response(self, result=ConfigurationResponseResult.SUCCESS):
121        assertThat(self._control_channel).emits(L2capMatchers.ConfigurationResponse(result))
122
123    def disconnect_and_verify(self):
124        assertThat(self._scid).isNotEqualTo(1)
125        self._control_channel.send(l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, self._scid))
126
127        assertThat(self._control_channel).emits(L2capMatchers.DisconnectionResponse(self._scid, self._dcid))
128
129    def verify_disconnect_request(self):
130        assertThat(self._control_channel).emits(L2capMatchers.DisconnectionRequest(self._dcid, self._scid))
131
132
133class CertL2capControlChannelBehaviors(object):
134
135    def __init__(self, parent):
136        self.on_config_req_behavior = SingleArgumentBehavior(
137            lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent))
138
139    def on_config_req(self, matcher):
140        return self.on_config_req_behavior.begin(matcher)
141
142    class CertReplyStage(ReplyStage):
143
144        def __init__(self, parent):
145            self.parent = parent
146
147        def send_configuration_response(self, result=ConfigurationResponseResult.SUCCESS, options=None):
148            self._commit(lambda request: self._send_configuration_response(request, result, options))
149            return self
150
151        def _send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None):
152            dcid = request.GetDestinationCid()
153            if dcid not in self.parent.scid_to_channel:
154                logging.warning("Received config request with unknown dcid")
155                return
156            self.parent.scid_to_channel[dcid].send_configuration_response(request, result, options)
157
158
159class CertL2cap(Closable, IHasBehaviors):
160
161    def __init__(self, device):
162        self._device = device
163        self._acl_manager = PyAclManager(device)
164        self._acl = None
165
166        self.control_table = {
167            CommandCode.CONNECTION_RESPONSE: self._on_connection_response_default,
168            CommandCode.CONFIGURATION_REQUEST: self._on_configuration_request_default,
169            CommandCode.CONFIGURATION_RESPONSE: self._on_configuration_response_default,
170            CommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default,
171            CommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default,
172            CommandCode.INFORMATION_REQUEST: self._on_information_request_default,
173            CommandCode.INFORMATION_RESPONSE: self._on_information_response_default
174        }
175
176        self.scid_to_channel = {}
177
178        self.support_ertm = True
179        self.support_fcs = True
180
181        self._control_behaviors = CertL2capControlChannelBehaviors(self)
182        self._control_behaviors.on_config_req_behavior.set_default(self._send_configuration_response_default)
183
184    def close(self):
185        self._acl_manager.close()
186        safeClose(self._acl)
187
188    def get_behaviors(self):
189        return self._control_behaviors
190
191    def connect_acl(self, remote_addr):
192        self._acl_manager.initiate_connection(remote_addr)
193        self._acl = self._acl_manager.complete_outgoing_connection()
194        self.control_channel = CertL2capChannel(
195            self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None)
196        self._acl.acl_stream.register_callback(self._handle_control_packet)
197
198    def open_channel(self, signal_id, psm, scid, fcs=None):
199        self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid))
200
201        response = L2capCaptures.ConnectionResponse(scid)
202        assertThat(self.control_channel).emits(response)
203        channel = CertL2capChannel(self._device, scid,
204                                   response.get().GetDestinationCid(), self._acl.acl_stream, self._acl,
205                                   self.control_channel, fcs)
206        self.scid_to_channel[scid] = channel
207
208        return channel
209
210    def verify_and_respond_open_channel_from_remote(self, psm=0x33, scid=None, fcs=None):
211
212        request = L2capCaptures.ConnectionRequest(psm)
213        assertThat(self.control_channel).emits(request)
214
215        sid = request.get().GetIdentifier()
216        dcid = request.get().GetSourceCid()
217        if scid is None or scid in self.scid_to_channel:
218            scid = dcid
219        channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel, fcs)
220        self.scid_to_channel[scid] = channel
221
222        connection_response = l2cap_packets.ConnectionResponseBuilder(
223            sid, scid, dcid, l2cap_packets.ConnectionResponseResult.SUCCESS,
224            l2cap_packets.ConnectionResponseStatus.NO_FURTHER_INFORMATION_AVAILABLE)
225        self.control_channel.send(connection_response)
226
227        return channel
228
229    def verify_and_respond_open_channel_from_remote_and_send_config_req(self, psm=0x33):
230        """
231        Verify a connection request, and send a combo packet of connection response and configuration request
232        """
233        request = L2capCaptures.ConnectionRequest(psm)
234        assertThat(self.control_channel).emits(request)
235
236        sid = request.get().GetIdentifier()
237        dcid = request.get().GetSourceCid()
238        scid = dcid
239        channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel)
240        self.scid_to_channel[scid] = channel
241
242        # Connection response and config request combo packet
243        conn_rsp_and_config_req = RawBuilder([
244            0x03, sid, 0x08, 0x00, dcid, 0x00, dcid, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, sid + 1, 0x04, 0x00, dcid,
245            0x00, 0x00, 0x00
246        ])
247        self.control_channel.send(conn_rsp_and_config_req)
248
249        return channel
250
251    # prefer to use channel abstraction instead, if at all possible
252    def send_acl(self, packet):
253        self._acl.send(packet.Serialize())
254
255    def get_control_channel(self):
256        return self.control_channel
257
258    # Disable ERTM when exchange extened feature
259    def claim_ertm_unsupported(self):
260        self.support_ertm = False
261
262    def _on_connection_response_default(self, l2cap_control_view):
263        pass
264
265    def _on_configuration_request_default(self, l2cap_control_view):
266        request = l2cap_packets.ConfigurationRequestView(l2cap_control_view)
267        dcid = request.GetDestinationCid()
268        if dcid not in self.scid_to_channel:
269            logging.warning("Received config request with unknown dcid")
270            return
271        self._control_behaviors.on_config_req_behavior.run(request)
272
273    def _send_configuration_response_default(self, captured_request_view):
274        dcid = captured_request_view.GetDestinationCid()
275        if dcid not in self.scid_to_channel:
276            return
277        self.scid_to_channel[dcid].send_configuration_response(captured_request_view)
278
279    @staticmethod
280    def config_option_basic_explicit(mtu=642):
281        mtu_opt = l2cap_packets.MtuConfigurationOption()
282        mtu_opt.mtu = mtu
283        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption()
284        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
285        return [mtu_opt, rfc_opt]
286
287    @staticmethod
288    def config_option_mtu_explicit(mtu=642):
289        mtu_opt = l2cap_packets.MtuConfigurationOption()
290        mtu_opt.mtu = mtu
291        return [mtu_opt]
292
293    @staticmethod
294    def config_option_ertm(mtu=642,
295                           fcs=None,
296                           max_transmit=10,
297                           mps=1010,
298                           tx_window_size=10,
299                           monitor_time_out=2000,
300                           retransmission_time_out=1000):
301        result = []
302        mtu_opt = l2cap_packets.MtuConfigurationOption()
303        mtu_opt.mtu = mtu
304        result.append(mtu_opt)
305        if fcs is not None:
306            fcs_opt = l2cap_packets.FrameCheckSequenceOption()
307            fcs_opt.fcs_type = fcs
308            result.append(fcs_opt)
309        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption()
310        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION
311        rfc_opt.tx_window_size = tx_window_size
312        rfc_opt.max_transmit = max_transmit
313        rfc_opt.retransmission_time_out = retransmission_time_out
314        rfc_opt.monitor_time_out = monitor_time_out
315        rfc_opt.maximum_pdu_size = mps
316        result.append(rfc_opt)
317        return result
318
319    @staticmethod
320    def config_option_ertm_with_max_transmit_one():
321        return CertL2cap.config_option_ertm(max_transmit=1)
322
323    @staticmethod
324    def config_option_ertm_with_mps(mps=1010):
325        return CertL2cap.config_option_ertm(mps=mps)
326
327    def _on_configuration_response_default(self, l2cap_control_view):
328        response = l2cap_packets.ConfigurationResponseView(l2cap_control_view)
329        scid = response.GetSourceCid()
330        if scid not in self.scid_to_channel:
331            logging.warning("Received config request with unknown dcid")
332            return
333        result = response.GetResult()
334        if result == ConfigurationResponseResult.SUCCESS:
335            self.scid_to_channel[scid]._config_rsp_received = True
336
337    def _on_disconnection_request_default(self, l2cap_control_view):
338        disconnection_request = l2cap_packets.DisconnectionRequestView(l2cap_control_view)
339        sid = disconnection_request.GetIdentifier()
340        scid = disconnection_request.GetSourceCid()
341        dcid = disconnection_request.GetDestinationCid()
342        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(sid, dcid, scid)
343        self.control_channel.send(disconnection_response)
344
345    def _on_disconnection_response_default(self, l2cap_control_view):
346        pass
347
348    def _on_information_request_default(self, l2cap_control_view):
349        information_request = l2cap_packets.InformationRequestView(l2cap_control_view)
350        sid = information_request.GetIdentifier()
351        information_type = information_request.GetInfoType()
352        if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU:
353            response = l2cap_packets.InformationResponseConnectionlessMtuBuilder(
354                sid, l2cap_packets.InformationRequestResult.SUCCESS, 100)
355            self.control_channel.send(response)
356            return
357        if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
358            response = l2cap_packets.InformationResponseExtendedFeaturesBuilder(
359                sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, self.support_ertm, 0, self.support_fcs, 0,
360                0, 0, 0)
361            self.control_channel.send(response)
362            return
363        if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED:
364            response = l2cap_packets.InformationResponseFixedChannelsBuilder(
365                sid, l2cap_packets.InformationRequestResult.SUCCESS, 2)
366            self.control_channel.send(response)
367            return
368
369    def _on_information_response_default(self, l2cap_control_view):
370        pass
371
372    def _handle_control_packet(self, l2cap_packet):
373        packet_bytes = l2cap_packet.payload
374        l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
375        if l2cap_view.GetChannelId() != 1:
376            return
377        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
378        fn = self.control_table.get(l2cap_control_view.GetCode())
379        if fn is not None:
380            fn(l2cap_control_view)
381