1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from google.protobuf import empty_pb2 as empty_proto 18 19from l2cap.classic import facade_pb2 as l2cap_facade_pb2 20from l2cap.le import facade_pb2 as l2cap_le_facade_pb2 21from l2cap.le.facade_pb2 import SecurityLevel 22from bluetooth_packets_python3 import l2cap_packets 23from cert.event_stream import FilteringEventStream 24from cert.event_stream import EventStream, IEventStream 25from cert.closable import Closable, safeClose 26from cert.truth import assertThat 27from cert.matchers import L2capMatchers 28from facade import common_pb2 as common 29 30 31class PyL2capChannel(IEventStream): 32 33 def __init__(self, device, psm, l2cap_stream): 34 self._device = device 35 self._psm = psm 36 self._le_l2cap_stream = l2cap_stream 37 self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream, 38 L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) 39 40 def get_event_queue(self): 41 return self._our_le_l2cap_view.get_event_queue() 42 43 def send(self, payload): 44 self._device.l2cap.SendDynamicChannelPacket( 45 l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload)) 46 47 def close_channel(self): 48 self._device.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=self._psm)) 49 50 def set_traffic_paused(self, paused): 51 self._device.l2cap.SetTrafficPaused(l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused)) 52 53 54class _ClassicConnectionResponseFutureWrapper(object): 55 """ 56 The future object returned when we send a connection request from DUT. Can be used to get connection status and 57 create the corresponding PyL2capDynamicChannel object later 58 """ 59 60 def __init__(self, grpc_response_future, device, psm, l2cap_stream): 61 self._grpc_response_future = grpc_response_future 62 self._device = device 63 self._psm = psm 64 self._l2cap_stream = l2cap_stream 65 66 def get_channel(self): 67 return PyL2capChannel(self._device, self._psm, self._l2cap_stream) 68 69 70class PyL2cap(Closable): 71 72 def __init__(self, device, cert_address): 73 self._device = device 74 self._cert_address = cert_address 75 self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty())) 76 77 def close(self): 78 safeClose(self._l2cap_stream) 79 80 def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): 81 self._device.l2cap.SetDynamicChannel( 82 l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode)) 83 return PyL2capChannel(self._device, psm, self._l2cap_stream) 84 85 def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): 86 """ 87 Send open Dynamic channel request to CERT. 88 Get a future for connection result, to be used after CERT accepts request 89 """ 90 self.register_dynamic_channel(psm, mode) 91 response_future = self._device.l2cap.OpenChannel.future( 92 l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode)) 93 94 return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream) 95 96 def get_channel_queue_buffer_size(self): 97 return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size 98 99 100class PyLeL2capFixedChannel(IEventStream): 101 102 def __init__(self, device, cid, l2cap_stream): 103 self._device = device 104 self._cid = cid 105 self._le_l2cap_stream = l2cap_stream 106 self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream, 107 L2capMatchers.PacketPayloadWithMatchingCid(self._cid)) 108 109 def get_event_queue(self): 110 return self._our_le_l2cap_view.get_event_queue() 111 112 def send(self, payload): 113 self._device.l2cap_le.SendFixedChannelPacket( 114 l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload)) 115 116 def close_channel(self): 117 self._device.l2cap_le.SetFixedChannel( 118 l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False)) 119 120 121class PyLeL2capDynamicChannel(IEventStream): 122 123 def __init__(self, device, cert_address, psm, l2cap_stream): 124 self._device = device 125 self._cert_address = cert_address 126 self._psm = psm 127 self._le_l2cap_stream = l2cap_stream 128 self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream, 129 L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) 130 131 def get_event_queue(self): 132 return self._our_le_l2cap_view.get_event_queue() 133 134 def send(self, payload): 135 self._device.l2cap_le.SendDynamicChannelPacket( 136 l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload)) 137 138 def close_channel(self): 139 self._device.l2cap_le.CloseDynamicChannel( 140 l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm)) 141 142 143class _CreditBasedConnectionResponseFutureWrapper(object): 144 """ 145 The future object returned when we send a connection request from DUT. Can be used to get connection status and 146 create the corresponding PyLeL2capDynamicChannel object later 147 """ 148 149 def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream): 150 self._grpc_response_future = grpc_response_future 151 self._device = device 152 self._cert_address = cert_address 153 self._psm = psm 154 self._le_l2cap_stream = le_l2cap_stream 155 156 def get_status(self): 157 return l2cap_packets.LeCreditBasedConnectionResponseResult(self._grpc_response_future.result().status) 158 159 def get_channel(self): 160 assertThat(self.get_status()).isEqualTo(l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) 161 return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream) 162 163 164class PyLeL2cap(Closable): 165 166 def __init__(self, device): 167 self._device = device 168 self._le_l2cap_stream = EventStream(self._device.l2cap_le.FetchL2capData(empty_proto.Empty())) 169 170 def close(self): 171 safeClose(self._le_l2cap_stream) 172 173 def enable_fixed_channel(self, cid=4): 174 self._device.l2cap_le.SetFixedChannel(l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True)) 175 176 def get_fixed_channel(self, cid=4): 177 return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream) 178 179 def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY): 180 self._device.l2cap_le.SetDynamicChannel( 181 l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, enable=True, security_level=security_level)) 182 return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream) 183 184 def connect_coc_to_cert(self, cert_address, psm=0x33): 185 """ 186 Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request 187 """ 188 self.register_coc(cert_address, psm) 189 response_future = self._device.l2cap_le.OpenDynamicChannel.future( 190 l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address)) 191 192 return _CreditBasedConnectionResponseFutureWrapper(response_future, self._device, cert_address, psm, 193 self._le_l2cap_stream) 194 195 def update_connection_parameter(self, 196 conn_interval_min=0x10, 197 conn_interval_max=0x10, 198 conn_latency=0x0a, 199 supervision_timeout=0x64, 200 min_ce_length=12, 201 max_ce_length=12): 202 self._device.l2cap_le.SendConnectionParameterUpdate( 203 l2cap_le_facade_pb2.ConnectionParameter( 204 conn_interval_min=conn_interval_min, 205 conn_interval_max=conn_interval_max, 206 conn_latency=conn_latency, 207 supervision_timeout=supervision_timeout, 208 min_ce_length=min_ce_length, 209 max_ce_length=max_ce_length)) 210