#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from bluetooth_packets_python3 import hci_packets
from cert.closable import Closable
from cert.closable import safeClose
from cert.event_stream import EventStream
from cert.truth import assertThat
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto
from hci.facade import facade_pb2 as hci_facade

from security.facade_pb2 import AuthenticationRequirements
from security.facade_pb2 import AuthenticationRequirementsMessage
from security.facade_pb2 import SecurityPolicyMessage
from security.facade_pb2 import IoCapabilities
from security.facade_pb2 import IoCapabilityMessage
from security.facade_pb2 import OobDataPresentMessage
from security.facade_pb2 import UiCallbackMsg
from security.facade_pb2 import UiCallbackType


class PySecurity(Closable):
    """
        Abstraction for security tasks and GRPC calls

        Wraps the `security` facade of a device under test (DUT): it opens
        the UI, bond and enforce-security-policy event streams on
        construction and exposes helpers that issue the facade calls and
        wait on those streams.
    """

    # Human-readable names used only for log output; values not listed here
    # are logged as "ERROR".
    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        #IoCapabilities.KEYBOARD_ONLY:"KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

    # Human-readable names used only for log output; values not listed here
    # are logged as "ERROR".
    _auth_reqs_name_lookup = {
        AuthenticationRequirements.NO_BONDING:
        "NO_BONDING",
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION:
        "NO_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.DEDICATED_BONDING:
        "DEDICATED_BONDING",
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION:
        "DEDICATED_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.GENERAL_BONDING:
        "GENERAL_BONDING",
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION:
        "GENERAL_BONDING_MITM_PROTECTION",
    }

    # Class-level defaults so that close() can always read every stream
    # attribute, even on an instance whose __init__ did not run to completion.
    # NOTE(review): this presumes safeClose() tolerates None -- confirm in
    # cert.closable.
    _ui_event_stream = None
    _bond_event_stream = None
    _enforce_security_policy_stream = None

    def __init__(self, device):
        """
            Wait for the device channel and open the security event streams

            Args:
                device: DUT handle; must provide wait_channel_ready(),
                        an `address` attribute and a `security` facade stub.
        """
        logging.debug("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._enforce_security_policy_stream = EventStream(
            self._device.security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty()))

    def create_bond(self, address, type):
        """
            Triggers stack under test to create bond

            Args:
                address: value placed in common.BluetoothAddress(address=...)
                type: address type placed in common.BluetoothAddressWithType
        """
        # Deferred logging arguments: the message is only formatted if the
        # record is actually emitted.
        logging.debug("DUT: Creating bond to '%s' from '%s'", address, self._device.address)
        self._device.security.CreateBond(
            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))

    def remove_bond(self, address, type):
        """
            Removes bond from stack under test

            Args:
                address: value placed in common.BluetoothAddress(address=...)
                type: address type placed in common.BluetoothAddressWithType
        """
        self._device.security.RemoveBond(
            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the DUT

            Args:
                io_capabilities: an IoCapabilities value; it is forwarded
                                 as-is even when it has no entry in the log
                                 name lookup (logged as "ERROR").
        """
        logging.debug("DUT: setting IO Capabilities data to '%s'",
                      self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
        self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities))

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack

            Args:
                auth_reqs: an AuthenticationRequirements value; it is
                           forwarded as-is even when it has no entry in the
                           log name lookup (logged as "ERROR").
        """
        logging.debug("DUT: setting Authentication Requirements data to '%s'",
                      self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
        self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs))

    def set_oob_data(self, data_present):
        """
            Set the Out-of-band data present flag for SSP pairing

            Args:
                data_present: value sent as OobDataPresentMessage.data_present
        """
        logging.info("DUT: setting OOB data present to '%s'", data_present)
        self._device.security.SetOobDataPresent(OobDataPresentMessage(data_present=data_present))

    def send_ui_callback(self, address, callback_type, b, uid):
        """
            Send a callback from the UI as if the user pressed a button on the dialog

            Args:
                address: peer address; always sent as PUBLIC_DEVICE_ADDRESS
                callback_type: value sent as UiCallbackMsg.message_type
                b: value sent as UiCallbackMsg.boolean
                uid: value sent as UiCallbackMsg.unique_id
        """
        logging.debug("DUT: Sending user input response uid: %d; response: %s", uid, b)
        self._device.security.SendUiCallback(
            UiCallbackMsg(
                message_type=callback_type,
                boolean=b,
                unique_id=uid,
                address=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(address=address),
                    type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)))

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing
            Since the stack under test already enables it by default
            we do not need to do anything here for the time being
        """
        pass

    def accept_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event):
        """
            Respond to the UI event

            Waits until the UI event stream emits an event whose message_type
            equals expected_ui_event, then answers it with a YES_NO callback
            carrying reply_boolean. Does nothing when expected_ui_event is
            None.

            Args:
                cert_address: address passed to send_ui_callback
                reply_boolean: the answer sent back for the UI event
                expected_ui_event: message_type to wait for, or None to skip
        """
        if expected_ui_event is None:
            return

        # Filled in by the predicate below once the expected event is seen.
        ui_id = -1

        def get_unique_id(event):
            if event.message_type == expected_ui_event:
                nonlocal ui_id
                ui_id = event.unique_id
                return True
            return False

        logging.debug("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        # TODO(optedoblivion): Make UiCallbackType dynamic for PASSKEY when added
        self.send_ui_callback(cert_address, UiCallbackType.YES_NO, reply_boolean, ui_id)

    def get_address(self):
        """
            Return the address attribute of the wrapped device
        """
        return self._device.address

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.

            Args:
                expected_bond_event: message_type the bond event stream must emit
        """
        logging.debug("DUT: Waiting for Bond Event")
        assertThat(self._bond_event_stream).emits(lambda event: event.message_type == expected_bond_event)

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            We expect a 'True' or 'False' from the enforce security call

            This interface will allow the caller to wait for a callback
            result from enforcing security policy over the facade.

            Args:
                expected_enforce_security_event: value that event.result must equal
        """

        def has_expected_result(event):
            if event.result == expected_enforce_security_event:
                return True
            # Log every non-matching result so a failed wait can be diagnosed.
            logging.info(event.result)
            return False

        logging.info("DUT: Waiting for enforce security event")
        assertThat(self._enforce_security_policy_stream).emits(has_expected_result)

    def enforce_security_policy(self, address, type, policy):
        """
            Call to enforce classic security policy

            Args:
                address: value placed in common.BluetoothAddress(address=...)
                type: address type placed in common.BluetoothAddressWithType
                policy: value sent as SecurityPolicyMessage.policy
        """
        self._device.security.EnforceSecurityPolicy(
            SecurityPolicyMessage(
                address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type),
                policy=policy))

    def close(self):
        """
            Close the UI, bond and enforce-security-policy event streams
        """
        safeClose(self._ui_event_stream)
        safeClose(self._bond_event_stream)
        safeClose(self._enforce_security_policy_stream)