1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Apollo Commander through USB/UART interface. 18 19It uses python serial lib to communicate to a Apollo device. 20Some of the commander may not work yet, pending on the final version of the 21commander implementation. 22 23Typical usage examples: 24 25 To get a list of all apollo devices: 26 >>> devices = apollo_lib.get_devices() 27 28 To work with a specific apollo device: 29 >>> apollo = apollo_lib.Device(serial_number='ABCDEF0123456789', 30 >>> commander_port='/dev/ttyACM0') 31 32 To send a single command: 33 >>> apollo.cmd('PowOff') 34 35 To send a list of commands: 36 >>> apollo.cmd(['PowOff', 'PowOn', 'VolUp', 'VolDown'] 37""" 38from __future__ import absolute_import 39from __future__ import division 40from __future__ import print_function 41 42import atexit 43import os 44import re 45import subprocess 46import time 47 48import serial 49from acts import tracelogger 50from acts.controllers.buds_lib import logserial 51from acts.controllers.buds_lib.b29_lib import B29Device 52from acts.controllers.buds_lib.dev_utils import apollo_log_decoder 53from acts.controllers.buds_lib.dev_utils import apollo_log_regex 54from acts.controllers.buds_lib.dev_utils import apollo_sink_events 55from logging import Logger 56from retry import retry 57 58logging = tracelogger.TakoTraceLogger(Logger('apollo')) 59 60BAUD_RATE = 115200 61BYTE_SIZE = 8 62PARITY = 'N' 63STOP_BITS = 1 64DEFAULT_TIMEOUT = 3 65WRITE_TO_FLASH_WAIT = 30 # wait 30 sec when writing to external flash. 66LOG_REGEX = re.compile(r'(?P<time_stamp>\d+)\s(?P<msg>.*)') 67STATUS_REGEX = r'(?P<time_stamp>\d+)\s(?P<key>.+?): (?P<value>.+)' 68APOLLO_CHIP = '_Apollo_' 69DEVICE_REGEX = ( 70 r'_(?P<device_serial>[A-Z0-9]+)-(?P<interface>\w+)' 71 r'\s->\s(\.\./){2}(?P<port>\w+)' 72) 73OTA_VERIFICATION_FAILED = 'OTA verification failed. corrupt image?' 74OTA_ERASING_PARTITION = 'INFO OTA eras ptns' 75OTA_RECEIVE_CSR_REGEX = r'INFO OTA CSR rcv begin' 76CODEC_REGEX = r'(?P<time_stamp>\d+)\s(?P<codec>\w+) codec is used.' 77BUILD_REGEX = r'\d+\.\d+\.(?P<build>\d+)-?(?P<psoc_build>\d*)-?(?P<debug>\w*)' 78 79 80class Error(Exception): 81 """Module Level Error.""" 82 83 84class ResponseError(Error): 85 """cmd Response Error.""" 86 87 88class DeviceError(Error): 89 """Device Error.""" 90 91 92class ConnectError(Error): 93 """Connection Error.""" 94 95 96def get_devices(): 97 """Get all available Apollo devices. 98 99 Returns: 100 (list) A list of available devices or empty list if none found 101 102 Raises: 103 Error: raises Error if no Apollo devices or wrong interfaces were found. 104 """ 105 devices = [] 106 result = os.popen('ls -l /dev/serial/by-id/*%s*' % APOLLO_CHIP).read() 107 if not result: 108 raise Error('No Apollo Devices found.') 109 for line in result.splitlines(): 110 match = re.search(DEVICE_REGEX, line) 111 interface = match.group('interface') 112 # TODO: The commander port will always be None. 113 commander_port = None 114 if interface == 'if00': 115 commander_port = '/dev/' + match.group('port') 116 continue 117 elif interface == 'if02': 118 log_port = '/dev/' + match.group('port') 119 else: 120 raise Error('Wrong interface found.') 121 device_serial = match.group('device_serial') 122 123 device = { 124 'commander_port': commander_port, 125 'log_port': log_port, 126 'serial_number': device_serial 127 } 128 devices.append(device) 129 return devices 130 131 132class BudsDevice(object): 133 """Provides a simple class to interact with Apollo.""" 134 135 def __init__(self, serial_number, commander_port=None, log_port=None, 136 serial_logger=None): 137 """Establish a connection to a Apollo. 138 139 Open a connection to a device with a specific serial number. 140 141 Raises: 142 ConnectError: raises ConnectError if cannot open the device. 143 """ 144 self.set_log = False 145 self.connection_handle = None 146 self.device_closed = False 147 if serial_logger: 148 self.set_logger(serial_logger) 149 self.pc = logserial.PortCheck() 150 self.serial_number = serial_number 151 # TODO (kselvakumaran): move this to an interface device class that 152 # apollo_lib.BudsDevice should derive from 153 if not commander_port and not log_port: 154 self.get_device_ports(self.serial_number) 155 if commander_port: 156 self.commander_port = commander_port 157 if log_port: 158 self.log_port = log_port 159 self.apollo_log = None 160 self.cmd_log = None 161 self.apollo_log_regex = apollo_log_regex 162 self.dut_type = 'apollo' 163 164 # TODO (kselvakumaran): move this to an interface device class that 165 # apollo_lib.BudsDevice should derive from 166 167 try: # Try to open the device 168 self.connection_handle = logserial.LogSerial( 169 self.commander_port, BAUD_RATE, flush_output=False, 170 serial_logger=logging) 171 self.wait_for_commander() 172 except (serial.SerialException, AssertionError, ConnectError) as e: 173 logging.error( 174 'error opening device {}: {}'.format(serial_number, e)) 175 raise ConnectError('Error open the device.') 176 # disable sleep on idle 177 self.stay_connected_state = 1 178 atexit.register(self.close) 179 180 def set_logger(self, serial_logger): 181 global logging 182 logging = serial_logger 183 self.set_log = True 184 if self.connection_handle: 185 self.connection_handle.set_logger(serial_logger) 186 187 def get_device_ports(self, serial_number): 188 commander_query = {'ID_SERIAL_SHORT': serial_number, 189 'ID_USB_INTERFACE_NUM': '00'} 190 log_query = {'ID_SERIAL_SHORT': serial_number, 191 'ID_USB_INTERFACE_NUM': '02'} 192 self.commander_port = self.pc.search_port_by_property(commander_query) 193 self.log_port = self.pc.search_port_by_property(log_query) 194 if not self.commander_port and not self.log_port: 195 raise ConnectError( 196 'BudsDevice serial number %s not found' % serial_number) 197 else: 198 if not self.commander_port: 199 raise ConnectError('No devices found') 200 self.commander_port = self.commander_port[0] 201 self.log_port = self.log_port[0] 202 203 def get_all_log(self): 204 return self.connection_handle.get_all_log() 205 206 def query_log(self, from_timestamp, to_timestamp): 207 return self.connection_handle.query_serial_log( 208 from_timestamp=from_timestamp, to_timestamp=to_timestamp) 209 210 def send(self, cmd): 211 """Sends the command to serial port. 212 213 It does not care about whether the cmd is successful or not. 214 215 Args: 216 cmd: The passed command 217 218 Returns: 219 The number of characters written 220 """ 221 logging.debug(cmd) 222 # with self._lock: 223 self.connection_handle.write(cmd) 224 result = self.connection_handle.read() 225 return result 226 227 def cmd(self, cmds, wait=None): 228 """Sends the commands and check responses. 229 230 Valid cmd will return something like '585857269 running cmd VolUp'. 231 Invalid cmd will log an error and return something like '585826369 No 232 command vol exists'. 233 234 Args: 235 cmds: The commands to the commander. 236 wait: wait in seconds for the cmd response. 237 238 Returns: 239 (list) The second element of the array returned by _cmd. 240 """ 241 if isinstance(cmds, str): 242 cmds = [cmds] 243 results = [] 244 for cmd in cmds: 245 _, result = self._cmd(cmd, wait=wait) 246 results.append(result) 247 return results 248 249 def _cmd(self, cmd, wait=None, throw_error=True): 250 """Sends a single command and check responses. 251 252 Valid cmd will return something like '585857269 running cmd VolUp'. 253 Invalid cmd will log an error and return something like '585826369 No 254 command vol exists'. Some cmd will return multiple lines of output. 255 eg. 'menu'. 256 257 Args: 258 cmd: The command to the commander. 259 wait: wait in seconds for the cmd response. 260 throw_error: Throw exception on True 261 262 Returns: 263 (list) containing such as the following: 264 [<return value>, [<protobuf dictionary>, str]] 265 Hex strings (protobuf) are replaced by its decoded dictionaries 266 and stored in an arry along with other string returned fom the 267 device. 268 269 Raises: 270 DeviceError: On Error.(Optional) 271 """ 272 self.connection_handle.write(cmd) 273 274 while self.connection_handle.is_logging: 275 time.sleep(.01) 276 if wait: 277 self.wait(wait) 278 # Using read_serial_port as readlines is a blocking call until idle. 279 res = self.read_serial_port() 280 result = [] 281 self.cmd_log = res 282 command_resv = False 283 # TODO: Cleanup the usage of the two booleans below. 284 command_finish = False 285 command_rejected = False 286 # for line in iter_res: 287 for line in res: 288 if isinstance(line, dict): 289 if 'COMMANDER_RECV_COMMAND' in line.values(): 290 command_resv = True 291 elif 'COMMANDER_REJECT_COMMAND' in line.values(): 292 logging.info('Command rejected') 293 command_rejected = True 294 break 295 elif 'COMMANDER_FINISH_COMMAND' in line.values(): 296 command_finish = True 297 break 298 elif (command_resv and not command_finish and 299 not command_rejected): 300 result.append(line) 301 # TODO(jesussalinas): Remove when only encoded lines are required 302 elif command_resv and not command_finish and not command_rejected: 303 if 'running cmd' not in line: 304 result.append(line) 305 success = True 306 if command_rejected or not command_resv: 307 success = False 308 if throw_error: 309 logging.info(res) 310 raise DeviceError('Unknown command %s' % cmd) 311 return success, result 312 313 def get_pdl(self): 314 """Returns the PDL stack dictionary. 315 316 The PDL stack stores paired devices of Apollo. Each PDL entry include 317 mac_address, flags, link_key, priority fields. 318 319 Returns: 320 list of pdl dicts. 321 """ 322 # Get the mask from CONNLIB41: 323 # CONNLIB41 typically looks something like this: 2403 fff1 324 # 2403 fff1 is actually two 16-bit words of a 32-bit integer 325 # like 0xfff12403 . This tells the chronological order of the entries 326 # in the paired device list one nibble each. LSB to MSB corresponds to 327 # CONNLIB42 through CONNLIB49. So, the above tells us that the device at 328 # 0x2638 is the 3rd most recent entry 0x2639 the latest entry etc. As 329 # a device re-pairs the masks are updated. 330 response = [] 331 mask = 'ffffffff' 332 res = self.cmd('GetPSHex 0x2637') 333 if len(res[0]) == 0: 334 logging.warning('Error reading PDL mask @ 0x2637') 335 return response 336 else: 337 regexp = r'\d+\s+(?P<m1>....)\s(?P<m2>....)' 338 match = re.match(regexp, res[0][0]) 339 if match: 340 connlib41 = match.group('m2') + match.group('m1') 341 mask = connlib41[::-1] 342 logging.debug('PDL mask: %s' % mask) 343 344 # Now get the MAC/link key 345 mask_idx = 0 346 for i in range(9784, 9883): 347 types = {} 348 res = self.cmd('GetPSHex ' + '%0.2x' % i) 349 if len(res[0]) == 0: 350 break 351 else: 352 regexp = ('\d+\s+(?P<Mac>....\s....\s....)\s' 353 '(?P<Flags>....\s....)\s(?P<Linkkey>.*)') 354 match = re.search(regexp, res[0][0]) 355 if match: 356 mac_address = match.group('Mac').replace(' ', '').upper() 357 formatted_mac = '' 358 for i in range(len(mac_address)): 359 formatted_mac += mac_address[i] 360 if i % 2 != 0 and i < (len(mac_address) - 1): 361 formatted_mac += ':' 362 types['mac_address'] = formatted_mac 363 types['flags'] = match.group('Flags').replace(' ', '') 364 types['link_key'] = match.group('Linkkey').replace(' ', '') 365 types['priority'] = int(mask[mask_idx], 16) 366 mask_idx += 1 367 response.append(types) 368 369 return response 370 371 def set_pairing_mode(self): 372 """Enter Bluetooth Pairing mode.""" 373 logging.debug('Inside set_pairing_mode()...') 374 try: 375 return self.cmd('Pair') 376 except DeviceError: 377 logging.exception('Pair cmd failed') 378 379 # TODO (kselvakumaran): move this to an interface BT class that 380 # apollo_lib.BudsDevice should derive from 381 def turn_on_bluetooth(self): 382 return True 383 384 # TODO (kselvakumaran): move this to an interface BT class that 385 # apollo_lib.BudsDevice should derive from 386 def is_bt_enabled(self): 387 """Check if BT is enabled. 388 389 (TODO:weisu)Currently it is always true since there is no way to disable 390 BT in apollo 391 392 Returns: 393 True if BT is enabled. 394 """ 395 logging.debug('Inside is_bt_enabled()...') 396 return True 397 398 def panic(self): 399 """Hitting a panic, device will be automatically reset after 5s.""" 400 logging.debug('Inside panic()...') 401 try: 402 self.send('panic') 403 except serial.SerialException: 404 logging.exception('panic cmd failed') 405 406 def power(self, cmd): 407 """Controls the power state of the device. 408 409 Args: 410 cmd: If 'Off', powers the device off. Otherwise, powers the device 411 on. 412 """ 413 logging.debug('Inside power({})...'.format(cmd)) 414 mode = '0' if cmd == 'Off' else '1' 415 cmd = 'Pow ' + mode 416 try: 417 return self.cmd(cmd) 418 except DeviceError: 419 logging.exception('{} cmd failed'.format(cmd)) 420 421 def charge(self, state): 422 """Charging Control of the device. 423 424 Args: 425 state: '1/0' to enable/disable charging. 426 """ 427 logging.debug('Inside charge({})...'.format(state)) 428 cmd = 'chg ' + state 429 try: 430 self.cmd(cmd) 431 except DeviceError: 432 logging.exception('{} cmd failed'.format(cmd)) 433 434 def get_battery_level(self): 435 """Get the battery charge level. 436 437 Returns: 438 charge percentage string. 439 440 Raises: 441 DeviceError: GetBatt response error. 442 """ 443 response = self.cmd('GetBatt') 444 for line in response[0]: 445 if line.find('Batt:') > -1: 446 # Response if in this format '<messageID> Batt: <percentage>' 447 return line.split()[2] 448 raise DeviceError('Battery Level not found in GetBatt response') 449 450 def get_gas_gauge_current(self): 451 """Get the Gauge current value. 452 453 Returns: 454 Float value with the info 455 456 Raises: 457 DeviceError: I2CRead response error. 458 """ 459 response = self.cmd('I2CRead 2 0x29') 460 for line in response[0]: 461 if line.find('value') > -1: 462 return float.fromhex(line.split()[6].replace(',', '')) 463 raise DeviceError('Current Level not found in I2CRead response') 464 465 def get_gas_gauge_voltage(self): 466 """Get the Gauge voltage value. 467 468 Returns: 469 Float value with the info 470 471 Raises: 472 DeviceError: I2CRead response error. 473 """ 474 response = self.cmd('I2CRead 2 0x2A') 475 for line in response[0]: 476 if line.find('value') > -1: 477 return float.fromhex(line.split()[6].replace(',', '')) 478 raise DeviceError('Voltage Level not found in I2CRead response') 479 480 def reset(self, wait=5): 481 """Resetting the device.""" 482 logging.debug('Inside reset()...') 483 self.power('Off') 484 self.wait(wait) 485 self.power('On') 486 487 def close(self): 488 if not self.device_closed: 489 self.connection_handle.close() 490 self.device_closed = True 491 if not self.set_log: 492 logging.flush_log() 493 494 def get_serial_log(self): 495 """Retrieve the logs from connection handle.""" 496 return self.connection_handle.get_all_log() 497 498 def factory_reset(self): 499 """Erase paired device(s) (bond) data and reboot device.""" 500 cmd = 'FactoryReset 1' 501 self.send(cmd) 502 self.wait(5) 503 self.reconnect() 504 505 def reboot(self, reconnect=10, retry_timer=30): 506 """Rebooting the device. 507 508 Args: 509 reconnect: reconnect attempts after reboot, None for no reconnect. 510 retry_timer: wait time in seconds before next connect retry. 511 512 Returns: 513 True if successfully reboot or reconnect. 514 """ 515 logging.debug('Inside reboot()...') 516 self.panic() 517 if not reconnect: 518 return True 519 ini_time = time.time() 520 message = 'waiting for {} to shutdown'.format(self.serial_number) 521 logging.info(message) 522 while True: 523 alive = self.connection_handle.is_port_alive() 524 if not alive: 525 logging.info('rebooted') 526 break 527 if time.time() - ini_time > 60: 528 logging.info('Shutdown timeouted') 529 break 530 time.sleep(0.5) 531 return self.reconnect(reconnect, retry_timer) 532 533 def reconnect(self, iterations=30, retry_timer=20): 534 """Reconnect to the device. 535 536 Args: 537 iterations: Number of retry iterations. 538 retry_timer: wait time in seconds before next connect retry. 539 540 Returns: 541 True if reconnect to the device successfully. 542 543 Raises: 544 DeviceError: Failed to reconnect. 545 """ 546 logging.debug('Inside reconnect()...') 547 for i in range(iterations): 548 try: 549 # port might be changed, refresh the port list. 550 self.get_device_ports(self.serial_number) 551 message = 'commander_port: {}, log_port: {}'.format( 552 self.commander_port, self.log_port) 553 logging.info(message) 554 self.connection_handle.refresh_port_connection( 555 self.commander_port) 556 # Sometimes there might be sfome delay when commander is 557 # functioning. 558 self.wait_for_commander() 559 return True 560 except Exception as e: # pylint: disable=broad-except 561 message = 'Fail to connect {} times due to {}'.format( 562 i + 1, e) 563 logging.warning(message) 564 # self.close() 565 time.sleep(retry_timer) 566 raise DeviceError('Cannot reconnect to %s with %d attempts.', 567 self.commander_port, iterations) 568 569 @retry(Exception, tries=4, delay=1, backoff=2) 570 def wait_for_commander(self): 571 """Wait for commander to function. 572 573 Returns: 574 True if commander worked. 575 576 Raises: 577 DeviceError: Failed to bring up commander. 578 """ 579 # self.Flush() 580 result = self.cmd('menu') 581 if result: 582 return True 583 else: 584 raise DeviceError('Cannot start commander.') 585 586 def wait(self, timeout=1): 587 """Wait for the device.""" 588 logging.debug('Inside wait()...') 589 time.sleep(timeout) 590 591 def led(self, cmd): 592 """LED control of the device.""" 593 message = 'Inside led({})...'.format(cmd) 594 logging.debug(message) 595 cmd = 'EventUsrLeds' + cmd 596 try: 597 return self.cmd(_evt_hex(cmd)) 598 except DeviceError: 599 logging.exception('LED cmd failed') 600 601 def volume(self, key, times=1): 602 """Volume Control. (Down/Up). 603 604 Args: 605 key: Down --Decrease a volume. 606 Up --Increase a volume. 607 times: Simulate number of swipes. 608 609 Returns: 610 (int) Volume level. 611 612 Raises: 613 DeviceError 614 """ 615 message = 'Inside volume({}, {})...'.format(key, times) 616 logging.debug(message) 617 updown = { 618 'Up': '1', 619 'Down': '0', 620 } 621 cmds = ['ButtonSwipe ' + updown[key]] * times 622 logging.info(cmds) 623 try: 624 self.cmd(cmds) 625 for line in self.cmd_log: 626 if isinstance(line, dict): 627 if 'id' in line and line['id'] == 'VOLUME_CHANGE': 628 if 'data' in line and line['data']: 629 return int(line['data']) 630 except DeviceError: 631 logging.exception('ButtonSwipe cmd failed') 632 633 def menu(self): 634 """Return a list of supported commands.""" 635 logging.debug('Inside menu()...') 636 try: 637 return self.cmd('menu') 638 except DeviceError: 639 logging.exception('menu cmd failed') 640 641 def set_ohd(self, mode='AUTO'): 642 """Manually set the OHD status and override auto-detection. 643 644 Args: 645 mode: ON --OHD manual mode with on-ear state. 646 OFF --OHD manual mode with off-ear state. 647 AUTO --OHD auto-detection mode. 648 Raises: 649 DeviceError: OHD Command failure. 650 """ 651 logging.debug('Inside set_ohd()...') 652 try: 653 if mode != 'AUTO': 654 # Set up OHD manual mode 655 self.cmd('Test 14 0 2 1') 656 if mode == 'ON': 657 # Detects on-ear 658 self.cmd('Test 14 0 2 1 0x3') 659 else: 660 # Detects off-ear 661 self.cmd('Test 14 0 2 1 0x0') 662 else: 663 # Default mode (auto detect.) 664 self.cmd('Test 14 0 2 0') 665 except DeviceError: 666 logging.exception('OHD cmd failed') 667 668 def music_control_events(self, cmd, regexp=None, wait=.5): 669 """Sends the EvtHex to control media player. 670 671 Arguments: 672 cmd: the command to perform. 673 regexp: Optional pattern to validate the event logs. 674 675 Returns: 676 Boolean: True if the command triggers the correct events on the 677 device, False otherwise. 678 679 # TODO(nviboonchan:) Add more supported commands. 680 Supported commands: 681 'PlayPause' 682 'VolumeUp' 683 'VolumeDown', 684 """ 685 cmd_regexp = { 686 # Play/ Pause would need to pass the regexp argument since it's 687 # sending the same event but returns different responses depending 688 # on the device state. 689 'VolumeUp': apollo_log_regex.VOLUP_REGEX, 690 'VolumeDown': apollo_log_regex.VOLDOWN_REGEX, 691 } 692 if not regexp: 693 if cmd not in cmd_regexp: 694 logmsg = 'Expected pattern is not defined for event %s' % cmd 695 logging.exception(logmsg) 696 return False 697 regexp = cmd_regexp[cmd] 698 self.cmd('EvtHex %s' % apollo_sink_events.SINK_EVENTS['EventUsr' + cmd], 699 wait=wait) 700 for line in self.cmd_log: 701 if isinstance(line, str): 702 if re.search(regexp, line): 703 return True 704 elif isinstance(line, dict): 705 if line.get('id', None) == 'AVRCP_PLAY_STATUS_CHANGE': 706 return True 707 return False 708 709 def avrcp(self, cmd): 710 """sends the Audio/Video Remote Control Profile (avrcp) control command. 711 712 Supported commands: 713 'PlayPause' 714 'Stop' 715 'SkipForward', 716 'SkipBackward', 717 'FastForwardPress', 718 'FastForwardRelease', 719 'RewindPress', 720 'RewindRelease', 721 'ShuffleOff', 722 'ShuffleAllTrack', 723 'ShuffleGroup', 724 'RepeatOff':, 725 'RepeatSingleTrack', 726 'RepeatAllTrack', 727 'RepeatGroup', 728 'Play', 729 'Pause', 730 'ToggleActive', 731 'NextGroupPress', 732 'PreviousGroupPress', 733 'NextGroupRelease', 734 'PreviousGroupRelease', 735 736 Args: 737 cmd: The avrcp command. 738 739 """ 740 cmd = 'EventUsrAvrcp' + cmd 741 logging.debug(cmd) 742 try: 743 self.cmd(_evt_hex(cmd)) 744 except DeviceError: 745 logging.exception('avrcp cmd failed') 746 747 def enable_log(self, levels=None): 748 """Enable specified logs.""" 749 logging.debug('Inside enable_log()...') 750 if levels is None: 751 levels = ['ALL'] 752 masks = hex( 753 sum([int(apollo_sink_events.LOG_FEATURES[x], 16) for x in levels])) 754 try: 755 self.cmd('LogOff %s' % apollo_sink_events.LOG_FEATURES['ALL']) 756 return self.cmd('LogOn %s' % masks) 757 except DeviceError: 758 logging.exception('Enable log failed') 759 760 def disable_log(self, levels=None): 761 """Disable specified logs.""" 762 logging.debug('Inside disable_log()...') 763 if levels is None: 764 levels = ['ALL'] 765 masks = hex( 766 sum([int(apollo_sink_events.LOG_FEATURES[x], 16) for x in levels])) 767 try: 768 self.cmd('LogOn %s' % apollo_sink_events.LOG_FEATURES['ALL']) 769 return self.cmd('LogOff %s' % masks) 770 except DeviceError: 771 logging.exception('Disable log failed') 772 773 def write_to_flash(self, file_name=None): 774 """Write file to external flash. 775 776 Note: Assume pv is installed. If not, install it by 777 'apt-get install pv'. 778 779 Args: 780 file_name: Full path file name. 781 782 Returns: 783 Boolean: True if write to partition is successful. False otherwise. 784 """ 785 logging.debug('Inside write_to_flash()...') 786 if not os.path.isfile(file_name): 787 message = 'DFU file %s not found.'.format(file_name) 788 logging.exception(message) 789 return False 790 logging.info( 791 'Write file {} to external flash partition ...'.format(file_name)) 792 image_size = os.path.getsize(file_name) 793 logging.info('image size is {}'.format(image_size)) 794 results = self.cmd('Ota {}'.format(image_size), wait=3) 795 logging.debug('Result of Ota command' + str(results)) 796 if any(OTA_VERIFICATION_FAILED in result for result in results[0]): 797 return False 798 # finished cmd Ota 799 if (any('OTA_ERASE_PARTITION' in result.values() for result in 800 results[0] if 801 isinstance(result, dict)) or 802 any('OTA erasd ptns' in result for result in results[0])): 803 try: 804 # -B: buffer size in bytes, -L rate-limit in B/s. 805 subcmd = ('pv --force -B 160 -L 10000 %s > %s' % 806 (file_name, self.commander_port)) 807 logging.info(subcmd) 808 p = subprocess.Popen(subcmd, stdout=subprocess.PIPE, shell=True) 809 except OSError: 810 logging.exception( 811 'pv not installed, please install by: apt-get install pv') 812 return False 813 try: 814 res = self.read_serial_port(read_until=6) 815 except DeviceError: 816 logging.exception('Unable to read the device port') 817 return False 818 for line in res: 819 if isinstance(line, dict): 820 logging.info(line) 821 else: 822 match = re.search(OTA_RECEIVE_CSR_REGEX, line) 823 if match: 824 logging.info( 825 'OTA Image received. Transfer is in progress...') 826 # Polling during a transfer could miss the final message 827 # when the device reboots, so we wait until the transfer 828 # completes. 829 p.wait() 830 return True 831 # No image transfer in progress. 832 return False 833 else: 834 return False 835 836 def flash_from_file(self, file_name, reconnect=True): 837 """Upgrade Apollo from an image file. 838 839 Args: 840 file_name: DFU file name. eg. /google/data/ro/teams/wearables/ 841 apollo/ota/master/v76/apollo.dfu 842 reconnect: True to reconnect the device after flashing 843 Returns: 844 Bool: True if the upgrade is successful. False otherwise. 845 """ 846 logging.debug('Inside flash_from_file()...') 847 if self.write_to_flash(file_name): 848 logging.info('OTA image transfer is completed') 849 if reconnect: 850 # Transfer is completed; waiting for the device to reboot. 851 logging.info('wait to make sure old connection disappears.') 852 self.wait_for_reset(timeout=150) 853 self.reconnect() 854 logging.info('BudsDevice reboots successfully after OTA.') 855 return True 856 857 def open_mic(self, post_delay=5): 858 """Open Microphone on the device using EvtHex command. 859 860 Args: 861 post_delay: time delay in seconds after the microphone is opened. 862 863 Returns: 864 Returns True or False based on whether the command was executed. 865 """ 866 logging.debug('Inside open_mic()...') 867 success, _ = self._cmd('Voicecmd 1', post_delay) 868 return success 869 870 def close_mic(self, post_delay=5): 871 """Close Microphone on the device using EvtHex command. 872 873 Args: 874 post_delay: time delay in seconds after the microphone is closed. 875 876 Returns: 877 Returns true or false based on whether the command was executed. 878 """ 879 logging.debug('Inside close_mic()...') 880 success, _ = self._cmd('Voicecmd 0', post_delay) 881 return success 882 883 def touch_key_press_event(self, wait=1): 884 """send key press event command. 885 886 Args: 887 wait: Inject delay after key press to simulate real touch event . 888 """ 889 logging.debug('Inside KeyPress()...') 890 self._cmd('Touch 6') 891 self.wait(wait) 892 893 def touch_tap_event(self, wait_if_pause=10): 894 """send key release event after key press to simulate single tap. 895 896 Args: 897 wait_if_pause: Inject delay after avrcp pause was detected. 898 899 Returns: 900 Returns False if avrcp play orp ause not detected else True. 901 """ 902 logging.debug('Inside Touch Tap event()...') 903 self._cmd('Touch 4') 904 for line in self.cmd_log: 905 if 'avrcp play' in line: 906 logging.info('avrcp play detected') 907 return True 908 if 'avrcp pause' in line: 909 logging.info('avrcp pause detected') 910 self.wait(wait_if_pause) 911 return True 912 return False 913 914 def touch_hold_up_event(self): 915 """Open Microphone on the device using touch hold up command. 916 917 Returns: 918 Returns True or False based on whether the command was executed. 919 """ 920 logging.debug('Inside open_mic()...') 921 self._cmd('Touch 3') 922 for line in self.cmd_log: 923 if 'Button 1 LONG_BEGIN' in line: 924 logging.info('mic open success') 925 return True 926 return False 927 928 def touch_hold_down_event(self): 929 """Close Microphone on the device using touch hold down command. 930 931 Returns: 932 Returns true or false based on whether the command was executed. 933 """ 934 logging.debug('Inside close_mic()...') 935 self._cmd('Touch 8') 936 for line in self.cmd_log: 937 if 'Button 1 LONG_END' in line: 938 logging.info('mic close success') 939 return True 940 return False 941 942 def tap(self): 943 """Performs a Tap gesture.""" 944 logging.debug('Inside tap()') 945 self.cmd('ButtonTap 0') 946 947 def hold(self, duration): 948 """Tap and hold a button. 949 950 Args: 951 duration: (int) duration in milliseconds. 952 """ 953 logging.debug('Inside hold()') 954 self.cmd('ButtonHold ' + str(duration)) 955 956 def swipe(self, direction): 957 """Perform a swipe gesture. 958 959 Args: 960 direction: (int) swipe direction 1 forward, 0 backward. 961 """ 962 logging.debug('Inside swipe()') 963 self.cmd('ButtonSwipe ' + direction) 964 965 def get_pskey(self, key): 966 """Fetch value from persistent store.""" 967 try: 968 cmd = 'GetPSHex ' + apollo_sink_events.PSKEY[key] 969 except KeyError: 970 raise DeviceError('PS Key: %s not found' % key) 971 pskey = '' 972 try: 973 ret = self.cmd(cmd) 974 for result in ret[0]: 975 if not re.search(r'pskey', result.lower()) and LOG_REGEX.match( 976 result): 977 # values are broken into words separated by spaces. 978 pskey += LOG_REGEX.match(result).group('msg').replace(' ', 979 '') 980 else: 981 continue 982 except DeviceError: 983 logging.exception('GetPSHex cmd failed') 984 return pskey 985 986 def get_version(self): 987 """Return a device version information. 988 989 Note: Version information is obtained from the firmware loader. Old 990 information is lost when firmware is updated. 991 Returns: 992 A dictionary of device version info. eg. 993 { 994 'Fw Build': '73', 995 'OTA Status': 'No OTA performed before this boot', 996 } 997 998 """ 999 logging.debug('Inside get_version()...') 1000 success, result = self._cmd('GetVer', throw_error=False) 1001 status = {} 1002 if result: 1003 for line in result: 1004 if isinstance(line, dict): 1005 status['build'] = line['vm_build_number'] 1006 status['psoc_build'] = line['psoc_version'] 1007 status['debug'] = line['csr_fw_debug_build'] 1008 status['Fw Build Label'] = line['build_label'] 1009 if 'last_ota_status' in line.keys(): 1010 # Optional value in the proto response 1011 status['OTA Status'] = line['last_ota_status'] 1012 else: 1013 status['OTA Status'] = 'No info' 1014 return success, status 1015 1016 def get_earcon_version(self): 1017 """Return a device Earson version information. 1018 1019 Returns: 1020 Boolean: True if success, False otherwise. 1021 String: Earon Version e.g. 7001 0201 6100 0000 1022 1023 """ 1024 # TODO(nviboonchan): Earcon version format would be changed in the 1025 # future. 1026 logging.debug('Inside get_earcon_version()...') 1027 result = self.get_pskey('PSKEY_EARCON_VERSION') 1028 if result: 1029 return True, result 1030 else: 1031 return False, None 1032 1033 def get_bt_status(self): 1034 """Return a device bluetooth connection information. 1035 1036 Returns: 1037 A dictionary of bluetooth status. eg. 1038 { 1039 'Comp. App': 'FALSE', 1040 'HFP (pri.)', 'FALSE', 1041 'HFP (sec.)': 'FALSE', 1042 'A2DP (pri.)': 'FALSE', 1043 'A2DP (sec.)': 'FALSE', 1044 'A2DP disconnects': '3', 1045 'A2DP Role (pri.)': 'slave', 1046 'A2DP RSSI (pri.)': '-Touch' 1047 } 1048 """ 1049 logging.debug('Inside get_bt_status()...') 1050 return self._get_status('GetBTStatus') 1051 1052 def get_conn_devices(self): 1053 """Gets the BT connected devices. 1054 1055 Returns: 1056 A dictionary of BT connected devices. eg. 1057 { 1058 'HFP Pri': 'xxxx', 1059 'HFP Sec': 'xxxx', 1060 'A2DP Pri': 'xxxx', 1061 'A2DP Sec': 'xxxx', 1062 'RFCOMM devices': 'xxxx', 1063 'CTRL': 'xxxx', 1064 'AUDIO': 'None', 1065 'DEBUG': 'None', 1066 'TRANS': 'None' 1067 } 1068 1069 Raises: 1070 ResponseError: If unexpected response occurs. 1071 """ 1072 response_regex = re.compile('[0-9]+ .+: ') 1073 connected_status = {} 1074 response = self.cmd('GetConnDevices') 1075 if not response: 1076 raise ResponseError( 1077 'No response returned by GetConnDevices command') 1078 for line in response[0]: 1079 if response_regex.search(line): 1080 profile, value = line[line.find(' '):].split(':', 1) 1081 connected_status[profile] = value 1082 if not connected_status: 1083 raise ResponseError('No BT Profile Status in response.') 1084 return connected_status 1085 1086 def _get_status(self, cmd): 1087 """Return a device status information.""" 1088 status = {} 1089 try: 1090 results = self.cmd(cmd) 1091 except DeviceError as ex: 1092 # logging.exception('{} cmd failed'.format(cmd)) 1093 logging.warning('Failed to get device status info.') 1094 raise ex 1095 results = results[0] 1096 for result in results: 1097 match = re.match(STATUS_REGEX, result) 1098 if match: 1099 key = match.group('key') 1100 value = match.group('value') 1101 status.update({key: value}) 1102 return status 1103 1104 def is_streaming(self): 1105 """Returns the music streaming status on Apollo. 1106 1107 Returns: 1108 Boolean: True if device is streaming music. False otherwise. 1109 """ 1110 1111 status = self.cmd('GetDSPStatus') 1112 if any('active feature mask: 0' in log for log in 1113 status[0]): 1114 return False 1115 elif any('active feature mask: 2' in log for log in 1116 status[0]): 1117 return True 1118 else: 1119 return False 1120 1121 def is_in_call(self): 1122 """Returns the phone call status on Apollo. 1123 1124 Returns: 1125 Boolean: True if device has incoming call. False otherwise. 1126 """ 1127 1128 status = self.cmd('GetDSPStatus') 1129 if not any('Inc' or 'out' in log for log in status[0]): 1130 return False 1131 return True 1132 1133 def is_device_limbo(self): 1134 """Check if device is in Limbo state. 1135 1136 Returns: 1137 Boolean: True if device is in limbo state, False otherwise. 1138 """ 1139 device_state = self.get_device_state() 1140 logging.info('BudsDevice "{}" state {}'.format(self.serial_number, 1141 device_state)) 1142 return device_state == 'limbo' 1143 1144 def get_device_state(self): 1145 """Get state of the device. 1146 1147 Returns: 1148 String representing the device state. 1149 1150 Raises: 1151 DeviceError: If command fails. 1152 """ 1153 _, status = self._cmd('GetDSPStatus') 1154 for stat in status: 1155 if isinstance(stat, dict): 1156 logging.info(stat) 1157 return stat['sink_state'].lower() 1158 raise DeviceError('BudsDevice state not found in GetDSPStatus.') 1159 1160 def set_stay_connected(self, value): 1161 """Run command to set the value for SetAlwaysConnected. 1162 1163 Args: 1164 value: (int) 1 to keep connection engages at all time, 1165 0 for restoring 1166 Returns: 1167 the set state of type int (0 or 1) or None if not applicable 1168 """ 1169 1170 if int(self.version) >= 1663: 1171 self._cmd('SetAlwaysConnected {}'.format(value)) 1172 logging.info('Setting sleep on idle to {}'.format(value)) 1173 return value 1174 1175 def get_codec(self): 1176 """Get device's current audio codec. 1177 1178 Returns: 1179 String representing the audio codec. 1180 1181 Raises: 1182 DeviceError: If command fails. 1183 """ 1184 success, status = self._cmd('get_codec') 1185 logging.info('---------------------------------------') 1186 logging.info(status) 1187 logging.info('---------------------------------------') 1188 if success: 1189 for line in status: 1190 if isinstance(line, dict): 1191 logging.info('Codec found: %s'.format(line['codec'])) 1192 return line['codec'] 1193 raise DeviceError('BudsDevice state not found in get_codec.') 1194 1195 def crash_dump_detection(self): 1196 """Reads crash dump determines if a crash is detected. 1197 1198 Returns: 1199 True if crash detection is supported and if a new crash is found. 1200 False otherwise. 1201 """ 1202 # Detects if crashdump output is new 1203 new_crash_regex = r'new crash = ([01]+)' 1204 # filter crashdump for just the trace 1205 crash_stack_regex = r'BASIC(.*)\n[\d]+ APP_STACK(.*)\n' 1206 # remove time stamp commander output 1207 timestamp_remover_regex = '\n[\\d]+ ' 1208 1209 logging.debug('Inside IsCrashDumpDetection()...') 1210 cmd_return = self.cmd('CrashDump', wait=1) 1211 crash_dump_str = '\n'.join(cmd_return[0]) 1212 logging.info(crash_dump_str) 1213 try: 1214 # check for crash 1215 match = re.search(new_crash_regex, crash_dump_str) 1216 if match is not None: 1217 if match.groups()[0] == '1': # new crash found 1218 logging.error('Crash detected!!') 1219 basic, app_stack = re.search(crash_stack_regex, 1220 crash_dump_str, 1221 re.DOTALL).groups() 1222 # remove time stamps from capture 1223 basic = re.sub(timestamp_remover_regex, '', basic) 1224 app_stack = re.sub(timestamp_remover_regex, '', app_stack) 1225 # write to log 1226 # pylint: disable=bad-whitespace 1227 logging.info( 1228 '\n&270d = %s\n&270e = %s\n' % (basic, app_stack)) 1229 # pylint: enable=bad-whitespace 1230 return True 1231 else: # no new crash 1232 logging.info('No crash detected') 1233 return False 1234 except AttributeError: 1235 logging.exception( 1236 'Apollo crash dump output is not in expected format') 1237 raise DeviceError('Apollo crash dump not in expected format') 1238 1239 @property 1240 def version(self): 1241 """Application version. 1242 1243 Returns: 1244 (String) Firmware version. 1245 """ 1246 _, result = self.get_version() 1247 return result['build'] 1248 1249 @property 1250 def bluetooth_address(self): 1251 """Bluetooth MAC address. 1252 1253 Returns: 1254 a string representing 48bit BT MAC address in Hex. 1255 1256 Raises: 1257 DeviceError: Unable to find BT Address 1258 """ 1259 results = self.get_pskey('PSKEY_BDADDR') 1260 if not results: 1261 raise DeviceError('Unable to find BT Address') 1262 logging.info(results) 1263 # Bluetooth lower address part, upper address part and non-significant 1264 # address part. 1265 bt_lap = results[2:8] 1266 bt_uap = results[10:12] 1267 bt_nap = results[12:16] 1268 results = bt_nap + bt_uap + bt_lap 1269 1270 return ':'.join(map(''.join, zip(*[iter(results)] * 2))).upper() 1271 1272 @property 1273 def device_name(self): 1274 """Device Friendly Name. 1275 1276 Returns: 1277 a string representing device friendly name. 1278 1279 Raises: 1280 DeviceError: Unable to find a wearable device name. 1281 """ 1282 result = self.get_pskey('PSKEY_DEVICE_NAME') 1283 if not result: 1284 raise DeviceError('Unable to find BudsDevice Name') 1285 logging.info(_to_ascii(result)) 1286 return _to_ascii(result) 1287 1288 @property 1289 def stay_connected(self): 1290 return self.stay_connected_state 1291 1292 @stay_connected.setter 1293 def stay_connected(self, value): 1294 self.stay_connected_state = self.set_stay_connected(value) 1295 1296 def read_serial_port(self, read_until=None): 1297 """Read serial port until specified read_until value in seconds.""" 1298 # use default read_until value if not specified 1299 if read_until: 1300 time.sleep(read_until) 1301 res = self.connection_handle.read() 1302 buf_read = [] 1303 for line in res: 1304 if apollo_log_decoder.is_automation_protobuf(line): 1305 decoded = apollo_log_decoder.decode(line) 1306 buf_read.append(decoded) 1307 else: 1308 buf_read.append(line) 1309 return buf_read 1310 1311 def wait_for_reset(self, timeout=30): 1312 """waits for the device to reset by check serial enumeration. 1313 1314 Checks every .5 seconds for the port. 1315 1316 Args: 1317 timeout: The max time to wait for the device to disappear. 1318 1319 Returns: 1320 Bool: True if the device reset was detected. False if not. 1321 """ 1322 start_time = time.time() 1323 while True: 1324 res = subprocess.Popen(['ls', self.commander_port], 1325 stdout=subprocess.PIPE, 1326 stderr=subprocess.PIPE) 1327 res.communicate() 1328 if res.returncode != 0: 1329 logging.info('BudsDevice reset detected') 1330 return True 1331 elif (time.time() - start_time) > timeout: 1332 logging.info('Timeout waiting for device to reset.....') 1333 return False 1334 else: 1335 time.sleep(.5) 1336 1337 def set_in_case(self, reconnect=True): 1338 """Simulates setting apollo in case and wait for device to come up. 1339 1340 Args: 1341 reconnect: bool - if method should block until reconnect 1342 """ 1343 logging.info('Setting device in case') 1344 out = self.send('Pow 2') 1345 for i in out: 1346 if 'No OTA wakeup condition' in i: 1347 logging.info('No wake up condition.') 1348 elif 'STM Wakeup 10s' in i: 1349 logging.info('Wake up condition detected.') 1350 if reconnect: 1351 self.wait_for_reset() 1352 self.reconnect() 1353 1354 1355class ParentDevice(BudsDevice): 1356 """Wrapper object for Device that addresses b10 recovery and build flashing. 1357 1358 Recovery mechanism: 1359 In case a serial connection could not be established to b10, the recovery 1360 mechanism is activated ONLY if'recover_device' is set to 'true' and 1361 b29_serial is defined in config file. This helps recover a device that has a 1362 bad build installed. 1363 """ 1364 1365 def __init__(self, serial_number, recover_device=False, b29_serial=None): 1366 # if recover device parameter is supplied and there is an error in 1367 # instantiating B10 try to recover device instantiating b10 has to fail 1368 # at most $tries_before_recovery time before initiating a recovery 1369 # try to run the recovery at most $recovery_times before raising Error 1370 # after the first recovery attempt failure try to reset b29 each 1371 # iteration 1372 self.b29_device = None 1373 if recover_device: 1374 if b29_serial is None: 1375 logging.error('B29 serial not defined') 1376 raise Error( 1377 'Recovery failed because "b29_serial" definition not ' 1378 'present in device manifest file') 1379 else: 1380 self.b29_device = B29Device(b29_serial) 1381 tries_before_recovery = 5 1382 recovery_tries = 5 1383 for attempt in range(tries_before_recovery): 1384 try: 1385 # build crash symptoms varies based on the nature of the 1386 # crash connectError is thrown if the device never shows up 1387 # in /dev/ sometimes device shows and can connect but 1388 # sending commands fails or crashes apollo in that case, 1389 # DeviceError is thrown 1390 super().__init__(serial_number, commander_port=None, 1391 log_port=None, serial_logger=None) 1392 break 1393 except (ConnectError, DeviceError) as ex: 1394 logging.warning( 1395 'Error initializing apollo object - # of attempt ' 1396 'left : %d' % (tries_before_recovery - attempt - 1)) 1397 if attempt + 1 >= tries_before_recovery: 1398 logging.error( 1399 'Retries exhausted - now attempting to restore ' 1400 'golden image') 1401 for recovery_attempt in range(recovery_tries): 1402 if not self.b29_device.restore_golden_image(): 1403 logging.error('Recovery failed - retrying...') 1404 self.b29_device.reset_charger() 1405 continue 1406 # try to instantiate now 1407 try: 1408 super().__init__(serial_number, 1409 commander_port=None, 1410 log_port=None, 1411 serial_logger=None) 1412 break 1413 except (ConnectError, DeviceError): 1414 if recovery_attempt == recovery_tries - 1: 1415 raise Error( 1416 'Recovery failed - ensure that there ' 1417 'is no mismatching serial numbers of ' 1418 'b29 and b10 is specified in config') 1419 else: 1420 logging.warning( 1421 'Recovery attempt failed - retrying...') 1422 time.sleep(2) 1423 else: 1424 super().__init__(serial_number, commander_port=None, log_port=None, 1425 serial_logger=None) 1426 # set this to prevent sleep 1427 self.set_stay_connected(1) 1428 1429 def get_info(self): 1430 information_dictionary = {} 1431 information_dictionary['type'] = self.dut_type 1432 information_dictionary['serial'] = self.serial_number 1433 information_dictionary['log port'] = self.log_port 1434 information_dictionary['command port'] = self.commander_port 1435 information_dictionary['bluetooth address'] = self.bluetooth_address 1436 success, build_dict = self.get_version() 1437 information_dictionary['build'] = build_dict 1438 # Extract the build number as a separate key. Useful for BigQuery. 1439 information_dictionary['firmware build number'] = build_dict.get( 1440 'build', '9999') 1441 information_dictionary['name'] = self.device_name 1442 if self.b29_device: 1443 information_dictionary['b29 serial'] = self.b29_device.serial 1444 information_dictionary['b29 firmware'] = self.b29_device.fw_version 1445 information_dictionary['b29 commander port'] = self.b29_device.port 1446 information_dictionary[ 1447 'b29 app version'] = self.b29_device.app_version 1448 return information_dictionary 1449 1450 def setup(self, **kwargs): 1451 """ 1452 1453 Args: 1454 apollo_build: if specified, will be used in flashing the device to 1455 that build prior to running any of the tests. If not 1456 specified flashing is skipped. 1457 """ 1458 if 'apollo_build' in kwargs and kwargs['apollo_build'] is not None: 1459 build = kwargs['apollo_build'] 1460 X20_REGEX = re.compile(r'/google/data/') 1461 if not os.path.exists(build) or os.stat(build).st_size == 0: 1462 # if x20 path, retry on file-not-found error or if file size is 1463 # zero b/c X20 path does not update immediately 1464 if X20_REGEX.match(build): 1465 for i in range(20): 1466 # wait until file exists and size is > 0 w/ 6 second 1467 # interval on retry 1468 if os.path.exists(build) and os.stat(build).st_size > 0: 1469 break 1470 1471 if i == 19: 1472 logging.error('Build path (%s) does not exist or ' 1473 'file size is 0 - aborted' % build) 1474 1475 raise Error('Specified build path (%s) does not ' 1476 'exist or file size is 0' % build) 1477 else: 1478 logging.warning('Build path (%s) does not exist or ' 1479 'file size is 0 - retrying...' % 1480 build) 1481 time.sleep(6) 1482 else: 1483 raise Error('Specified build path (%s) does not exist or ' 1484 'file size is 0' % build) 1485 self.flash_from_file(file_name=build, reconnect=True) 1486 else: 1487 logging.info('Not flashing apollo.') 1488 1489 def teardown(self, **kwargs): 1490 self.close() 1491 1492 1493def _evt_hex(cmd): 1494 return 'EvtHex ' + apollo_sink_events.SINK_EVENTS[cmd] 1495 1496 1497def _to_ascii(orig): 1498 # Returned value need to be byte swapped. Remove last octet if it is 0. 1499 result = _byte_swap(orig) 1500 result = result[:-2] if result[-2:] == '00' else result 1501 return bytearray.fromhex(result).decode() 1502 1503 1504def _byte_swap(orig): 1505 """Simple function to swap bytes order. 1506 1507 Args: 1508 orig: original string 1509 1510 Returns: 1511 a string with bytes swapped. 1512 eg. orig = '6557276920736952006f'. 1513 After swap, return '57656927732052696f00' 1514 """ 1515 return ''.join( 1516 sum([(c, d, a, b) for a, b, c, d in zip(*[iter(orig)] * 4)], ())) 1517