#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.closable import Closable
from cert.closable import safeClose
from cert.captures import HciCaptures
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import facade_pb2 as hci_facade


class PyHciAclConnection(IEventStream):
    """A single ACL connection, identified by its connection handle.

    Outgoing data is sent through the device's HCI facade; incoming data is
    exposed as an event stream layered on top of the shared raw ACL stream.
    """

    def __init__(self, handle, acl_stream, device):
        """
        Args:
            handle: connection handle; coerced with int().
            acl_stream: the raw ACL event stream this connection reads from.
            device: object exposing the `hci` facade stub used for sending.
        """
        self.handle = int(handle)
        self.device = device
        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
        # Until then the filter is None, i.e. no per-handle predicate is supplied.
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        """Send one ACL packet on this connection.

        Args:
            pb_flag: packet boundary flag; coerced with int().
            b_flag: broadcast flag; coerced with int().
            data: payload placed in the `data` field of the AclMsg.
        """
        acl_msg = hci_facade.AclMsg(
            handle=self.handle, packet_boundary_flag=int(pb_flag), broadcast_flag=int(b_flag), data=data)
        self.device.hci.SendAclData(acl_msg)

    def send_first(self, data):
        """Send `data` as a point-to-point FIRST_AUTOMATICALLY_FLUSHABLE packet."""
        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))

    def send_continuing(self, data):
        """Send `data` as a point-to-point CONTINUING_FRAGMENT packet."""
        self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT,
                  bytes(data))

    def get_event_queue(self):
        """Return the event queue of this connection's ACL stream."""
        return self.our_acl_stream.get_event_queue()


class PyHci(Closable):
    """Test-side wrapper around a device's HCI facade.

    Owns the HCI event stream, the LE subevent stream and, optionally, the raw
    ACL stream, and offers helpers for sending commands and setting up
    connections.
    """

    # Class-level defaults so close() and the accessors work even when
    # __init__ did not get as far as creating a given stream.
    event_stream = None
    le_event_stream = None
    acl_stream = None

    def __init__(self, device, acl_streaming=False):
        """
        If you are planning on personally using the ACL data stream
        coming from HCI, specify acl_streaming=True. You probably only
        want this if you are testing HCI itself.

        With acl_streaming=True the connection-related events (role change,
        connection request, connection complete, packet type changed) are
        also registered before the ACL stream is opened.
        """
        self.device = device
        self._setup_event_stream()
        self._setup_le_event_stream()
        if acl_streaming:
            self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                     hci_packets.EventCode.CONNECTION_COMPLETE,
                                     hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self._setup_acl_stream()

    def _setup_event_stream(self):
        """Open the HCI event stream from the facade."""
        self.event_stream = EventStream(self.device.hci.FetchEvents(empty_proto.Empty()))

    def _setup_le_event_stream(self):
        """Open the LE subevent stream from the facade."""
        self.le_event_stream = EventStream(self.device.hci.FetchLeSubevents(empty_proto.Empty()))

    def _setup_acl_stream(self):
        """Open the raw ACL packet stream from the facade."""
        self.acl_stream = EventStream(self.device.hci.FetchAclPackets(empty_proto.Empty()))

    def close(self):
        """Close every stream this object holds (acl_stream may be None)."""
        safeClose(self.event_stream)
        safeClose(self.le_event_stream)
        safeClose(self.acl_stream)

    def get_event_stream(self):
        """Return the HCI event stream."""
        return self.event_stream

    def get_le_event_stream(self):
        """Return the LE subevent stream."""
        return self.le_event_stream

    def get_raw_acl_stream(self):
        """Return the raw ACL stream.

        Raises:
            Exception: if this object was not constructed with
                acl_streaming=True.
        """
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return self.acl_stream

    def register_for_events(self, *event_codes):
        """Register an event handler on the facade for each given event code."""
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterEventHandler(msg)

    def register_for_le_events(self, *event_codes):
        """Register an LE event handler on the facade for each given subevent code."""
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterLeEventHandler(msg)

    def send_command_with_complete(self, command):
        """Serialize `command` and enqueue it via EnqueueCommandWithComplete."""
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.device.hci.EnqueueCommandWithComplete(cmd)

    def send_command_with_status(self, command):
        """Serialize `command` and enqueue it via EnqueueCommandWithStatus."""
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        self.device.hci.EnqueueCommandWithStatus(cmd)

    def enable_inquiry_and_page_scan(self):
        """Send Write Scan Enable with INQUIRY_AND_PAGE_SCAN."""
        self.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def read_own_address(self):
        """Send Read BD_ADDR, wait for its completion event and return the address."""
        self.send_command_with_complete(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def initiate_connection(self, remote_addr):
        """Send Create Connection towards `remote_addr`.

        Args:
            remote_addr: remote address as UTF-8 encoded bytes; it is decoded
                to str before being handed to the packet builder.
        """
        self.send_command_with_status(
            hci_packets.CreateConnectionBuilder(
                remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,  # NOTE(review): presumably the clock offset, flagged INVALID below - confirm
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        """Wait for a connection request, accept it and return the resulting connection.

        Returns:
            The PyHciAclConnection produced by complete_connection().
        """
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.event_stream).emits(connection_request)

        self.send_command_with_status(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
        return self.complete_connection()

    def complete_connection(self):
        """Wait for the connection-complete event and wrap the new connection.

        Returns:
            A PyHciAclConnection for the handle carried by the event.

        Raises:
            Exception: if this object was not constructed with
                acl_streaming=True.
        """
        # Check the ACL stream up front (reusing the accessor's guard) so a
        # misconfigured PyHci is reported immediately rather than after
        # waiting on the event stream.
        acl_stream = self.get_raw_acl_stream()

        connection_complete = HciCaptures.ConnectionCompleteCapture()
        assertThat(self.event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHciAclConnection(handle, acl_stream, self.device)