1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18from cert.event_stream import EventStream
19from cert.event_stream import FilteringEventStream
20from cert.event_stream import IEventStream
21from cert.closable import Closable
22from cert.closable import safeClose
23from cert.captures import HciCaptures
24from bluetooth_packets_python3 import hci_packets
25from cert.truth import assertThat
26from hci.facade import facade_pb2 as hci_facade
27
28
class PyHciAclConnection(IEventStream):
    """A single ACL connection reached through a device's HCI facade.

    Outgoing packets are handed to ``device.hci.SendAclData``; incoming
    events are read from a ``FilteringEventStream`` layered over the ACL
    stream supplied at construction time.
    """

    def __init__(self, handle, acl_stream, device):
        """Bind a connection handle to its device and ACL event source.

        Args:
            handle: connection handle; stored as ``int(handle)``.
            acl_stream: event stream that gets wrapped for this connection.
            device: object whose ``hci`` attribute exposes ``SendAclData``.
        """
        self.handle = int(handle)
        self.device = device
        # TODO: the handle we got is 0, so it doesn't match - fix before
        # enabling filtering. Until then the wrapper is given no filter.
        no_filter = None
        self.our_acl_stream = FilteringEventStream(acl_stream, no_filter)

    def send(self, pb_flag, b_flag, data):
        """Send one ACL packet on this connection's handle.

        Args:
            pb_flag: packet boundary flag; converted with ``int()``.
            b_flag: broadcast flag; converted with ``int()``.
            data: payload placed in the message's ``data`` field as-is.
        """
        boundary = int(pb_flag)
        broadcast = int(b_flag)
        packet = hci_facade.AclMsg(
            handle=self.handle, packet_boundary_flag=boundary, broadcast_flag=broadcast, data=data)
        self.device.hci.SendAclData(packet)

    def send_first(self, data):
        """Send ``bytes(data)`` flagged FIRST_AUTOMATICALLY_FLUSHABLE, point-to-point."""
        boundary = hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE
        broadcast = hci_packets.BroadcastFlag.POINT_TO_POINT
        self.send(boundary, broadcast, bytes(data))

    def send_continuing(self, data):
        """Send ``bytes(data)`` flagged CONTINUING_FRAGMENT, point-to-point."""
        boundary = hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT
        broadcast = hci_packets.BroadcastFlag.POINT_TO_POINT
        self.send(boundary, broadcast, bytes(data))

    def get_event_queue(self):
        """Return the event queue of the wrapped (filtering) ACL stream."""
        wrapped_stream = self.our_acl_stream
        return wrapped_stream.get_event_queue()
52
53
class PyHci(Closable):
    """Convenience wrapper around a device's HCI facade for cert tests.

    Opens the HCI event and LE-subevent streams on construction and,
    optionally, the raw ACL packet stream. Also offers helpers to register
    for events, enqueue commands, and set up ACL connections.
    """

    # Class-level defaults so close() is safe on a partially built instance.
    event_stream = None
    le_event_stream = None
    acl_stream = None

    def __init__(self, device, acl_streaming=False):
        """Open the event streams for ``device``.

        If you are planning on personally using the ACL data stream
        coming from HCI, specify acl_streaming=True. You probably only
        want this if you are testing HCI itself.

        Args:
            device: object whose ``hci`` attribute is the HCI facade stub.
            acl_streaming: when true, register for the connection-related
                events and open the ACL packet stream.
        """
        self.device = device
        self._setup_event_stream()
        self._setup_le_event_stream()
        if acl_streaming:
            self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                     hci_packets.EventCode.CONNECTION_COMPLETE,
                                     hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self._setup_acl_stream()

    def _setup_event_stream(self):
        """Start streaming HCI events from the facade."""
        self.event_stream = EventStream(self.device.hci.FetchEvents(empty_proto.Empty()))

    def _setup_le_event_stream(self):
        """Start streaming LE subevents from the facade."""
        self.le_event_stream = EventStream(self.device.hci.FetchLeSubevents(empty_proto.Empty()))

    def _setup_acl_stream(self):
        """Start streaming ACL packets from the facade."""
        self.acl_stream = EventStream(self.device.hci.FetchAclPackets(empty_proto.Empty()))

    def close(self):
        """Close every stream this object holds (unopened ones are still None)."""
        safeClose(self.event_stream)
        safeClose(self.le_event_stream)
        safeClose(self.acl_stream)

    def get_event_stream(self):
        """Return the HCI event stream."""
        return self.event_stream

    def get_le_event_stream(self):
        """Return the LE subevent stream."""
        return self.le_event_stream

    def get_raw_acl_stream(self):
        """Return the raw ACL packet stream.

        Raises:
            Exception: if this object was built without acl_streaming=True.
        """
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return self.acl_stream

    def register_for_events(self, *event_codes):
        """Register an event handler on the facade for each given event code."""
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterEventHandler(msg)

    def register_for_le_events(self, *event_codes):
        """Register an LE event handler on the facade for each given code."""
        for event_code in event_codes:
            msg = hci_facade.EventCodeMsg(code=int(event_code))
            self.device.hci.RegisterLeEventHandler(msg)

    @staticmethod
    def _to_command_msg(command):
        """Serialize a command builder into a facade ``CommandMsg``."""
        return hci_facade.CommandMsg(command=bytes(command.Serialize()))

    def send_command_with_complete(self, command):
        """Enqueue ``command`` via the facade's EnqueueCommandWithComplete."""
        self.device.hci.EnqueueCommandWithComplete(self._to_command_msg(command))

    def send_command_with_status(self, command):
        """Enqueue ``command`` via the facade's EnqueueCommandWithStatus."""
        self.device.hci.EnqueueCommandWithStatus(self._to_command_msg(command))

    def enable_inquiry_and_page_scan(self):
        """Send a Write Scan Enable command selecting INQUIRY_AND_PAGE_SCAN."""
        self.send_command_with_complete(
            hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def read_own_address(self):
        """Send a Read BD_ADDR command and return the address from its reply.

        Asserts that the matching completion shows up on the event stream.
        """
        self.send_command_with_complete(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def initiate_connection(self, remote_addr):
        """Send a Create Connection command towards ``remote_addr``.

        Args:
            remote_addr: remote address, either as UTF-8 encoded bytes or as
                an already decoded ``str``.
        """
        # Accept both encodings of the address; only bytes need decoding.
        if isinstance(remote_addr, (bytes, bytearray)):
            remote_addr = remote_addr.decode('utf-8')
        self.send_command_with_status(
            hci_packets.CreateConnectionBuilder(
                remote_addr,
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,  # presumably the clock offset (marked INVALID below) - confirm
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        """Wait for a connection request, accept it, and finish the setup.

        Returns:
            The PyHciAclConnection produced by complete_connection().
        """
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.event_stream).emits(connection_request)

        self.send_command_with_status(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
        return self.complete_connection()

    def complete_connection(self):
        """Wait for the connection-complete event and wrap the new handle.

        Returns:
            A PyHciAclConnection for the handle reported by the event.

        Raises:
            Exception: if this object was built without acl_streaming=True.
        """
        # Check for the ACL stream first so a misconfigured PyHci fails
        # immediately instead of after waiting on the event stream.
        acl_stream = self.get_raw_acl_stream()

        connection_complete = HciCaptures.ConnectionCompleteCapture()
        assertThat(self.event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHciAclConnection(handle, acl_stream, self.device)
157