1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from bluetooth_packets_python3 import hci_packets
20from cert.closable import Closable
21from cert.closable import safeClose
22from cert.event_stream import EventStream
23from cert.truth import assertThat
24from facade import common_pb2 as common
25from google.protobuf import empty_pb2 as empty_proto
26from hci.facade import facade_pb2 as hci_facade
27
28from security.facade_pb2 import AuthenticationRequirements
29from security.facade_pb2 import AuthenticationRequirementsMessage
30from security.facade_pb2 import SecurityPolicyMessage
31from security.facade_pb2 import IoCapabilities
32from security.facade_pb2 import IoCapabilityMessage
33from security.facade_pb2 import OobDataPresentMessage
34from security.facade_pb2 import UiCallbackMsg
35from security.facade_pb2 import UiCallbackType
36
37
38class PySecurity(Closable):
39    """
40        Abstraction for security tasks and GRPC calls
41    """
42
43    _io_capabilities_name_lookup = {
44        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
45        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
46        #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY",
47        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
48    }
49
50    _auth_reqs_name_lookup = {
51        AuthenticationRequirements.NO_BONDING: "NO_BONDING",
52        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: "NO_BONDING_MITM_PROTECTION",
53        AuthenticationRequirements.DEDICATED_BONDING: "DEDICATED_BONDING",
54        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: "DEDICATED_BONDING_MITM_PROTECTION",
55        AuthenticationRequirements.GENERAL_BONDING: "GENERAL_BONDING",
56        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: "GENERAL_BONDING_MITM_PROTECTION",
57    }
58
59    _ui_event_stream = None
60    _bond_event_stream = None
61
62    def __init__(self, device):
63        logging.debug("DUT: Init")
64        self._device = device
65        self._device.wait_channel_ready()
66        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
67        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
68        self._enforce_security_policy_stream = EventStream(
69            self._device.security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty()))
70
71    def create_bond(self, address, type):
72        """
73            Triggers stack under test to create bond
74        """
75        logging.debug("DUT: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address)))
76        self._device.security.CreateBond(
77            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))
78
79    def remove_bond(self, address, type):
80        """
81            Removes bond from stack under test
82        """
83        self._device.security.RemoveBond(
84            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))
85
86    def set_io_capabilities(self, io_capabilities):
87        """
88            Set the IO Capabilities used for the DUT
89        """
90        logging.debug("DUT: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get(
91            io_capabilities, "ERROR"))
92        self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities))
93
94    def set_authentication_requirements(self, auth_reqs):
95        """
96            Establish authentication requirements for the stack
97        """
98        logging.debug("DUT: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get(
99            auth_reqs, "ERROR"))
100        self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs))
101
102    def set_oob_data(self, data_present):
103        """
104            Set the Out-of-band data present flag for SSP pairing
105        """
106        logging.info("DUT: setting OOB data present to '%s'" % data_present)
107        self._device.security.SetOobDataPresent(OobDataPresentMessage(data_present=data_present))
108
109    def send_ui_callback(self, address, callback_type, b, uid):
110        """
111            Send a callback from the UI as if the user pressed a button on the dialog
112        """
113        logging.debug("DUT: Sending user input response uid: %d; response: %s" % (uid, b))
114        self._device.security.SendUiCallback(
115            UiCallbackMsg(
116                message_type=callback_type,
117                boolean=b,
118                unique_id=uid,
119                address=common.BluetoothAddressWithType(
120                    address=common.BluetoothAddress(address=address),
121                    type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)))
122
123    def enable_secure_simple_pairing(self):
124        """
125            This is called when you want to enable SSP for testing
126            Since the stack under test already enables it by default
127            we do not need to do anything here for the time being
128        """
129        pass
130
131    def accept_pairing(self, cert_address, reply_boolean):
132        """
133            Here we pass, but in cert we perform pairing flow tasks.
134            This was added here in order to be more dynamic, but the stack
135            under test will handle the pairing flow.
136        """
137        pass
138
139    def on_user_input(self, cert_address, reply_boolean, expected_ui_event):
140        """
141            Respond to the UI event
142        """
143        if expected_ui_event is None:
144            return
145
146        ui_id = -1
147
148        def get_unique_id(event):
149            if event.message_type == expected_ui_event:
150                nonlocal ui_id
151                ui_id = event.unique_id
152                return True
153            return False
154
155        logging.debug("DUT: Waiting for expected UI event")
156        assertThat(self._ui_event_stream).emits(get_unique_id)
157        # TODO(optedoblivion): Make UiCallbackType dynamic for PASSKEY when added
158        self.send_ui_callback(cert_address, UiCallbackType.YES_NO, reply_boolean, ui_id)
159
160    def get_address(self):
161        return self._device.address
162
163    def wait_for_bond_event(self, expected_bond_event):
164        """
165            A bond event will be triggered once the bond process
166            is complete.  For the DUT we need to wait for it,
167            for Cert it isn't needed.
168        """
169        logging.debug("DUT: Waiting for Bond Event")
170        assertThat(self._bond_event_stream).emits(lambda event: event.message_type == expected_bond_event)
171
172    def wait_for_enforce_security_event(self, expected_enforce_security_event):
173        """
174            We expect a 'True' or 'False' from the enforce security call
175
176            This interface will allow the caller to wait for a callback
177            result from enforcing security policy over the facade.
178        """
179        logging.info("DUT: Waiting for enforce security event")
180        assertThat(self._enforce_security_policy_stream).emits(
181            lambda event: event.result == expected_enforce_security_event or logging.info(event.result))
182
183    def enforce_security_policy(self, address, type, policy):
184        """
185            Call to enforce classic security policy
186        """
187        self._device.security.EnforceSecurityPolicy(
188            SecurityPolicyMessage(
189                address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type),
190                policy=policy))
191
192    def close(self):
193        safeClose(self._ui_event_stream)
194        safeClose(self._bond_event_stream)
195        safeClose(self._enforce_security_policy_stream)
196