1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18
19from l2cap.classic import facade_pb2 as l2cap_facade_pb2
20from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
21from l2cap.le.facade_pb2 import SecurityLevel
22from bluetooth_packets_python3 import l2cap_packets
23from cert.event_stream import FilteringEventStream
24from cert.event_stream import EventStream, IEventStream
25from cert.closable import Closable, safeClose
26from cert.truth import assertThat
27from cert.matchers import L2capMatchers
28from facade import common_pb2 as common
29
30
31class PyL2capChannel(IEventStream):
32
33    def __init__(self, device, psm, l2cap_stream):
34        self._device = device
35        self._psm = psm
36        self._le_l2cap_stream = l2cap_stream
37        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
38                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))
39
40    def get_event_queue(self):
41        return self._our_le_l2cap_view.get_event_queue()
42
43    def send(self, payload):
44        self._device.l2cap.SendDynamicChannelPacket(
45            l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))
46
47    def close_channel(self):
48        self._device.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=self._psm))
49
50    def set_traffic_paused(self, paused):
51        self._device.l2cap.SetTrafficPaused(l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused))
52
53
54class _ClassicConnectionResponseFutureWrapper(object):
55    """
56    The future object returned when we send a connection request from DUT. Can be used to get connection status and
57    create the corresponding PyL2capDynamicChannel object later
58    """
59
60    def __init__(self, grpc_response_future, device, psm, l2cap_stream):
61        self._grpc_response_future = grpc_response_future
62        self._device = device
63        self._psm = psm
64        self._l2cap_stream = l2cap_stream
65
66    def get_channel(self):
67        return PyL2capChannel(self._device, self._psm, self._l2cap_stream)
68
69
70class PyL2cap(Closable):
71
72    def __init__(self, device, cert_address):
73        self._device = device
74        self._cert_address = cert_address
75        self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty()))
76
77    def close(self):
78        safeClose(self._l2cap_stream)
79
80    def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
81        self._device.l2cap.SetDynamicChannel(
82            l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode))
83        return PyL2capChannel(self._device, psm, self._l2cap_stream)
84
85    def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
86        """
87        Send open Dynamic channel request to CERT.
88        Get a future for connection result, to be used after CERT accepts request
89        """
90        self.register_dynamic_channel(psm, mode)
91        response_future = self._device.l2cap.OpenChannel.future(
92            l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode))
93
94        return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream)
95
96    def get_channel_queue_buffer_size(self):
97        return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size
98
99
100class PyLeL2capFixedChannel(IEventStream):
101
102    def __init__(self, device, cid, l2cap_stream):
103        self._device = device
104        self._cid = cid
105        self._le_l2cap_stream = l2cap_stream
106        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
107                                                       L2capMatchers.PacketPayloadWithMatchingCid(self._cid))
108
109    def get_event_queue(self):
110        return self._our_le_l2cap_view.get_event_queue()
111
112    def send(self, payload):
113        self._device.l2cap_le.SendFixedChannelPacket(
114            l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload))
115
116    def close_channel(self):
117        self._device.l2cap_le.SetFixedChannel(
118            l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False))
119
120
121class PyLeL2capDynamicChannel(IEventStream):
122
123    def __init__(self, device, cert_address, psm, l2cap_stream):
124        self._device = device
125        self._cert_address = cert_address
126        self._psm = psm
127        self._le_l2cap_stream = l2cap_stream
128        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
129                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))
130
131    def get_event_queue(self):
132        return self._our_le_l2cap_view.get_event_queue()
133
134    def send(self, payload):
135        self._device.l2cap_le.SendDynamicChannelPacket(
136            l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))
137
138    def close_channel(self):
139        self._device.l2cap_le.CloseDynamicChannel(
140            l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm))
141
142
143class _CreditBasedConnectionResponseFutureWrapper(object):
144    """
145    The future object returned when we send a connection request from DUT. Can be used to get connection status and
146    create the corresponding PyLeL2capDynamicChannel object later
147    """
148
149    def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream):
150        self._grpc_response_future = grpc_response_future
151        self._device = device
152        self._cert_address = cert_address
153        self._psm = psm
154        self._le_l2cap_stream = le_l2cap_stream
155
156    def get_status(self):
157        return l2cap_packets.LeCreditBasedConnectionResponseResult(self._grpc_response_future.result().status)
158
159    def get_channel(self):
160        assertThat(self.get_status()).isEqualTo(l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS)
161        return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream)
162
163
164class PyLeL2cap(Closable):
165
166    def __init__(self, device):
167        self._device = device
168        self._le_l2cap_stream = EventStream(self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))
169
170    def close(self):
171        safeClose(self._le_l2cap_stream)
172
173    def enable_fixed_channel(self, cid=4):
174        self._device.l2cap_le.SetFixedChannel(l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True))
175
176    def get_fixed_channel(self, cid=4):
177        return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)
178
179    def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY):
180        self._device.l2cap_le.SetDynamicChannel(
181            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, enable=True, security_level=security_level))
182        return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream)
183
184    def connect_coc_to_cert(self, cert_address, psm=0x33):
185        """
186        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request
187        """
188        self.register_coc(cert_address, psm)
189        response_future = self._device.l2cap_le.OpenDynamicChannel.future(
190            l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address))
191
192        return _CreditBasedConnectionResponseFutureWrapper(response_future, self._device, cert_address, psm,
193                                                           self._le_l2cap_stream)
194
195    def update_connection_parameter(self,
196                                    conn_interval_min=0x10,
197                                    conn_interval_max=0x10,
198                                    conn_latency=0x0a,
199                                    supervision_timeout=0x64,
200                                    min_ce_length=12,
201                                    max_ce_length=12):
202        self._device.l2cap_le.SendConnectionParameterUpdate(
203            l2cap_le_facade_pb2.ConnectionParameter(
204                conn_interval_min=conn_interval_min,
205                conn_interval_max=conn_interval_max,
206                conn_latency=conn_latency,
207                supervision_timeout=supervision_timeout,
208                min_ce_length=min_ce_length,
209                max_ce_length=max_ce_length))
210