1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17"""Apollo Commander through USB/UART interface.
18
It uses the Python serial library to communicate with an Apollo device.
Some of the commands may not work yet, pending the final version of the
commander implementation.
22
23Typical usage examples:
24
25    To get a list of all apollo devices:
26    >>> devices = apollo_lib.get_devices()
27
28    To work with a specific apollo device:
    >>> apollo = apollo_lib.BudsDevice(serial_number='ABCDEF0123456789',
    ...                                commander_port='/dev/ttyACM0')
31
32    To send a single command:
33    >>> apollo.cmd('PowOff')
34
35    To send a list of commands:
    >>> apollo.cmd(['PowOff', 'PowOn', 'VolUp', 'VolDown'])
37"""
38from __future__ import absolute_import
39from __future__ import division
40from __future__ import print_function
41
42import atexit
43import os
44import re
45import subprocess
46import time
47
48import serial
49from acts import tracelogger
50from acts.controllers.buds_lib import logserial
51from acts.controllers.buds_lib.b29_lib import B29Device
52from acts.controllers.buds_lib.dev_utils import apollo_log_decoder
53from acts.controllers.buds_lib.dev_utils import apollo_log_regex
54from acts.controllers.buds_lib.dev_utils import apollo_sink_events
55from logging import Logger
56from retry import retry
57
# Module-wide logger. NOTE: this name shadows the standard-library ``logging``
# module inside this file, and BudsDevice.set_logger() rebinds it.
logging = tracelogger.TakoTraceLogger(Logger('apollo'))

# Serial line parameters. Only BAUD_RATE is referenced in the part of the
# file reviewed here (it is passed to logserial.LogSerial).
BAUD_RATE = 115200
BYTE_SIZE = 8
PARITY = 'N'
STOP_BITS = 1
DEFAULT_TIMEOUT = 3
WRITE_TO_FLASH_WAIT = 30  # wait 30 sec when writing to external flash.
# Matches "<digits> <message>".
LOG_REGEX = re.compile(r'(?P<time_stamp>\d+)\s(?P<msg>.*)')
# Matches "<digits> <key>: <value>".
STATUS_REGEX = r'(?P<time_stamp>\d+)\s(?P<key>.+?): (?P<value>.+)'
# Substring used by get_devices() to select entries in /dev/serial/by-id.
APOLLO_CHIP = '_Apollo_'
# Matches "_<SERIAL>-<interface> -> ../../<port>" in an `ls -l` line of
# /dev/serial/by-id (see get_devices()).
DEVICE_REGEX = (
    r'_(?P<device_serial>[A-Z0-9]+)-(?P<interface>\w+)'
    r'\s->\s(\.\./){2}(?P<port>\w+)'
)
# Strings/patterns looked for in device output during an OTA transfer.
OTA_VERIFICATION_FAILED = 'OTA verification failed. corrupt image?'
OTA_ERASING_PARTITION = 'INFO OTA eras ptns'
OTA_RECEIVE_CSR_REGEX = r'INFO OTA CSR rcv begin'
# Matches "<digits> <codec> codec is used."
CODEC_REGEX = r'(?P<time_stamp>\d+)\s(?P<codec>\w+) codec is used.'
# Matches "<n>.<n>.<build>[-<psoc_build>][-<debug>]".
BUILD_REGEX = r'\d+\.\d+\.(?P<build>\d+)-?(?P<psoc_build>\d*)-?(?P<debug>\w*)'
78
79
class Error(Exception):
    """Base class for the errors defined in this module."""
82
83
class ResponseError(Error):
    """Error in the response to a commander command."""
86
87
class DeviceError(Error):
    """Device-level failure.

    Raised in this module when a command is rejected or not acknowledged,
    when an expected value is missing from a response, or when reconnecting
    to the device fails.
    """
90
91
class ConnectError(Error):
    """Connection failure.

    Raised in this module when the device ports cannot be found or the
    serial connection to the device cannot be opened.
    """
94
95
def get_devices():
    """Get all available Apollo devices.

    Lists the ``/dev/serial/by-id`` symlinks whose name contains APOLLO_CHIP
    and groups the listed interfaces by device serial number: interface
    ``if00`` is the commander port and ``if02`` is the log port.

    Returns:
        (list) One dict per device that exposes a log port, with the keys
        'commander_port' (None when no if00 interface was listed for that
        serial number), 'log_port' and 'serial_number'.

    Raises:
        Error: raises Error if no Apollo devices or wrong interfaces were found.
    """
    result = os.popen('ls -l /dev/serial/by-id/*%s*' % APOLLO_CHIP).read()
    if not result:
        raise Error('No Apollo Devices found.')

    serials = []  # Serial numbers in first-seen order.
    ports = {}  # serial number -> {'commander_port': ..., 'log_port': ...}
    for line in result.splitlines():
        match = re.search(DEVICE_REGEX, line)
        if not match:
            # Not a by-id symlink line; skip it rather than failing with an
            # AttributeError on a None match.
            continue
        interface = match.group('interface')
        if interface == 'if00':
            key = 'commander_port'
        elif interface == 'if02':
            key = 'log_port'
        else:
            raise Error('Wrong interface found.')
        device_serial = match.group('device_serial')
        if device_serial not in ports:
            serials.append(device_serial)
            ports[device_serial] = {'commander_port': None, 'log_port': None}
        ports[device_serial][key] = '/dev/' + match.group('port')

    devices = []
    for device_serial in serials:
        entry = ports[device_serial]
        if entry['log_port'] is None:
            # As before, a device is only reported when its log port exists.
            continue
        devices.append({
            'commander_port': entry['commander_port'],
            'log_port': entry['log_port'],
            'serial_number': device_serial
        })
    return devices
130
131
132class BudsDevice(object):
133    """Provides a simple class to interact with Apollo."""
134
135    def __init__(self, serial_number, commander_port=None, log_port=None,
136                 serial_logger=None):
137        """Establish a connection to a Apollo.
138
139        Open a connection to a device with a specific serial number.
140
141        Raises:
142            ConnectError: raises ConnectError if cannot open the device.
143        """
144        self.set_log = False
145        self.connection_handle = None
146        self.device_closed = False
147        if serial_logger:
148            self.set_logger(serial_logger)
149        self.pc = logserial.PortCheck()
150        self.serial_number = serial_number
151        # TODO (kselvakumaran): move this to an interface device class that
152        # apollo_lib.BudsDevice should derive from
153        if not commander_port and not log_port:
154            self.get_device_ports(self.serial_number)
155        if commander_port:
156            self.commander_port = commander_port
157        if log_port:
158            self.log_port = log_port
159        self.apollo_log = None
160        self.cmd_log = None
161        self.apollo_log_regex = apollo_log_regex
162        self.dut_type = 'apollo'
163
164        # TODO (kselvakumaran): move this to an interface device class that
165        # apollo_lib.BudsDevice should derive from
166
167        try:  # Try to open the device
168            self.connection_handle = logserial.LogSerial(
169                self.commander_port, BAUD_RATE, flush_output=False,
170                serial_logger=logging)
171            self.wait_for_commander()
172        except (serial.SerialException, AssertionError, ConnectError) as e:
173            logging.error(
174                'error opening device {}: {}'.format(serial_number, e))
175            raise ConnectError('Error open the device.')
176        # disable sleep on idle
177        self.stay_connected_state = 1
178        atexit.register(self.close)
179
180    def set_logger(self, serial_logger):
181        global logging
182        logging = serial_logger
183        self.set_log = True
184        if self.connection_handle:
185            self.connection_handle.set_logger(serial_logger)
186
187    def get_device_ports(self, serial_number):
188        commander_query = {'ID_SERIAL_SHORT': serial_number,
189                           'ID_USB_INTERFACE_NUM': '00'}
190        log_query = {'ID_SERIAL_SHORT': serial_number,
191                     'ID_USB_INTERFACE_NUM': '02'}
192        self.commander_port = self.pc.search_port_by_property(commander_query)
193        self.log_port = self.pc.search_port_by_property(log_query)
194        if not self.commander_port and not self.log_port:
195            raise ConnectError(
196                'BudsDevice serial number %s not found' % serial_number)
197        else:
198            if not self.commander_port:
199                raise ConnectError('No devices found')
200            self.commander_port = self.commander_port[0]
201            self.log_port = self.log_port[0]
202
    def get_all_log(self):
        """Return the result of `connection_handle.get_all_log()`."""
        return self.connection_handle.get_all_log()
205
    def query_log(self, from_timestamp, to_timestamp):
        """Query the serial log between two timestamps.

        Args:
            from_timestamp: forwarded as `from_timestamp` to the connection
                handle's `query_serial_log`.
            to_timestamp: forwarded as `to_timestamp` to the same call.

        Returns:
            Whatever `connection_handle.query_serial_log` returns.
        """
        return self.connection_handle.query_serial_log(
            from_timestamp=from_timestamp, to_timestamp=to_timestamp)
209
210    def send(self, cmd):
211        """Sends the command to serial port.
212
213        It does not care about whether the cmd is successful or not.
214
215        Args:
216            cmd: The passed command
217
218        Returns:
219            The number of characters written
220        """
221        logging.debug(cmd)
222        # with self._lock:
223        self.connection_handle.write(cmd)
224        result = self.connection_handle.read()
225        return result
226
227    def cmd(self, cmds, wait=None):
228        """Sends the commands and check responses.
229
230        Valid cmd will return something like '585857269 running cmd VolUp'.
231        Invalid cmd will log an error and return something like '585826369 No
232        command vol exists'.
233
234        Args:
235            cmds: The commands to the commander.
236            wait: wait in seconds for the cmd response.
237
238        Returns:
239            (list) The second element of the array returned by _cmd.
240        """
241        if isinstance(cmds, str):
242            cmds = [cmds]
243        results = []
244        for cmd in cmds:
245            _, result = self._cmd(cmd, wait=wait)
246            results.append(result)
247        return results
248
249    def _cmd(self, cmd, wait=None, throw_error=True):
250        """Sends a single command and check responses.
251
252        Valid cmd will return something like '585857269 running cmd VolUp'.
253        Invalid cmd will log an error and return something like '585826369 No
254        command vol exists'. Some cmd will return multiple lines of output.
255        eg. 'menu'.
256
257        Args:
258            cmd: The command to the commander.
259            wait: wait in seconds for the cmd response.
260            throw_error: Throw exception on True
261
262        Returns:
263            (list) containing such as the following:
264            [<return value>, [<protobuf dictionary>, str]]
265            Hex strings (protobuf) are replaced by its decoded dictionaries
266            and stored in an arry along with other string returned fom the
267            device.
268
269        Raises:
270            DeviceError: On Error.(Optional)
271        """
272        self.connection_handle.write(cmd)
273
274        while self.connection_handle.is_logging:
275            time.sleep(.01)
276        if wait:
277            self.wait(wait)
278        # Using read_serial_port as readlines is a blocking call until idle.
279        res = self.read_serial_port()
280        result = []
281        self.cmd_log = res
282        command_resv = False
283        # TODO: Cleanup the usage of the two booleans below.
284        command_finish = False
285        command_rejected = False
286        # for line in iter_res:
287        for line in res:
288            if isinstance(line, dict):
289                if 'COMMANDER_RECV_COMMAND' in line.values():
290                    command_resv = True
291                elif 'COMMANDER_REJECT_COMMAND' in line.values():
292                    logging.info('Command rejected')
293                    command_rejected = True
294                    break
295                elif 'COMMANDER_FINISH_COMMAND' in line.values():
296                    command_finish = True
297                    break
298                elif (command_resv and not command_finish and
299                      not command_rejected):
300                    result.append(line)
301            # TODO(jesussalinas): Remove when only encoded lines are required
302            elif command_resv and not command_finish and not command_rejected:
303                if 'running cmd' not in line:
304                    result.append(line)
305        success = True
306        if command_rejected or not command_resv:
307            success = False
308            if throw_error:
309                logging.info(res)
310                raise DeviceError('Unknown command %s' % cmd)
311        return success, result
312
313    def get_pdl(self):
314        """Returns the PDL stack dictionary.
315
316        The PDL stack stores paired devices of Apollo. Each PDL entry include
317        mac_address, flags, link_key, priority fields.
318
319        Returns:
320            list of pdl dicts.
321        """
322        # Get the mask from CONNLIB41:
323        # CONNLIB41 typically looks something like this: 2403 fff1
324        # 2403 fff1 is actually two 16-bit words of a 32-bit integer
325        # like 0xfff12403 . This tells the chronological order of the entries
326        # in the paired device list one nibble each. LSB to MSB corresponds to
327        # CONNLIB42 through CONNLIB49. So, the above tells us that the device at
328        # 0x2638 is the 3rd most recent entry 0x2639 the latest entry etc. As
329        # a device re-pairs the masks are updated.
330        response = []
331        mask = 'ffffffff'
332        res = self.cmd('GetPSHex 0x2637')
333        if len(res[0]) == 0:
334            logging.warning('Error reading PDL mask @ 0x2637')
335            return response
336        else:
337            regexp = r'\d+\s+(?P<m1>....)\s(?P<m2>....)'
338            match = re.match(regexp, res[0][0])
339            if match:
340                connlib41 = match.group('m2') + match.group('m1')
341                mask = connlib41[::-1]
342                logging.debug('PDL mask: %s' % mask)
343
344        # Now get the MAC/link key
345        mask_idx = 0
346        for i in range(9784, 9883):
347            types = {}
348            res = self.cmd('GetPSHex ' + '%0.2x' % i)
349            if len(res[0]) == 0:
350                break
351            else:
352                regexp = ('\d+\s+(?P<Mac>....\s....\s....)\s'
353                          '(?P<Flags>....\s....)\s(?P<Linkkey>.*)')
354                match = re.search(regexp, res[0][0])
355                if match:
356                    mac_address = match.group('Mac').replace(' ', '').upper()
357                    formatted_mac = ''
358                    for i in range(len(mac_address)):
359                        formatted_mac += mac_address[i]
360                        if i % 2 != 0 and i < (len(mac_address) - 1):
361                            formatted_mac += ':'
362                    types['mac_address'] = formatted_mac
363                    types['flags'] = match.group('Flags').replace(' ', '')
364                    types['link_key'] = match.group('Linkkey').replace(' ', '')
365                    types['priority'] = int(mask[mask_idx], 16)
366                    mask_idx += 1
367                    response.append(types)
368
369        return response
370
371    def set_pairing_mode(self):
372        """Enter Bluetooth Pairing mode."""
373        logging.debug('Inside set_pairing_mode()...')
374        try:
375            return self.cmd('Pair')
376        except DeviceError:
377            logging.exception('Pair cmd failed')
378
379    # TODO (kselvakumaran): move this to an interface BT class that
380    # apollo_lib.BudsDevice should derive from
    def turn_on_bluetooth(self):
        """Report success unconditionally; no command is sent to the device.

        Returns:
            True, always.
        """
        return True
383
384    # TODO (kselvakumaran): move this to an interface BT class that
385    # apollo_lib.BudsDevice should derive from
    def is_bt_enabled(self):
        """Check if BT is enabled.

        (TODO:weisu) Currently this always returns True without querying the
        device, since there is no way to disable BT in apollo.

        Returns:
            True, always.
        """
        logging.debug('Inside is_bt_enabled()...')
        return True
397
    def panic(self):
        """Hitting a panic, device will be automatically reset after 5s.

        Sends the 'panic' command through send(), so the response is not
        checked. A serial.SerialException is logged and swallowed.
        """
        logging.debug('Inside panic()...')
        try:
            self.send('panic')
        except serial.SerialException:
            logging.exception('panic cmd failed')
405
406    def power(self, cmd):
407        """Controls the power state of the device.
408
409        Args:
410            cmd: If 'Off', powers the device off. Otherwise, powers the device
411                 on.
412        """
413        logging.debug('Inside power({})...'.format(cmd))
414        mode = '0' if cmd == 'Off' else '1'
415        cmd = 'Pow ' + mode
416        try:
417            return self.cmd(cmd)
418        except DeviceError:
419            logging.exception('{} cmd failed'.format(cmd))
420
421    def charge(self, state):
422        """Charging Control of the device.
423
424        Args:
425          state: '1/0' to enable/disable charging.
426        """
427        logging.debug('Inside charge({})...'.format(state))
428        cmd = 'chg ' + state
429        try:
430            self.cmd(cmd)
431        except DeviceError:
432            logging.exception('{} cmd failed'.format(cmd))
433
434    def get_battery_level(self):
435        """Get the battery charge level.
436
437        Returns:
438            charge percentage string.
439
440        Raises:
441            DeviceError: GetBatt response error.
442        """
443        response = self.cmd('GetBatt')
444        for line in response[0]:
445            if line.find('Batt:') > -1:
446                # Response if in this format '<messageID> Batt: <percentage>'
447                return line.split()[2]
448        raise DeviceError('Battery Level not found in GetBatt response')
449
450    def get_gas_gauge_current(self):
451        """Get the Gauge current value.
452
453        Returns:
454            Float value with the info
455
456        Raises:
457            DeviceError: I2CRead response error.
458        """
459        response = self.cmd('I2CRead 2 0x29')
460        for line in response[0]:
461            if line.find('value') > -1:
462                return float.fromhex(line.split()[6].replace(',', ''))
463        raise DeviceError('Current Level not found in I2CRead response')
464
465    def get_gas_gauge_voltage(self):
466        """Get the Gauge voltage value.
467
468        Returns:
469            Float value with the info
470
471        Raises:
472            DeviceError: I2CRead response error.
473        """
474        response = self.cmd('I2CRead 2 0x2A')
475        for line in response[0]:
476            if line.find('value') > -1:
477                return float.fromhex(line.split()[6].replace(',', ''))
478        raise DeviceError('Voltage Level not found in I2CRead response')
479
    def reset(self, wait=5):
        """Resetting the device.

        Powers the device off, waits, then powers it back on.

        Args:
            wait: seconds to wait between power off and power on.
        """
        logging.debug('Inside reset()...')
        self.power('Off')
        self.wait(wait)
        self.power('On')
486
487    def close(self):
488        if not self.device_closed:
489            self.connection_handle.close()
490            self.device_closed = True
491            if not self.set_log:
492                logging.flush_log()
493
    def get_serial_log(self):
        """Retrieve the logs from connection handle.

        Returns:
            Whatever `connection_handle.get_all_log()` returns (same call as
            get_all_log()).
        """
        return self.connection_handle.get_all_log()
497
    def factory_reset(self):
        """Erase paired device(s) (bond) data and reboot device.

        Sends 'FactoryReset 1' through send() (the response is not checked),
        waits 5 seconds and then reconnects with reconnect()'s defaults.
        """
        cmd = 'FactoryReset 1'
        self.send(cmd)
        self.wait(5)
        self.reconnect()
504
505    def reboot(self, reconnect=10, retry_timer=30):
506        """Rebooting the device.
507
508        Args:
509            reconnect: reconnect attempts after reboot, None for no reconnect.
510            retry_timer: wait time in seconds before next connect retry.
511
512        Returns:
513            True if successfully reboot or reconnect.
514        """
515        logging.debug('Inside reboot()...')
516        self.panic()
517        if not reconnect:
518            return True
519        ini_time = time.time()
520        message = 'waiting for {} to shutdown'.format(self.serial_number)
521        logging.info(message)
522        while True:
523            alive = self.connection_handle.is_port_alive()
524            if not alive:
525                logging.info('rebooted')
526                break
527            if time.time() - ini_time > 60:
528                logging.info('Shutdown timeouted')
529                break
530            time.sleep(0.5)
531        return self.reconnect(reconnect, retry_timer)
532
533    def reconnect(self, iterations=30, retry_timer=20):
534        """Reconnect to the device.
535
536        Args:
537            iterations: Number of retry iterations.
538            retry_timer: wait time in seconds before next connect retry.
539
540        Returns:
541            True if reconnect to the device successfully.
542
543        Raises:
544            DeviceError: Failed to reconnect.
545        """
546        logging.debug('Inside reconnect()...')
547        for i in range(iterations):
548            try:
549                # port might be changed, refresh the port list.
550                self.get_device_ports(self.serial_number)
551                message = 'commander_port: {}, log_port: {}'.format(
552                    self.commander_port, self.log_port)
553                logging.info(message)
554                self.connection_handle.refresh_port_connection(
555                    self.commander_port)
556                # Sometimes there might be sfome delay when commander is
557                # functioning.
558                self.wait_for_commander()
559                return True
560            except Exception as e:  # pylint: disable=broad-except
561                message = 'Fail to connect {} times due to {}'.format(
562                    i + 1, e)
563                logging.warning(message)
564                # self.close()
565                time.sleep(retry_timer)
566        raise DeviceError('Cannot reconnect to %s with %d attempts.',
567                          self.commander_port, iterations)
568
    # NOTE(review): presumably retries up to 4 times on any Exception, with
    # a 1 s delay that doubles each time - confirm against the `retry`
    # package.
    @retry(Exception, tries=4, delay=1, backoff=2)
    def wait_for_commander(self):
        """Wait for commander to function.

        Probes the commander by sending the 'menu' command through cmd().

        Returns:
            True if commander worked.

        Raises:
            DeviceError: Failed to bring up commander.
        """
        # self.Flush()
        result = self.cmd('menu')
        # cmd() returns a one-element list for a single command string, so
        # this check passes whenever cmd() returns without raising; a failed
        # command surfaces as the DeviceError raised inside cmd().
        if result:
            return True
        else:
            raise DeviceError('Cannot start commander.')
585
    def wait(self, timeout=1):
        """Wait for the device.

        Args:
            timeout: time to sleep, passed to time.sleep() (seconds).
        """
        logging.debug('Inside wait()...')
        time.sleep(timeout)
590
591    def led(self, cmd):
592        """LED control of the device."""
593        message = 'Inside led({})...'.format(cmd)
594        logging.debug(message)
595        cmd = 'EventUsrLeds' + cmd
596        try:
597            return self.cmd(_evt_hex(cmd))
598        except DeviceError:
599            logging.exception('LED cmd failed')
600
601    def volume(self, key, times=1):
602        """Volume Control. (Down/Up).
603
604        Args:
605            key: Down --Decrease a volume.
606                 Up --Increase a volume.
607            times: Simulate number of swipes.
608
609        Returns:
610            (int) Volume level.
611
612        Raises:
613            DeviceError
614        """
615        message = 'Inside volume({}, {})...'.format(key, times)
616        logging.debug(message)
617        updown = {
618            'Up': '1',
619            'Down': '0',
620        }
621        cmds = ['ButtonSwipe ' + updown[key]] * times
622        logging.info(cmds)
623        try:
624            self.cmd(cmds)
625            for line in self.cmd_log:
626                if isinstance(line, dict):
627                    if 'id' in line and line['id'] == 'VOLUME_CHANGE':
628                        if 'data' in line and line['data']:
629                            return int(line['data'])
630        except DeviceError:
631            logging.exception('ButtonSwipe cmd failed')
632
633    def menu(self):
634        """Return a list of supported commands."""
635        logging.debug('Inside menu()...')
636        try:
637            return self.cmd('menu')
638        except DeviceError:
639            logging.exception('menu cmd failed')
640
641    def set_ohd(self, mode='AUTO'):
642        """Manually set the OHD status and override auto-detection.
643
644        Args:
645            mode: ON --OHD manual mode with on-ear state.
646                  OFF --OHD manual mode with off-ear state.
647                  AUTO --OHD auto-detection mode.
648        Raises:
649            DeviceError: OHD Command failure.
650        """
651        logging.debug('Inside set_ohd()...')
652        try:
653            if mode != 'AUTO':
654                # Set up OHD manual mode
655                self.cmd('Test 14 0 2 1')
656                if mode == 'ON':
657                    # Detects on-ear
658                    self.cmd('Test 14 0 2 1 0x3')
659                else:
660                    # Detects off-ear
661                    self.cmd('Test 14 0 2 1 0x0')
662            else:
663                # Default mode (auto detect.)
664                self.cmd('Test 14 0 2 0')
665        except DeviceError:
666            logging.exception('OHD cmd failed')
667
668    def music_control_events(self, cmd, regexp=None, wait=.5):
669        """Sends the EvtHex to control media player.
670
671        Arguments:
672            cmd: the command to perform.
673            regexp: Optional pattern to validate the event logs.
674
675        Returns:
676            Boolean: True if the command triggers the correct events on the
677                     device, False otherwise.
678
679        # TODO(nviboonchan:) Add more supported commands.
680        Supported commands:
681            'PlayPause'
682            'VolumeUp'
683            'VolumeDown',
684        """
685        cmd_regexp = {
686            # Play/ Pause would need to pass the regexp argument since it's
687            # sending the same event but returns different responses depending
688            # on the device state.
689            'VolumeUp': apollo_log_regex.VOLUP_REGEX,
690            'VolumeDown': apollo_log_regex.VOLDOWN_REGEX,
691        }
692        if not regexp:
693            if cmd not in cmd_regexp:
694                logmsg = 'Expected pattern is not defined for event %s' % cmd
695                logging.exception(logmsg)
696                return False
697            regexp = cmd_regexp[cmd]
698        self.cmd('EvtHex %s' % apollo_sink_events.SINK_EVENTS['EventUsr' + cmd],
699                 wait=wait)
700        for line in self.cmd_log:
701            if isinstance(line, str):
702                if re.search(regexp, line):
703                    return True
704            elif isinstance(line, dict):
705                if line.get('id', None) == 'AVRCP_PLAY_STATUS_CHANGE':
706                    return True
707        return False
708
709    def avrcp(self, cmd):
710        """sends the Audio/Video Remote Control Profile (avrcp) control command.
711
712        Supported commands:
713            'PlayPause'
714            'Stop'
715            'SkipForward',
716            'SkipBackward',
717            'FastForwardPress',
718            'FastForwardRelease',
719            'RewindPress',
720            'RewindRelease',
721            'ShuffleOff',
722            'ShuffleAllTrack',
723            'ShuffleGroup',
724            'RepeatOff':,
725            'RepeatSingleTrack',
726            'RepeatAllTrack',
727            'RepeatGroup',
728            'Play',
729            'Pause',
730            'ToggleActive',
731            'NextGroupPress',
732            'PreviousGroupPress',
733            'NextGroupRelease',
734            'PreviousGroupRelease',
735
736        Args:
737            cmd: The avrcp command.
738
739        """
740        cmd = 'EventUsrAvrcp' + cmd
741        logging.debug(cmd)
742        try:
743            self.cmd(_evt_hex(cmd))
744        except DeviceError:
745            logging.exception('avrcp cmd failed')
746
747    def enable_log(self, levels=None):
748        """Enable specified logs."""
749        logging.debug('Inside enable_log()...')
750        if levels is None:
751            levels = ['ALL']
752        masks = hex(
753            sum([int(apollo_sink_events.LOG_FEATURES[x], 16) for x in levels]))
754        try:
755            self.cmd('LogOff %s' % apollo_sink_events.LOG_FEATURES['ALL'])
756            return self.cmd('LogOn %s' % masks)
757        except DeviceError:
758            logging.exception('Enable log failed')
759
760    def disable_log(self, levels=None):
761        """Disable specified logs."""
762        logging.debug('Inside disable_log()...')
763        if levels is None:
764            levels = ['ALL']
765        masks = hex(
766            sum([int(apollo_sink_events.LOG_FEATURES[x], 16) for x in levels]))
767        try:
768            self.cmd('LogOn %s' % apollo_sink_events.LOG_FEATURES['ALL'])
769            return self.cmd('LogOff %s' % masks)
770        except DeviceError:
771            logging.exception('Disable log failed')
772
773    def write_to_flash(self, file_name=None):
774        """Write file to external flash.
775
776        Note: Assume pv is installed. If not, install it by
777              'apt-get install pv'.
778
779        Args:
780            file_name: Full path file name.
781
782        Returns:
783            Boolean: True if write to partition is successful. False otherwise.
784        """
785        logging.debug('Inside write_to_flash()...')
786        if not os.path.isfile(file_name):
787            message = 'DFU file %s not found.'.format(file_name)
788            logging.exception(message)
789            return False
790        logging.info(
791            'Write file {} to external flash partition ...'.format(file_name))
792        image_size = os.path.getsize(file_name)
793        logging.info('image size is {}'.format(image_size))
794        results = self.cmd('Ota {}'.format(image_size), wait=3)
795        logging.debug('Result of Ota command' + str(results))
796        if any(OTA_VERIFICATION_FAILED in result for result in results[0]):
797            return False
798        # finished cmd Ota
799        if (any('OTA_ERASE_PARTITION' in result.values() for result in
800                results[0] if
801                isinstance(result, dict)) or
802                any('OTA erasd ptns' in result for result in results[0])):
803            try:
804                # -B: buffer size in bytes, -L rate-limit in B/s.
805                subcmd = ('pv --force -B 160 -L 10000 %s > %s' %
806                          (file_name, self.commander_port))
807                logging.info(subcmd)
808                p = subprocess.Popen(subcmd, stdout=subprocess.PIPE, shell=True)
809            except OSError:
810                logging.exception(
811                    'pv not installed, please install by: apt-get install pv')
812                return False
813            try:
814                res = self.read_serial_port(read_until=6)
815            except DeviceError:
816                logging.exception('Unable to read the device port')
817                return False
818            for line in res:
819                if isinstance(line, dict):
820                    logging.info(line)
821                else:
822                    match = re.search(OTA_RECEIVE_CSR_REGEX, line)
823                    if match:
824                        logging.info(
825                            'OTA Image received. Transfer is in progress...')
826                        # Polling during a transfer could miss the final message
827                        # when the device reboots, so we wait until the transfer
828                        # completes.
829                        p.wait()
830                        return True
831            # No image transfer in progress.
832            return False
833        else:
834            return False
835
836    def flash_from_file(self, file_name, reconnect=True):
837        """Upgrade Apollo from an image file.
838
839        Args:
840            file_name: DFU file name. eg. /google/data/ro/teams/wearables/
841                       apollo/ota/master/v76/apollo.dfu
842            reconnect: True to reconnect the device after flashing
843        Returns:
844            Bool: True if the upgrade is successful. False otherwise.
845        """
846        logging.debug('Inside flash_from_file()...')
847        if self.write_to_flash(file_name):
848            logging.info('OTA image transfer is completed')
849            if reconnect:
850                # Transfer is completed; waiting for the device to reboot.
851                logging.info('wait to make sure old connection disappears.')
852                self.wait_for_reset(timeout=150)
853                self.reconnect()
854                logging.info('BudsDevice reboots successfully after OTA.')
855            return True
856
857    def open_mic(self, post_delay=5):
858        """Open Microphone on the device using EvtHex command.
859
860        Args:
861            post_delay: time delay in seconds after the microphone is opened.
862
863        Returns:
864            Returns True or False based on whether the command was executed.
865        """
866        logging.debug('Inside open_mic()...')
867        success, _ = self._cmd('Voicecmd 1', post_delay)
868        return success
869
870    def close_mic(self, post_delay=5):
871        """Close Microphone on the device using EvtHex command.
872
873        Args:
874            post_delay: time delay in seconds after the microphone is closed.
875
876        Returns:
877            Returns true or false based on whether the command was executed.
878        """
879        logging.debug('Inside close_mic()...')
880        success, _ = self._cmd('Voicecmd 0', post_delay)
881        return success
882
883    def touch_key_press_event(self, wait=1):
884        """send key press event command.
885
886        Args:
887            wait: Inject delay after key press to simulate real touch event .
888        """
889        logging.debug('Inside KeyPress()...')
890        self._cmd('Touch 6')
891        self.wait(wait)
892
893    def touch_tap_event(self, wait_if_pause=10):
894        """send key release event after key press to simulate single tap.
895
896        Args:
897            wait_if_pause: Inject delay after avrcp pause was detected.
898
899        Returns:
900            Returns False if avrcp play orp ause not detected else True.
901        """
902        logging.debug('Inside Touch Tap event()...')
903        self._cmd('Touch 4')
904        for line in self.cmd_log:
905            if 'avrcp play' in line:
906                logging.info('avrcp play detected')
907                return True
908            if 'avrcp pause' in line:
909                logging.info('avrcp pause detected')
910                self.wait(wait_if_pause)
911                return True
912        return False
913
914    def touch_hold_up_event(self):
915        """Open Microphone on the device using touch hold up command.
916
917        Returns:
918            Returns True or False based on whether the command was executed.
919        """
920        logging.debug('Inside open_mic()...')
921        self._cmd('Touch 3')
922        for line in self.cmd_log:
923            if 'Button 1 LONG_BEGIN' in line:
924                logging.info('mic open success')
925                return True
926        return False
927
928    def touch_hold_down_event(self):
929        """Close Microphone on the device using touch hold down command.
930
931        Returns:
932            Returns true or false based on whether the command was executed.
933        """
934        logging.debug('Inside close_mic()...')
935        self._cmd('Touch 8')
936        for line in self.cmd_log:
937            if 'Button 1 LONG_END' in line:
938                logging.info('mic close success')
939                return True
940        return False
941
942    def tap(self):
943        """Performs a Tap gesture."""
944        logging.debug('Inside tap()')
945        self.cmd('ButtonTap 0')
946
947    def hold(self, duration):
948        """Tap and hold a button.
949
950        Args:
951            duration: (int) duration in milliseconds.
952        """
953        logging.debug('Inside hold()')
954        self.cmd('ButtonHold ' + str(duration))
955
956    def swipe(self, direction):
957        """Perform a swipe gesture.
958
959        Args:
960            direction: (int) swipe direction 1 forward, 0 backward.
961        """
962        logging.debug('Inside swipe()')
963        self.cmd('ButtonSwipe ' + direction)
964
965    def get_pskey(self, key):
966        """Fetch value from persistent store."""
967        try:
968            cmd = 'GetPSHex ' + apollo_sink_events.PSKEY[key]
969        except KeyError:
970            raise DeviceError('PS Key: %s not found' % key)
971        pskey = ''
972        try:
973            ret = self.cmd(cmd)
974            for result in ret[0]:
975                if not re.search(r'pskey', result.lower()) and LOG_REGEX.match(
976                        result):
977                    # values are broken into words separated by spaces.
978                    pskey += LOG_REGEX.match(result).group('msg').replace(' ',
979                                                                          '')
980                else:
981                    continue
982        except DeviceError:
983            logging.exception('GetPSHex cmd failed')
984        return pskey
985
986    def get_version(self):
987        """Return a device version information.
988
989        Note: Version information is obtained from the firmware loader. Old
990        information is lost when firmware is updated.
991        Returns:
992            A dictionary of device version info. eg.
993            {
994                'Fw Build': '73',
995                'OTA Status': 'No OTA performed before this boot',
996            }
997
998        """
999        logging.debug('Inside get_version()...')
1000        success, result = self._cmd('GetVer', throw_error=False)
1001        status = {}
1002        if result:
1003            for line in result:
1004                if isinstance(line, dict):
1005                    status['build'] = line['vm_build_number']
1006                    status['psoc_build'] = line['psoc_version']
1007                    status['debug'] = line['csr_fw_debug_build']
1008                    status['Fw Build Label'] = line['build_label']
1009                    if 'last_ota_status' in line.keys():
1010                        # Optional value in the proto response
1011                        status['OTA Status'] = line['last_ota_status']
1012                    else:
1013                        status['OTA Status'] = 'No info'
1014        return success, status
1015
1016    def get_earcon_version(self):
1017        """Return a device Earson version information.
1018
1019        Returns:
1020            Boolean:  True if success, False otherwise.
1021            String: Earon Version e.g. 7001 0201 6100 0000
1022
1023        """
1024        # TODO(nviboonchan): Earcon version format would be changed in the
1025        # future.
1026        logging.debug('Inside get_earcon_version()...')
1027        result = self.get_pskey('PSKEY_EARCON_VERSION')
1028        if result:
1029            return True, result
1030        else:
1031            return False, None
1032
1033    def get_bt_status(self):
1034        """Return a device bluetooth connection information.
1035
1036        Returns:
1037            A dictionary of bluetooth status. eg.
1038            {
1039                'Comp. App': 'FALSE',
1040               'HFP (pri.)', 'FALSE',
1041               'HFP (sec.)': 'FALSE',
1042               'A2DP (pri.)': 'FALSE',
1043               'A2DP (sec.)': 'FALSE',
1044               'A2DP disconnects': '3',
1045               'A2DP Role (pri.)': 'slave',
1046               'A2DP RSSI (pri.)': '-Touch'
1047            }
1048        """
1049        logging.debug('Inside get_bt_status()...')
1050        return self._get_status('GetBTStatus')
1051
1052    def get_conn_devices(self):
1053        """Gets the BT connected devices.
1054
1055        Returns:
1056            A dictionary of BT connected devices. eg.
1057            {
1058                'HFP Pri': 'xxxx',
1059                'HFP Sec': 'xxxx',
1060                'A2DP Pri': 'xxxx',
1061                'A2DP Sec': 'xxxx',
1062                'RFCOMM devices': 'xxxx',
1063                'CTRL': 'xxxx',
1064                'AUDIO': 'None',
1065                'DEBUG': 'None',
1066                'TRANS': 'None'
1067             }
1068
1069        Raises:
1070            ResponseError: If unexpected response occurs.
1071        """
1072        response_regex = re.compile('[0-9]+ .+: ')
1073        connected_status = {}
1074        response = self.cmd('GetConnDevices')
1075        if not response:
1076            raise ResponseError(
1077                'No response returned by GetConnDevices command')
1078        for line in response[0]:
1079            if response_regex.search(line):
1080                profile, value = line[line.find(' '):].split(':', 1)
1081                connected_status[profile] = value
1082        if not connected_status:
1083            raise ResponseError('No BT Profile Status in response.')
1084        return connected_status
1085
1086    def _get_status(self, cmd):
1087        """Return a device status information."""
1088        status = {}
1089        try:
1090            results = self.cmd(cmd)
1091        except DeviceError as ex:
1092            # logging.exception('{} cmd failed'.format(cmd))
1093            logging.warning('Failed to get device status info.')
1094            raise ex
1095        results = results[0]
1096        for result in results:
1097            match = re.match(STATUS_REGEX, result)
1098            if match:
1099                key = match.group('key')
1100                value = match.group('value')
1101                status.update({key: value})
1102        return status
1103
1104    def is_streaming(self):
1105        """Returns the music streaming status on Apollo.
1106
1107        Returns:
1108            Boolean: True if device is streaming music. False otherwise.
1109        """
1110
1111        status = self.cmd('GetDSPStatus')
1112        if any('active feature mask: 0' in log for log in
1113               status[0]):
1114            return False
1115        elif any('active feature mask: 2' in log for log in
1116                 status[0]):
1117            return True
1118        else:
1119            return False
1120
1121    def is_in_call(self):
1122        """Returns the phone call status on Apollo.
1123
1124        Returns:
1125            Boolean: True if device has incoming call. False otherwise.
1126        """
1127
1128        status = self.cmd('GetDSPStatus')
1129        if not any('Inc' or 'out' in log for log in status[0]):
1130            return False
1131        return True
1132
1133    def is_device_limbo(self):
1134        """Check if device is in Limbo state.
1135
1136        Returns:
1137            Boolean: True if device is in limbo state, False otherwise.
1138        """
1139        device_state = self.get_device_state()
1140        logging.info('BudsDevice "{}" state {}'.format(self.serial_number,
1141                                                       device_state))
1142        return device_state == 'limbo'
1143
1144    def get_device_state(self):
1145        """Get state of the device.
1146
1147        Returns:
1148            String representing the device state.
1149
1150        Raises:
1151            DeviceError: If command fails.
1152        """
1153        _, status = self._cmd('GetDSPStatus')
1154        for stat in status:
1155            if isinstance(stat, dict):
1156                logging.info(stat)
1157                return stat['sink_state'].lower()
1158        raise DeviceError('BudsDevice state not found in GetDSPStatus.')
1159
1160    def set_stay_connected(self, value):
1161        """Run command to set the value for SetAlwaysConnected.
1162
1163        Args:
1164            value: (int) 1 to keep connection engages at all time,
1165                         0 for restoring
1166        Returns:
1167            the set state of type int (0 or 1) or None if not applicable
1168        """
1169
1170        if int(self.version) >= 1663:
1171            self._cmd('SetAlwaysConnected {}'.format(value))
1172            logging.info('Setting sleep on idle to {}'.format(value))
1173            return value
1174
1175    def get_codec(self):
1176        """Get device's current audio codec.
1177
1178        Returns:
1179            String representing the audio codec.
1180
1181        Raises:
1182            DeviceError: If command fails.
1183        """
1184        success, status = self._cmd('get_codec')
1185        logging.info('---------------------------------------')
1186        logging.info(status)
1187        logging.info('---------------------------------------')
1188        if success:
1189            for line in status:
1190                if isinstance(line, dict):
1191                    logging.info('Codec found: %s'.format(line['codec']))
1192                    return line['codec']
1193        raise DeviceError('BudsDevice state not found in get_codec.')
1194
1195    def crash_dump_detection(self):
1196        """Reads crash dump determines if a crash is detected.
1197
1198        Returns:
1199            True if crash detection is supported and if a new crash is found.
1200            False otherwise.
1201        """
1202        # Detects if crashdump output is new
1203        new_crash_regex = r'new crash = ([01]+)'
1204        # filter crashdump for just the trace
1205        crash_stack_regex = r'BASIC(.*)\n[\d]+ APP_STACK(.*)\n'
1206        # remove time stamp commander output
1207        timestamp_remover_regex = '\n[\\d]+ '
1208
1209        logging.debug('Inside IsCrashDumpDetection()...')
1210        cmd_return = self.cmd('CrashDump', wait=1)
1211        crash_dump_str = '\n'.join(cmd_return[0])
1212        logging.info(crash_dump_str)
1213        try:
1214            # check for crash
1215            match = re.search(new_crash_regex, crash_dump_str)
1216            if match is not None:
1217                if match.groups()[0] == '1':  # new crash found
1218                    logging.error('Crash detected!!')
1219                    basic, app_stack = re.search(crash_stack_regex,
1220                                                 crash_dump_str,
1221                                                 re.DOTALL).groups()
1222                    # remove time stamps from capture
1223                    basic = re.sub(timestamp_remover_regex, '', basic)
1224                    app_stack = re.sub(timestamp_remover_regex, '', app_stack)
1225                    # write to log
1226                    # pylint: disable=bad-whitespace
1227                    logging.info(
1228                        '\n&270d = %s\n&270e = %s\n' % (basic, app_stack))
1229                    # pylint: enable=bad-whitespace
1230                    return True
1231                else:  # no new crash
1232                    logging.info('No crash detected')
1233                    return False
1234        except AttributeError:
1235            logging.exception(
1236                'Apollo crash dump output is not in expected format')
1237            raise DeviceError('Apollo crash dump not in expected format')
1238
1239    @property
1240    def version(self):
1241        """Application version.
1242
1243        Returns:
1244            (String) Firmware version.
1245        """
1246        _, result = self.get_version()
1247        return result['build']
1248
1249    @property
1250    def bluetooth_address(self):
1251        """Bluetooth MAC address.
1252
1253        Returns:
1254            a string representing 48bit BT MAC address in Hex.
1255
1256        Raises:
1257            DeviceError: Unable to find BT Address
1258        """
1259        results = self.get_pskey('PSKEY_BDADDR')
1260        if not results:
1261            raise DeviceError('Unable to find BT Address')
1262        logging.info(results)
1263        # Bluetooth lower address part, upper address part and non-significant
1264        # address part.
1265        bt_lap = results[2:8]
1266        bt_uap = results[10:12]
1267        bt_nap = results[12:16]
1268        results = bt_nap + bt_uap + bt_lap
1269
1270        return ':'.join(map(''.join, zip(*[iter(results)] * 2))).upper()
1271
1272    @property
1273    def device_name(self):
1274        """Device Friendly Name.
1275
1276        Returns:
1277            a string representing device friendly name.
1278
1279        Raises:
1280            DeviceError: Unable to find a wearable device name.
1281        """
1282        result = self.get_pskey('PSKEY_DEVICE_NAME')
1283        if not result:
1284            raise DeviceError('Unable to find BudsDevice Name')
1285        logging.info(_to_ascii(result))
1286        return _to_ascii(result)
1287
1288    @property
1289    def stay_connected(self):
1290        return self.stay_connected_state
1291
1292    @stay_connected.setter
1293    def stay_connected(self, value):
1294        self.stay_connected_state = self.set_stay_connected(value)
1295
1296    def read_serial_port(self, read_until=None):
1297        """Read serial port until specified read_until value in seconds."""
1298        # use default read_until value if not specified
1299        if read_until:
1300            time.sleep(read_until)
1301        res = self.connection_handle.read()
1302        buf_read = []
1303        for line in res:
1304            if apollo_log_decoder.is_automation_protobuf(line):
1305                decoded = apollo_log_decoder.decode(line)
1306                buf_read.append(decoded)
1307            else:
1308                buf_read.append(line)
1309        return buf_read
1310
1311    def wait_for_reset(self, timeout=30):
1312        """waits for the device to reset by check serial enumeration.
1313
1314        Checks every .5 seconds for the port.
1315
1316        Args:
1317            timeout: The max time to wait for the device to disappear.
1318
1319        Returns:
1320            Bool: True if the device reset was detected. False if not.
1321        """
1322        start_time = time.time()
1323        while True:
1324            res = subprocess.Popen(['ls', self.commander_port],
1325                                   stdout=subprocess.PIPE,
1326                                   stderr=subprocess.PIPE)
1327            res.communicate()
1328            if res.returncode != 0:
1329                logging.info('BudsDevice reset detected')
1330                return True
1331            elif (time.time() - start_time) > timeout:
1332                logging.info('Timeout waiting for device to reset.....')
1333                return False
1334            else:
1335                time.sleep(.5)
1336
1337    def set_in_case(self, reconnect=True):
1338        """Simulates setting apollo in case and wait for device to come up.
1339
1340        Args:
1341            reconnect: bool - if method should block until reconnect
1342        """
1343        logging.info('Setting device in case')
1344        out = self.send('Pow 2')
1345        for i in out:
1346            if 'No OTA wakeup condition' in i:
1347                logging.info('No wake up condition.')
1348            elif 'STM Wakeup 10s' in i:
1349                logging.info('Wake up condition detected.')
1350        if reconnect:
1351            self.wait_for_reset()
1352            self.reconnect()
1353
1354
class ParentDevice(BudsDevice):
    """Wrapper object for Device that addresses b10 recovery and build flashing.

    Recovery mechanism:
    In case a serial connection could not be established to b10, the recovery
    mechanism is activated ONLY if 'recover_device' is set to 'true' and
    b29_serial is defined in config file. This helps recover a device that has a
    bad build installed.
    """

    def __init__(self, serial_number, recover_device=False, b29_serial=None):
        """Connect to the device, optionally recovering it through the b29.

        Args:
            serial_number: serial number forwarded to the BudsDevice
                           initializer.
            recover_device: when True, retry the connection and fall back to
                            restoring the golden image through the b29.
            b29_serial: serial number of the b29; required when
                        recover_device is True.

        Raises:
            Error: if recovery is requested without b29_serial, or if every
                recovery attempt fails.
        """
        # if recover device parameter is supplied and there is an error in
        # instantiating B10 try to recover device instantiating b10 has to fail
        # at most $tries_before_recovery time before initiating a recovery
        # try to run the recovery at most $recovery_times before raising Error
        # after the first recovery attempt failure try to reset b29 each
        # iteration
        self.b29_device = None
        if recover_device:
            if b29_serial is None:
                logging.error('B29 serial not defined')
                raise Error(
                    'Recovery failed because "b29_serial" definition not '
                    'present in device manifest file')
            self.b29_device = B29Device(b29_serial)
            tries_before_recovery = 5
            recovery_tries = 5
            for attempt in range(tries_before_recovery):
                try:
                    # build crash symptoms varies based on the nature of the
                    # crash connectError is thrown if the device never shows up
                    # in /dev/ sometimes device shows and can connect but
                    # sending commands fails or crashes apollo in that case,
                    # DeviceError is thrown
                    super().__init__(serial_number, commander_port=None,
                                     log_port=None, serial_logger=None)
                    break
                except (ConnectError, DeviceError):
                    logging.warning(
                        'Error initializing apollo object - # of attempt '
                        'left : %d' % (tries_before_recovery - attempt - 1))
                    if attempt + 1 >= tries_before_recovery:
                        logging.error(
                            'Retries exhausted - now attempting to restore '
                            'golden image')
                        for recovery_attempt in range(recovery_tries):
                            if not self.b29_device.restore_golden_image():
                                logging.error('Recovery failed - retrying...')
                                self.b29_device.reset_charger()
                                continue
                            # try to instantiate now
                            try:
                                super().__init__(serial_number,
                                                 commander_port=None,
                                                 log_port=None,
                                                 serial_logger=None)
                                break
                            except (ConnectError, DeviceError):
                                if recovery_attempt == recovery_tries - 1:
                                    raise Error(
                                        'Recovery failed - ensure that there '
                                        'is no mismatching serial numbers of '
                                        'b29 and b10 is specified in config')
                                else:
                                    logging.warning(
                                        'Recovery attempt failed - retrying...')
                        else:
                            # No recovery attempt ended with a connected
                            # device (e.g. the golden image could never be
                            # restored). Fail loudly instead of carrying on
                            # with an object whose base class was never
                            # initialized.
                            raise Error(
                                'Recovery failed - unable to restore the '
                                'golden image and connect to the device')
                    time.sleep(2)
        else:
            super().__init__(serial_number, commander_port=None, log_port=None,
                             serial_logger=None)
        # set this to prevent sleep
        self.set_stay_connected(1)

    def get_info(self):
        """Collect identifying information about the device.

        Returns:
            A dictionary with the device type, serial, ports, bluetooth
            address, version info and name, plus the b29 details when a b29
            device is attached.
        """
        information_dictionary = {}
        information_dictionary['type'] = self.dut_type
        information_dictionary['serial'] = self.serial_number
        information_dictionary['log port'] = self.log_port
        information_dictionary['command port'] = self.commander_port
        information_dictionary['bluetooth address'] = self.bluetooth_address
        _, build_dict = self.get_version()
        information_dictionary['build'] = build_dict
        # Extract the build number as a separate key. Useful for BigQuery.
        information_dictionary['firmware build number'] = build_dict.get(
            'build', '9999')
        information_dictionary['name'] = self.device_name
        if self.b29_device:
            information_dictionary['b29 serial'] = self.b29_device.serial
            information_dictionary['b29 firmware'] = self.b29_device.fw_version
            information_dictionary['b29 commander port'] = self.b29_device.port
            information_dictionary[
                'b29 app version'] = self.b29_device.app_version
        return information_dictionary

    def setup(self, **kwargs):
        """Optionally flash the device before tests are run.

        Args:
            apollo_build: if specified, will be used in flashing the device to
                          that build prior to running any of the tests. If not
                          specified flashing is skipped.

        Raises:
            Error: if the specified build path does not exist or the file is
                empty (for /google/data/ paths, after retrying for a while).
        """
        build = kwargs.get('apollo_build')
        if build is None:
            logging.info('Not flashing apollo.')
            return
        x20_regex = re.compile(r'/google/data/')
        if not os.path.exists(build) or os.stat(build).st_size == 0:
            # if x20 path, retry on file-not-found error or if file size is
            # zero b/c X20 path does not update immediately
            if not x20_regex.match(build):
                raise Error('Specified build path (%s) does not exist or '
                            'file size is 0' % build)
            for i in range(20):
                # wait until file exists and size is > 0 w/ 6 second
                # interval on retry
                if os.path.exists(build) and os.stat(build).st_size > 0:
                    break

                if i == 19:
                    logging.error('Build path (%s) does not exist or '
                                  'file size is 0 - aborted' % build)

                    raise Error('Specified build path (%s) does not '
                                'exist or file size is 0' % build)
                else:
                    logging.warning('Build path (%s) does not exist or '
                                    'file size is 0 - retrying...' %
                                    build)
                    time.sleep(6)
        # Flash whenever a usable build file is available. This call used to
        # sit inside the "file missing" branch above, so a build that was
        # already present was never flashed.
        self.flash_from_file(file_name=build, reconnect=True)

    def teardown(self, **kwargs):
        """Close the connection to the device."""
        self.close()
1491
1492
def _evt_hex(cmd):
    """Build the 'EvtHex' command string for a named sink event.

    Args:
        cmd: key looked up in apollo_sink_events.SINK_EVENTS.

    Returns:
        'EvtHex ' followed by the table entry for cmd.
    """
    event_code = apollo_sink_events.SINK_EVENTS[cmd]
    return 'EvtHex ' + event_code
1495
1496
def _to_ascii(orig):
    """Decode a byte-swapped hex string into text.

    Args:
        orig: hex string as returned by the device.

    Returns:
        The text decoded from the byte-swapped hex digits, with one trailing
        zero octet removed if present.
    """
    # Returned value need to be byte swapped. Remove last octet if it is 0.
    swapped = _byte_swap(orig)
    if swapped.endswith('00'):
        swapped = swapped[:-2]
    return bytes.fromhex(swapped).decode()
1502
1503
def _byte_swap(orig):
    """Swap the two bytes of every 16-bit word of a hex string.

    The string is processed in groups of four hex digits; the two digit
    pairs of each group trade places. Trailing digits that do not fill a
    whole group are dropped.

    Args:
        orig: original string

    Returns:
        a string with bytes swapped.
        eg. orig = '6557276920736952006f'.
        After swap, return '57656927732052696f00'
    """
    usable = len(orig) - len(orig) % 4
    return ''.join(
        orig[pos + 2:pos + 4] + orig[pos:pos + 2]
        for pos in range(0, usable, 4))
1517