1#!/usr/bin/env python3 2############################################################################################################### 3# 4# Copyright (C) 2019 Motorola Mobility LLC 5# 6# Redistribution and use in source and binary forms, with or without modification, are permitted provided that 7# the following conditions are met: 8# 9# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the 10# following disclaimer. 11# 12# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and 13# the following disclaimer in the documentation and/or other materials provided with the distribution. 14# 15# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or 16# promote products derived from this software without specific prior written permission. 17# 18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED 19# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 20# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR 21# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 22# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 25# POSSIBILITY OF SUCH DAMAGE. 26# 27############################################################################################################### 28############################################################################################################### 29# 30# Bluetooth Virtual Sniffing for Android 31# 32# This script supports Bluetooth Virtual Sniffing via Live Import Feature of Frontline Bluetooth Sniffer(FTS) 33# 34# It extracts the HCI packets from Snoop logs and redirect it to FTS for live HCI sniffing. 35# 36# It uses liveimport.ini and LiveImportAPI.dll from FTS path to communicate with FTS sniffer software. 37# 38# It works on both Windows and Ubuntu. For Ubuntu, both FTS and Python should be installed on Wine. 39# 40# FTS_INI_PATH & FTS_DLL_PATH should be set to absolute path of liveimport.ini and LiveImportAPI.dll of FTS 41# 42# Example below - This may change per machine per FTS version in Windows vs Ubuntu (Wine), set accordingly. 43# For Windows 44# FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\' 45# FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\' 46# 47# For Ubuntu - FTS path recognized by Wine (not Ubuntu path) 48# FTS_INI_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\' 49# FTS_DLL_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\' 50# 51############################################################################################################### 52 53import os 54import platform 55import socket 56import struct 57import subprocess 58import sys 59import time 60if sys.version_info[0] >= 3: 61 import configparser 62else: 63 import ConfigParser as configparser 64 65from calendar import timegm 66from ctypes import byref, c_bool, c_longlong, CDLL 67from _ctypes import FreeLibrary 68from datetime import datetime 69 70# Update below to right path corresponding to your machine, FTS version and OS used. 71FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\' 72FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\Executables\\Core\\' 73 74iniName = 'liveimport.ini' 75if (platform.architecture()[0] == '32bit'): 76 dllName = 'LiveImportAPI.dll' 77else: 78 dllName = 'LiveImportAPI_x64.dll' 79 80launchFtsCmd = '\"' + FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"' 81 82# Unix Epoch delta since 01/01/1970 83FILETIME_EPOCH_DELTA = 116444736000000000 84HUNDREDS_OF_NANOSECONDS = 10000000 85 86HOST = 'localhost' 87PORT = 8872 88SNOOP_ID = 16 89SNOOP_HDR = 24 90 91 92def get_file_time(): 93 """ 94 Obtain current time in file time format for display 95 """ 96 date_time = datetime.now() 97 file_time = FILETIME_EPOCH_DELTA + (timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS) 98 file_time = file_time + (date_time.microsecond * 10) 99 return file_time 100 101 102def get_connection_string(): 103 """ 104 Read ConnectionString from liveimport.ini 105 """ 106 config = configparser.ConfigParser() 107 config.read(FTS_INI_PATH + iniName) 108 try: 109 conn_str = config.get('General', 'ConnectionString') 110 except (configparser.NoSectionError, configparser.NoOptionError): 111 return None 112 113 return conn_str 114 115 116def get_configuration_string(): 117 """ 118 Read Configuration string from liveimport.ini 119 """ 120 config_str = '' 121 config = configparser.ConfigParser() 122 config.read(FTS_INI_PATH + iniName) 123 try: 124 config_items = config.items('Configuration') 125 except (configparser.NoSectionError, configparser.NoOptionError): 126 return None 127 128 if config_items is not None: 129 for item in config_items: 130 key, value = item 131 config_str += ("%s=%s\n" % (key, value)) 132 return config_str 133 else: 134 return None 135 136 137def check_live_import_connection(live_import): 138 """ 139 Launch FTS app in Virtual Sniffing Mode 140 Check if FTS App is ready to start receiving the data. 141 If not, wait until 1 min and exit if FTS didn't start. 142 """ 143 is_connection_running = c_bool() 144 count = 0 145 146 status = live_import.IsAppReady(byref(is_connection_running)) 147 if (is_connection_running.value == True): 148 print("FTS is already launched, Start capture if not already started") 149 return True 150 151 print("Launching FTS Virtual Sniffing") 152 try: 153 ftsProcess = subprocess.Popen((launchFtsCmd), stdout=subprocess.PIPE) 154 except: 155 print("Error in Launching FTS.. exiting") 156 return False 157 158 while (is_connection_running.value == False and count < 12): 159 status = live_import.IsAppReady(byref(is_connection_running)) 160 if (status < 0): 161 print("Live Import Internal Error %d" % (status)) 162 return False 163 if (is_connection_running.value == False): 164 print("Waiting for 5 sec.. Open FTS Virtual Sniffing") 165 time.sleep(5) 166 count += 1 167 if (is_connection_running.value == True): 168 print("FTS is ready to receive the data, Start capture now") 169 return True 170 else: 171 print("FTS Virtual Sniffing didn't start until 1 min.. exiting") 172 return False 173 174 175def init_live_import(conn_str, config_str): 176 """ 177 Load DLL and Initialize the LiveImport module for FTS. 178 """ 179 success = c_bool() 180 try: 181 live_import = CDLL(FTS_DLL_PATH + dllName) 182 except: 183 return None 184 185 if live_import is None: 186 print("Error: Path to LiveImportAPI.dll is incorrect.. exiting") 187 return None 188 189 print(dllName + " loaded successfully") 190 result = live_import.InitializeLiveImport( 191 conn_str.encode('ascii', 'ignore'), config_str.encode('ascii', 'ignore'), byref(success)) 192 if (result < 0): 193 print("Live Import Init failed") 194 return None 195 else: 196 print("Live Import Init success") 197 return live_import 198 199 200def release_live_import(live_import): 201 """ 202 Cleanup and exit live import module. 203 """ 204 if live_import is not None: 205 live_import.ReleaseLiveImport() 206 FreeLibrary(live_import._handle) 207 208 209def main(): 210 211 print("Bluetooth Virtual Sniffing for Fluoride") 212 connection_str = get_connection_string() 213 if connection_str is None: 214 print("Error: path to liveimport.ini is incorrect.. exiting") 215 exit(0) 216 217 configuration_str = get_configuration_string() 218 if configuration_str is None: 219 print("Error: path to liveimport.ini is incorrect.. exiting") 220 exit(0) 221 222 live_import = init_live_import(connection_str, configuration_str) 223 if live_import is None: 224 print("Error: Path to LiveImportAPI.dll is incorrect.. exiting") 225 exit(0) 226 227 if (check_live_import_connection(live_import) == False): 228 release_live_import(live_import) 229 exit(0) 230 231 # Wait until the forward socket is ready 232 print("Waiting until adb is ready") 233 os.system('adb wait-for-device forward tcp:8872 tcp:8872') 234 235 btsnoop_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 236 btsnoop_sock.connect((HOST, PORT)) 237 snoop_id = btsnoop_sock.recv(SNOOP_ID) 238 if not snoop_id.startswith(b"btsnoop"): 239 print("Error: Snoop ID wasn't received.. exiting") 240 release_live_import(live_import) 241 exit(0) 242 243 while True: 244 try: 245 snoop_hdr = btsnoop_sock.recv(SNOOP_HDR) 246 if snoop_hdr is not None: 247 try: 248 olen, ilen, flags = struct.unpack(">LLL", snoop_hdr[0:12]) 249 except struct.error: 250 print("Error: Invalid data", repr(snoop_hdr)) 251 continue 252 253 file_time = get_file_time() 254 timestamp = c_longlong(file_time) 255 256 snoop_data = b'' 257 while (len(snoop_data) < olen): 258 data_frag = btsnoop_sock.recv(olen - len(snoop_data)) 259 if data_frag is not None: 260 snoop_data += data_frag 261 262 print("Bytes received %d Olen %d ilen %d flags %d" % (len(snoop_data), olen, ilen, flags)) 263 packet_type = struct.unpack(">B", snoop_data[0:1])[0] 264 if packet_type == 1: 265 drf = 1 266 isend = 0 267 elif packet_type == 2: 268 drf = 2 269 if (flags & 0x01): 270 isend = 1 271 else: 272 isend = 0 273 elif packet_type == 3: 274 drf = 4 275 if (flags & 0x01): 276 isend = 1 277 else: 278 isend = 0 279 elif packet_type == 4: 280 drf = 8 281 isend = 1 282 283 result = live_import.SendFrame(olen - 1, olen - 1, snoop_data[1:olen], drf, isend, timestamp) 284 if (result < 0): 285 print("Send frame failed") 286 except KeyboardInterrupt: 287 print("Cleanup and exit") 288 release_live_import(live_import) 289 btsnoop_sock.close() 290 exit(0) 291 292 293if __name__ == '__main__': 294 main() 295