1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17Prerequisites:
18    Windows 10
19    Bluetooth PTS installed
20    Recommended: Running cmder as Admin: https://cmder.net/
21
22### BEGIN SETUP STEPS###
231. Install latest version of Python for windows:
24    https://www.python.org/downloads/windows/
25
Tested successfully on Python 3.7.3:
27    https://www.python.org/ftp/python/3.7.3/python-3.7.3.exe
28
292. Launch Powershell and setup PATH:
Setx PATH "%PATH%;C:/Users/<username>/AppData/Local/Programs/Python/Python37-32/Scripts"

323. Launch Cmder as Admin before running any PTS related ACTS tests.
33
34
35### END SETUP STEPS###
36
37
38Bluetooth PTS controller.
39Mandatory parameters are log_directory and sig_root_directory.
40
41ACTS Config setup:
42"BluetoothPtsDevice": {
43    "log_directory": "C:\\Users\\fsbtt\\Documents\\Profile Tuning Suite\\Test_Dir",
44    "sig_root_directory": "C:\\Program Files (x86)\\Bluetooth SIG"
45}
46
47"""
48from acts import signals
49from datetime import datetime
50from threading import Thread
51
52import ctypes
53import logging
54import os
55import subprocess
56import time
57import xml.etree.ElementTree as ET
58
59from xml.dom import minidom
60from xml.etree.ElementTree import Element
61
62
class BluetoothPtsDeviceConfigError(signals.ControllerError):
    """Signals an invalid BluetoothPtsDevice controller configuration."""
65
66
class BluetoothPtsSnifferError(signals.ControllerError):
    """Signals a failure while talking to the Bluetooth Protocol Viewer."""
69
70
# Controller names exported by this module.
MOBLY_CONTROLLER_CONFIG_NAME = "BluetoothPtsDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "bluetooth_pts_device"

# Prefix that identifies the final verdict string (PTS specific log string).
VERDICT = 'VERDICT/'

# Verdict strings that are specific to PTS.
VERDICT_STRINGS = {
    'RESULT_PASS': 'PASS',
    'RESULT_FAIL': 'FAIL',
    'RESULT_INCONC': 'INCONC',
    # Initial final verdict meaning that test has not completed yet.
    'RESULT_INCOMP': 'INCOMP',
    # Error verdict usually indicating internal PTS error.
    'RESULT_NONE': 'NONE',
}

# Log message announcing that the sniffer finished its save/clear operation.
SNIFFER_READY = 'SNIFFER/Save and clear complete'

# PTS log types, numbered as defined by PTS. Note that LOG_TYPE_FIRST and
# LOG_TYPE_START_TEST_CASE intentionally share the value 1.
LOG_TYPE_GENERAL_TEXT = 0
LOG_TYPE_FIRST = 1
LOG_TYPE_START_TEST_CASE = 1
LOG_TYPE_TEST_CASE_ENDED = 2
LOG_TYPE_START_DEFAULT = 3
LOG_TYPE_DEFAULT_ENDED = 4
LOG_TYPE_FINAL_VERDICT = 5
LOG_TYPE_PRELIMINARY_VERDICT = 6
LOG_TYPE_TIMEOUT = 7
LOG_TYPE_ASSIGNMENT = 8
LOG_TYPE_START_TIMER = 9
LOG_TYPE_STOP_TIMER = 10
LOG_TYPE_CANCEL_TIMER = 11
LOG_TYPE_READ_TIMER = 12
LOG_TYPE_ATTACH = 13
LOG_TYPE_IMPLICIT_SEND = 14
LOG_TYPE_GOTO = 15
LOG_TYPE_TIMED_OUT_TIMER = 16
LOG_TYPE_ERROR = 17
LOG_TYPE_CREATE = 18
LOG_TYPE_DONE = 19
LOG_TYPE_ACTIVATE = 20
LOG_TYPE_MESSAGE = 21
LOG_TYPE_LINE_MATCHED = 22
LOG_TYPE_LINE_NOT_MATCHED = 23
LOG_TYPE_SEND_EVENT = 24
LOG_TYPE_RECEIVE_EVENT = 25
LOG_TYPE_OTHERWISE_EVENT = 26
LOG_TYPE_RECEIVED_ON_PCO = 27
LOG_TYPE_MATCH_FAILED = 28
LOG_TYPE_COORDINATION_MESSAGE = 29

# Error text used when the controller is created with an empty config.
PTS_DEVICE_EMPTY_CONFIG_MSG = "Configuration is empty, abort!"
125
126
def create(config):
    """Create the BluetoothPtsDevice controller objects from a config.

    Args:
        config: A dict holding the BluetoothPtsDevice controller settings
            (log_directory and sig_root_directory).

    Returns:
        A list of BluetoothPtsDevice objects, as built by get_instance().

    Raises:
        BluetoothPtsDeviceConfigError: If config is empty or, via
            get_instance(), a mandatory entry is missing.
    """
    if not config:
        raise BluetoothPtsDeviceConfigError(PTS_DEVICE_EMPTY_CONFIG_MSG)
    return get_instance(config)
131
132
def destroy(pts):
    """Clean up the given BluetoothPtsDevice objects on a best-effort basis.

    A failure while cleaning up one device is logged and never propagated,
    so the remaining devices still get their clean_up() call.

    Args:
        pts: A list of BluetoothPtsDevice objects. An empty list is a no-op.
    """
    for device in pts:
        try:
            device.clean_up()
        except Exception as err:
            # Fall back to the root logger if the device has no usable
            # logger, so that reporting the failure can never raise itself.
            logger = getattr(device, "log", None) or logging.getLogger()
            logger.error("Failed to clean up properly: {}".format(err))
138
139
def get_info(pts_devices):
    """Summarize the first BluetoothPtsDevice of the list as a dict.

    Args:
        pts_devices: A list of BluetoothPtsDevice objects; only the first
            entry is inspected since only one is ever specified.

    Returns:
        A dict mapping each reported attribute name to its current value on
        that device.
    """
    device = pts_devices[0]
    reported_fields = (
        "address",
        "sniffer_ready",
        "ets_manager_library",
        "log_directory",
        "pts_installation_directory",
    )
    return {field: getattr(device, field) for field in reported_fields}
158
159
def get_instance(config):
    """Create BluetoothPtsDevice instance from a dictionary containing
    information related to PTS. Namely the SIG root directory as
    sig_root_directory and the log directory represented by the log_directory.

    The config dict is only read, never modified, so the same dict can be
    passed again later.

    Args:
        config: A dict that contains BluetoothPtsDevice device info.

    Returns:
        A list of BluetoothPtsDevice objects.

    Raises:
        BluetoothPtsDeviceConfigError: If log_directory or sig_root_directory
            is missing from config.
    """
    if "log_directory" not in config:
        raise BluetoothPtsDeviceConfigError(
            "Missing mandatory log_directory in config.")
    log_directory = config["log_directory"]

    if "sig_root_directory" not in config:
        # Shown with doubled backslashes, the way the path has to be written
        # inside the JSON config file.
        example_path = \
            "C:\\\\Program Files (x86)\\\\Bluetooth SIG"
        raise BluetoothPtsDeviceConfigError(
            "Missing mandatory sig_root_directory in config. Example path: {}".
            format(example_path))
    sig_root_directory = config["sig_root_directory"]

    # "C:\\Program Files (x86)\\Bluetooth SIG\\Bluetooth PTS\\bin"
    pts_installation_directory = "{}\\Bluetooth PTS\\bin".format(
        sig_root_directory)
    # "C:\\Program Files (x86)\\Bluetooth SIG\\Bluetooth PTS\\bin\\ETSManager.dll"
    ets_manager_library = "{}\\ETSManager.dll".format(
        pts_installation_directory)
    # "C:\\Program Files (x86)\\Bluetooth SIG\\Bluetooth Protocol Viewer"
    pts_sniffer_directory = "{}\\Bluetooth Protocol Viewer".format(
        sig_root_directory)

    return [
        BluetoothPtsDevice(ets_manager_library, log_directory,
                           pts_installation_directory, pts_sniffer_directory)
    ]
199
200
class BluetoothPtsDevice:
    """Class representing a Bluetooth PTS device and associated functions.

    Each object of this class represents one BluetoothPtsDevice in ACTS. The
    object loads the PTS ETSManager DLL through ctypes, registers Python
    callbacks with it and uses it to run test cases for the profile under
    test.
    """

    # Class-level defaults. The mutable ones (_observers, devices and
    # extra_answers) are re-bound to fresh lists in __init__ so that state is
    # never shared between two controller objects.
    _next_action = -1
    _observers = []
    address = ""
    current_implicit_send_description = ""
    devices = []
    extra_answers = []
    log_directory = ""
    log = None
    ics = None
    ixit = None
    profile_under_test = None
    pts_library = None
    pts_profile_mmi_request = ""
    pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']
    sniffer_ready = False
    test_log_directory = ""
    test_log_prefix = ""

    def __init__(self, ets_manager_library, log_directory,
                 pts_installation_directory, pts_sniffer_directory):
        """Load ETSManager and prepare the callbacks and XML containers.

        Side effect: the process working directory is changed to
        pts_installation_directory.

        Args:
            ets_manager_library: Path of the ETSManager DLL to load.
            log_directory: Directory handed to PTS for its logs; per-test
                result directories are created below it.
            pts_installation_directory: PTS "bin" directory.
            pts_sniffer_directory: Directory of the Bluetooth Protocol Viewer.
        """
        self.log = logging.getLogger()
        # Per-instance containers (see the note on the class attributes).
        self._observers = []
        self.devices = []
        self.extra_answers = []

        if ets_manager_library is not None:
            self.ets_manager_library = ets_manager_library
        self.log_directory = log_directory
        if pts_installation_directory is not None:
            self.pts_installation_directory = pts_installation_directory
        if pts_sniffer_directory is not None:
            self.pts_sniffer_directory = pts_sniffer_directory

        # Define callback functions. Both the prototypes and the callback
        # objects are stored on the instance: ctypes callback objects must be
        # kept referenced for as long as native code may call them.
        self.USEAUTOIMPLSENDFUNC = ctypes.CFUNCTYPE(ctypes.c_bool)
        self.use_auto_impl_send_func = self.USEAUTOIMPLSENDFUNC(
            self.UseAutoImplicitSend)

        self.DONGLE_MSG_FUNC = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_char_p)
        self.dongle_msg_func = self.DONGLE_MSG_FUNC(self.DongleMsg)

        self.DEVICE_SEARCH_MSG_FUNC = ctypes.CFUNCTYPE(ctypes.c_bool,
                                                       ctypes.c_char_p,
                                                       ctypes.c_char_p,
                                                       ctypes.c_char_p)
        self.dev_search_msg_func = self.DEVICE_SEARCH_MSG_FUNC(
            self.DeviceSearchMsg)

        self.LOGFUNC = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_char_p,
                                        ctypes.c_char_p, ctypes.c_char_p,
                                        ctypes.c_int, ctypes.c_void_p)
        self.log_func = self.LOGFUNC(self.Log)

        self.ONIMPLSENDFUNC = ctypes.CFUNCTYPE(ctypes.c_char_p,
                                               ctypes.c_char_p, ctypes.c_int)
        self.onimplsend_func = self.ONIMPLSENDFUNC(self.ImplicitSend)

        # Helps with PTS reliability.
        os.chdir(self.pts_installation_directory)
        # Load EtsManager
        self.pts_library = ctypes.cdll.LoadLibrary(self.ets_manager_library)
        self.log.info("ETS Manager library {0:s} has been loaded".format(
            self.ets_manager_library))
        # If post-logging is turned on all callbacks to LPLOG-type function
        # will be executed after test execution is complete. It is recommended
        # that post-logging is turned on to avoid simultaneous invocations of
        # LPLOG and LPAUTOIMPLICITSEND callbacks.
        self.pts_library.SetPostLoggingEx(True)

        # Skeleton of the XML result document written by execute_test().
        self.xml_root = Element("ARCHIVE")
        version = Element("VERSION")
        version.text = "2.0"
        self.xml_root.append(version)
        self.xml_pts_pixit = Element("PicsPixit")
        self.xml_pts_pixit.text = ""
        self.xml_pts_running_log = Element("LOG")
        self.xml_pts_running_log.text = ""
        self.xml_pts_running_summary = Element("SUMMARY")
        self.xml_pts_running_summary.text = ""

    @staticmethod
    def _c_string_to_text(raw, substitutions=()):
        """Decode a C string received in a PTS callback into str.

        Args:
            raw: The char* callback argument (bytes or None).
            substitutions: Iterable of (old_bytes, new_bytes) pairs applied
                before decoding.

        Returns:
            The decoded text; "" for a NULL pointer. Bytes that are not valid
            UTF-8 are replaced rather than raising, because an exception
            cannot usefully propagate out of a ctypes callback.
        """
        value = ctypes.c_char_p(raw).value
        if value is None:
            return ""
        for old, new in substitutions:
            value = value.replace(old, new)
        return value.decode("utf-8", errors="replace")

    def clean_up(self):
        """Tear down the stack, the registered profile and the sniffer.

        Every step is attempted even when an earlier one fails.
        """
        # Since we have no insight to the actual PTS library,
        # catch all Exceptions and log them.
        try:
            self.log.info("Cleaning up Stack...")
            # Pass the profile as a char* like every other PTS call does; a
            # Python str without argtypes would be passed as wchar_t*.
            profile = self.profile_under_test
            if profile is not None:
                profile = profile.encode()
            self.pts_library.ExitStackEx.argtypes = [ctypes.c_char_p]
            self.pts_library.ExitStackEx(profile)
        except Exception as err:
            self.log.error(
                "Failed to clean up BluetoothPtsDevice: {}".format(err))
        try:
            self.log.info("Unregistering Profile...")
            self.pts_library.UnregisterProfileEx.argtypes = [ctypes.c_char_p]
            self.pts_library.UnregisterProfileEx(
                self.profile_under_test.encode())
            self.pts_library.UnRegisterGetDevInfoEx()
        except Exception as err:
            self.log.error(
                "Failed to clean up BluetoothPtsDevice: {}".format(err))
        try:
            self.log.info("Cleaning up Sniffer")
            self.pts_library.SnifferTerminateEx()
        except Exception as err:
            self.log.error(
                "Failed to clean up BluetoothPtsDevice: {}".format(err))
        self.log.info("Cleanup Done.")

    def write_xml_pts_pixit_values_for_current_test(self):
        """ Writes the current PICS and IXIT values to the XML result.

        Keys and values of self.ics and the values of self.ixit are decoded
        from bytes; for IXIT only the second item of each entry is written.
        """
        parts = ["ICS VALUES:\n\n"]
        parts.extend("{} {}\n".format(key.decode(), value.decode())
                     for key, value in self.ics.items())
        parts.append("\nIXIT VALUES:\n\n")
        parts.extend("{} {}\n".format(key.decode(), value.decode())
                     for key, (_, value) in self.ixit.items())
        self.xml_pts_pixit.text = "".join(parts)

    def set_ics_and_ixit(self, ics, ixit):
        """Store the ICS and IXIT tables used by setup_pts().

        Args:
            ics: Mapping of ICS name to value (both bytes).
            ixit: Mapping of IXIT name to a (type, value) pair (all bytes).
        """
        self.ics = ics
        self.ixit = ixit

    def set_profile_under_test(self, profile):
        """Store the name (str) of the profile the tests will exercise."""
        self.profile_under_test = profile

    def setup_pts(self):
        """Prepares PTS to run tests. This needs to be called in test classes
        after ICS, IXIT, and setting Profile under test.
        Specifically BluetoothPtsDevice functions:
            set_profile_under_test
            set_ics_and_ixit
        """
        profile = self.profile_under_test.encode()

        # Register layer to test with callbacks
        self.pts_library.RegisterProfileWithCallbacks.argtypes = [
            ctypes.c_char_p, self.USEAUTOIMPLSENDFUNC, self.ONIMPLSENDFUNC,
            self.LOGFUNC, self.DEVICE_SEARCH_MSG_FUNC, self.DONGLE_MSG_FUNC
        ]
        res = self.pts_library.RegisterProfileWithCallbacks(
            profile, self.use_auto_impl_send_func, self.onimplsend_func,
            self.log_func, self.dev_search_msg_func, self.dongle_msg_func)
        self.log.info(
            "Profile has been registered with result {0:d}".format(res))

        # GetDeviceInfo module is for discovering devices and PTS Dongle address
        # Initialize GetDeviceInfo and register it with callbacks
        # First parameter is PTS executable directory
        self.pts_library.InitGetDevInfoWithCallbacks.argtypes = [
            ctypes.c_char_p, self.DEVICE_SEARCH_MSG_FUNC, self.DONGLE_MSG_FUNC
        ]
        res = self.pts_library.InitGetDevInfoWithCallbacks(
            self.pts_installation_directory.encode(), self.dev_search_msg_func,
            self.dongle_msg_func)
        self.log.info(
            "GetDevInfo has been initialized with result {0:d}".format(res))

        # Initialize PTS dongle
        res = self.pts_library.VerifyDongleEx()
        self.log.info(
            "PTS dongle has been initialized with result {0:d}".format(res))

        # Find PTS dongle address
        self.pts_library.GetDongleBDAddress.restype = ctypes.c_ulonglong
        self.address = self.pts_library.GetDongleBDAddress()
        self.address_str = "{0:012X}".format(self.address)
        self.log.info("PTS BD Address 0x{0:s}".format(self.address_str))

        # Initialize Bluetooth Protocol Viewer communication module
        self.pts_library.SnifferInitializeEx()

        # If Bluetooth Protocol Viewer is not running, start it
        if not self.is_sniffer_running():
            self.log.info("Starting Protocol Viewer")
            viewer_command = [
                "{}\\Executables\\Core\\FTS.exe".format(
                    self.pts_sniffer_directory),
                '/PTS Protocol Viewer=Generic',
                '/OEMTitle=Bluetooth Protocol Viewer', '/OEMKey=Virtual'
            ]
            subprocess.Popen(viewer_command)
            poll_interval_seconds = 10
            # NOTE(review): this wait has no upper bound; it blocks for as
            # long as the viewer process does not show up.
            while not self.is_sniffer_running():
                time.sleep(poll_interval_seconds)

        # Register to receive Bluetooth Protocol Viewer notifications
        self.pts_library.SnifferRegisterNotificationEx()
        self.pts_library.SetParameterEx.argtypes = [
            ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p
        ]

        for ics_name in self.ics:
            res = self.pts_library.SetParameterEx(ics_name, b'BOOLEAN',
                                                  self.ics[ics_name], profile)
            if res:
                self.log.info("ICS {0:s} set successfully".format(
                    str(ics_name)))
            else:
                self.log.error("Setting ICS {0:s} value failed".format(
                    str(ics_name)))

        for ixit_name in self.ixit:
            ixit_entry = self.ixit[ixit_name]
            res = self.pts_library.SetParameterEx(ixit_name, ixit_entry[0],
                                                  ixit_entry[1], profile)
            if res:
                self.log.info("IXIT {0:s} set successfully".format(
                    str(ixit_name)))
            else:
                self.log.error("Setting IXIT {0:s} value failed".format(
                    str(ixit_name)))

        # Prepare directory to store Bluetooth Protocol Viewer output
        os.makedirs(self.log_directory, exist_ok=True)

        address_b = self.address_str.encode("utf-8")
        self.pts_library.InitEtsEx.argtypes = [
            ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p
        ]
        implicit_send_path = "{}\\implicit_send3.dll".format(
            self.pts_installation_directory).encode()
        res = self.pts_library.InitEtsEx(profile, self.log_directory.encode(),
                                         implicit_send_path, address_b)
        self.log.info("ETS has been initialized with result {0:s}".format(
            str(res)))

        # Initialize Host Stack DLL
        self.pts_library.InitStackEx.argtypes = [ctypes.c_char_p]
        res = self.pts_library.InitStackEx(profile)
        self.log.info("Stack has been initialized with result {0:s}".format(
            str(res)))

        # Select to receive Log messages after test is done
        self.pts_library.SetPostLoggingEx.argtypes = [
            ctypes.c_bool, ctypes.c_char_p
        ]
        self.pts_library.SetPostLoggingEx(True, profile)

        # Clear Bluetooth Protocol Viewer. Dongle message callback will update
        # sniffer_ready automatically. No need to fail setup if the timeout
        # is exceeded since the logs will still be available just not starting
        # from a clean slate. Just post a warning.
        self.sniffer_ready = False
        self.pts_library.SnifferClearEx()
        end_time = time.time() + 10
        while not self.sniffer_ready and time.time() < end_time:
            time.sleep(1)
        if not self.sniffer_ready:
            self.log.warning("Sniffer not cleared. Continuing.")

    def is_sniffer_running(self):
        """ Looks for running Bluetooth Protocol Viewer process

        Scans the output of the Windows "tasklist" command for a process
        whose image name is Fts.exe (compared case-insensitively, as Windows
        file names are).

        Returns:
            Returns True if finds one, False otherwise.
        """
        for line in subprocess.check_output("tasklist").splitlines():
            columns = line.split()
            if not columns:
                # Blank line; the header rows need no special treatment
                # because their first column never equals the process name.
                continue
            task_name = columns[0].decode("utf-8", errors="replace")
            if task_name.lower() == "fts.exe":
                self.log.info("Found FTS process successfully.")
                # Sleep recommended by PTS.
                time.sleep(1)
                return True
        return False

    def UseAutoImplicitSend(self):
        """Callback method that defines Which ImplicitSend will be used.

        Returns:
            True always to inform PTS to use the local implementation.
        """
        return True

    def DongleMsg(self, msg_str):
        """ Receives PTS dongle messages.

        Specifically this receives the Bluetooth Protocol Viewer completed
        save/clear operations and sets sniffer_ready when one is seen.

        Args:
            msg_str: The message as a C string.

        Returns:
            True always.
        """
        msg = self._c_string_to_text(msg_str)
        self.log.info(msg)
        # Sleep recommended by PTS.
        time.sleep(1)
        if SNIFFER_READY in msg:
            self.sniffer_ready = True
        return True

    def DeviceSearchMsg(self, addr_str, name_str, cod_str):
        """ Receives device search messages

        Each device may return multiple messages
        Each message will contain device address and may contain device name and
        COD. Every message is appended to self.devices as one formatted line.

        Args:
            addr_str: Device address as a C string.
            name_str: Device name as a C string (may be absent).
            cod_str: Class of device as a C string (may be absent).

        Returns:
            True always and reports to the callback appropriately.
        """
        # PTS may emit a 0xED byte, which is not valid UTF-8 on its own; it
        # is mapped to a space before decoding.
        cleanup = ((b'\xed', b' '), )
        addr = self._c_string_to_text(addr_str, cleanup)
        name = self._c_string_to_text(name_str, cleanup)
        cod = self._c_string_to_text(cod_str, cleanup)
        self.devices.append(
            "Device address = {0:s} name = {1:s} cod = {2:s}".format(
                addr, name, cod))
        return True

    def Log(self, log_time_str, log_descr_str, log_msg_str, log_type, project):
        """ Receives PTS log messages.

        Every message is appended to the running XML log; verdict related
        messages are also added to the XML summary. A final verdict message
        updates pts_test_result, but only while it is still RESULT_INCOMP, so
        the first final verdict wins.

        Args:
            log_time_str: Timestamp of the message as a C string.
            log_descr_str: Description of the message as a C string.
            log_msg_str: Message text as a C string.
            log_type: One of the LOG_TYPE_* values.
            project: Opaque pointer supplied by PTS; unused.

        Returns:
            True always and reports to the callback appropriately.
        """
        log_time = self._c_string_to_text(log_time_str)
        log_descr = self._c_string_to_text(log_descr_str)
        log_msg = self._c_string_to_text(log_msg_str)

        if "Verdict Description" in log_descr:
            self.xml_pts_running_summary.text += "\t- {}".format(log_msg)
        if "Final Verdict" in log_descr:
            self.xml_pts_running_summary.text += "{}{}\n".format(
                log_descr.strip(), log_msg.strip())
        full_log_msg = "{}{}{}".format(log_time, log_descr, log_msg)
        self.xml_pts_running_log.text += "{}\n".format(str(full_log_msg))

        is_final_verdict = (
            ctypes.c_int(log_type).value == LOG_TYPE_FINAL_VERDICT
            and log_msg.startswith(VERDICT))
        still_pending = (
            self.pts_test_result == VERDICT_STRINGS['RESULT_INCOMP'])
        if is_final_verdict and still_pending:
            # Checked in this order; the first verdict string found wins.
            for key in ('RESULT_INCONC', 'RESULT_FAIL', 'RESULT_PASS',
                        'RESULT_NONE'):
                if VERDICT_STRINGS[key] in log_msg:
                    self.pts_test_result = VERDICT_STRINGS[key]
                    break
        return True

    def ImplicitSend(self, description, style):
        """ ImplicitSend callback

        Implicit Send Styles:
            MMI_Style_Ok_Cancel1 =     0x11041, Simple prompt           | OK, Cancel buttons      | Default: OK
            MMI_Style_Ok_Cancel2 =     0x11141, Simple prompt           | Cancel button           | Default: Cancel
            MMI_Style_Ok1 =            0x11040, Simple prompt           | OK button               | Default: OK
            MMI_Style_Yes_No1 =        0x11044, Simple prompt           | Yes, No buttons         | Default: Yes
            MMI_Style_Yes_No_Cancel1 = 0x11043, Simple prompt           | Yes, No buttons         | Default: Yes
            MMI_Style_Abort_Retry1 =   0x11042, Simple prompt           | Abort, Retry buttons    | Default: Abort
            MMI_Style_Edit1 =          0x12040, Request for data input  | OK, Cancel buttons      | Default: OK
            MMI_Style_Edit2 =          0x12140, Select item from a list | OK, Cancel buttons      | Default: OK

        Handling
            MMI_Style_Ok_Cancel1
                OK = return "OK"
                Cancel = return 0

            MMI_Style_Ok_Cancel2
                OK = return "OK"
                Cancel = return 0

            MMI_Style_Ok1
                OK = return "OK", this version should not return 0

            MMI_Style_Yes_No1
                Yes = return "OK"
                No = return 0

            MMI_Style_Yes_No_Cancel1
                Yes = return "OK"
                No = return 0
                Cancel = has been deprecated

            MMI_Style_Abort_Retry1
                Abort = return 0
                Retry = return "OK"

            MMI_Style_Edit1
                OK = return expected string
                Cancel = return 0

            MMI_Style_Edit2
                OK = return expected string
                Cancel = return 0

        Receives ImplicitSend messages
        Description format is as following:
        {MMI_ID,Test Name,Layer Name}MMI Action\n\nDescription: MMI Description

        Assigning the MMI id to next_action notifies every observer bound
        with bind_to(). If that raises, the verdict is set to RESULT_FAIL.

        Args:
            description: The MMI request as a C string, formatted as above.
            style: One of the MMI style values listed above.

        Returns:
            The next queued entry of extra_answers (encoded to bytes) if there
            is one, b'OK' otherwise.
        """
        descr_str = self._c_string_to_text(description)
        # Sleep recommended by PTS.
        time.sleep(1)
        indx = descr_str.find('}')
        implicit_send_info = descr_str[1:(indx)]
        self.current_implicit_send_description = descr_str[(indx + 1):]
        items = implicit_send_info.split(',')
        implicit_send_info_id = items[0]
        implicit_send_info_test_case = items[1]
        self.pts_profile_mmi_request = items[2]
        self.log.info(
            "OnImplicitSend() has been called with the following parameters:\n"
        )
        self.log.info("\t\tproject_name = {0:s}".format(
            self.pts_profile_mmi_request))
        self.log.info("\t\tid = {0:s}".format(implicit_send_info_id))
        self.log.info(
            "\t\ttest_case = {0:s}".format(implicit_send_info_test_case))
        self.log.info("\t\tdescription = {0:s}".format(
            self.current_implicit_send_description))
        self.log.info("\t\tstyle = {0:#X}".format(ctypes.c_int(style).value))
        self.log.info("")
        try:
            # The next_action setter runs the observer callbacks, so this
            # also catches whatever they raise.
            self.next_action = int(implicit_send_info_id)
        except Exception as err:
            self.log.error(
                "Setting verdict to RESULT_FAIL, exception found: {}".format(
                    err))
            self.pts_test_result = VERDICT_STRINGS['RESULT_FAIL']
        res = b'OK'
        if self.extra_answers:
            res = self.extra_answers.pop(0).encode()
        self.log.info("Sending Response: {}".format(res))
        return res

    def log_results(self, test_name):
        """Log results.

        Saves the sniffer results in cfa format into test_log_directory,
        named after test_log_prefix, then waits for the sniffer to be ready.

        Args:
            test_name: string, name of the test run. Currently unused.

        Raises:
            BluetoothPtsSnifferError: If the sniffer is not ready within 60
                seconds.
        """
        self.pts_library.SnifferCanSaveEx.restype = ctypes.c_bool
        can_save = bool(self.pts_library.SnifferCanSaveEx())
        self.pts_library.SnifferCanSaveAndClearEx.restype = ctypes.c_bool
        can_save_and_clear = bool(self.pts_library.SnifferCanSaveAndClearEx())
        self.log.debug("Sniffer can save: {}, can save and clear: {}".format(
            can_save, can_save_and_clear))
        file_name = "\\{}.cfa".format(self.test_log_prefix).encode()
        path = self.test_log_directory.encode() + file_name

        # NOTE(review): only can_save selects the operation and
        # sniffer_ready is not reset before saving -- confirm against the
        # PTS documentation that this is the intended sequence.
        if can_save:
            self.pts_library.SnifferSaveEx.argtypes = [ctypes.c_char_p]
            self.pts_library.SnifferSaveEx(path)
        else:
            self.pts_library.SnifferSaveAndClearEx.argtypes = [ctypes.c_char_p]
            self.pts_library.SnifferSaveAndClearEx(path)

        end_time = time.time() + 60
        while not self.sniffer_ready and end_time > time.time():
            self.log.info("Waiting for sniffer to be ready...")
            time.sleep(1)
        if not self.sniffer_ready:
            raise BluetoothPtsSnifferError(
                "Sniffer not ready after 60 seconds.")

    def execute_test(self, test_name, test_timeout=60):
        """Execute the input test name.

        Preps PTS to run the test and waits up to test_timeout seconds for a
        verdict. Cleanup of PTS related objects follows any test verdict, and
        the sniffer capture and an XML report are written to a new per-test
        directory below log_directory.

        Args:
            test_name: string, name of the test to execute.
            test_timeout: int, seconds to wait for a verdict (default 60).

        Returns:
            True if the verdict is RESULT_PASS, False otherwise (including
            when no verdict arrived in time).

        Raises:
            BluetoothPtsSnifferError: Propagated from log_results().
        """
        started_at = datetime.now()
        profile = self.profile_under_test.encode()
        # Always start from "incomplete" so that a verdict left over from a
        # previous run cannot end the wait below immediately.
        self.pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']

        self.write_xml_pts_pixit_values_for_current_test()
        # TODO: Find out how to grab the PTS version. Temporarily
        # hardcoded to v.7.4.1.2.
        self.xml_pts_pixit.text = (
            "Test Case Started: {} v.7.4.1.2, {} started on {}\n\n{}".format(
                self.profile_under_test, test_name,
                started_at.strftime("%A, %B %d, %Y, %H:%M:%S"),
                self.xml_pts_pixit.text))

        self.xml_pts_running_summary.text += "Test case : {} started\n".format(
            test_name)
        log_time_formatted = "{:%Y_%m_%d_%H_%M_%S}".format(started_at)
        formatted_test_name = test_name.replace('/', '_').replace('-', '_')
        self.test_log_prefix = "{}_{}".format(formatted_test_name,
                                              log_time_formatted)
        self.test_log_directory = "{}\\{}\\{}".format(self.log_directory,
                                                      self.profile_under_test,
                                                      self.test_log_prefix)
        os.makedirs(self.test_log_directory, exist_ok=True)
        curr_test = test_name.encode()

        self.pts_library.StartTestCaseEx.argtypes = [
            ctypes.c_char_p, ctypes.c_char_p, ctypes.c_bool
        ]
        res = self.pts_library.StartTestCaseEx(curr_test, profile, True)
        self.log.info("Test has been started with result {0:s}".format(
            str(res)))

        # Wait till verdict is received
        self.log.info("Begin Test Execution... waiting for verdict.")
        end_time = time.time() + test_timeout
        while (self.pts_test_result == VERDICT_STRINGS['RESULT_INCOMP']
               and time.time() < end_time):
            time.sleep(1)
        self.log.info("End Test Execution... Verdict {}".format(
            self.pts_test_result))

        # Clean up after test is done
        self.pts_library.TestCaseFinishedEx.argtypes = [
            ctypes.c_char_p, ctypes.c_char_p
        ]
        res = self.pts_library.TestCaseFinishedEx(curr_test, profile)

        self.log_results(test_name)
        self.xml_pts_running_summary.text += "{} finished\n".format(test_name)

        # Add the log results to the XML output. The sections are attached
        # only once so that repeated runs do not duplicate them in the
        # document.
        attached = list(self.xml_root)
        for section in (self.xml_pts_pixit, self.xml_pts_running_log,
                        self.xml_pts_running_summary):
            if not any(section is child for child in attached):
                self.xml_root.append(section)
        rough_string = ET.tostring(self.xml_root,
                                   encoding='utf-8',
                                   method='xml')
        reparsed = minidom.parseString(rough_string)
        report_path = "{}\\{}.xml".format(self.test_log_directory,
                                          self.test_log_prefix)
        # The document declares UTF-8, so the file must be written as UTF-8
        # and not in the platform default encoding.
        with open(report_path, "w", encoding="utf-8") as writer:
            writer.write(
                reparsed.toprettyxml(indent="  ", encoding="utf-8").decode())

        return self.pts_test_result == VERDICT_STRINGS['RESULT_PASS']

    # Observer functions

    def bind_to(self, callback):
        """ Callbacks to add to the observer.
        This is used for DUTS automatic responses (ImplicitSends local
        implementation).

        Args:
            callback: Callable invoked with the new value every time
                next_action is assigned.
        """
        self._observers.append(callback)

    @property
    def next_action(self):
        """The most recently assigned action id (-1 until one is set)."""
        return self._next_action

    @next_action.setter
    def next_action(self, action):
        # Store the value, then notify the observers in registration order.
        self._next_action = action
        for callback in self._observers:
            callback(self._next_action)

    # End Observer functions
766