1#!/usr/bin/env python3 2 3# Copyright 2016- The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""A helper module to communicate over telnet with AttenuatorInstruments. 17 18User code shouldn't need to directly access this class. 19""" 20 21import logging 22import telnetlib 23import re 24from acts.controllers import attenuator 25from acts.libs.proc import job 26 27 28def _ascii_string(uc_string): 29 return str(uc_string).encode('ASCII') 30 31 32class _TNHelper(object): 33 """An internal helper class for Telnet+SCPI command-based instruments. 34 35 It should only be used by those implementation control libraries and not by 36 any user code directly. 37 """ 38 39 def __init__(self, tx_cmd_separator='\n', rx_cmd_separator='\n', 40 prompt=''): 41 self._tn = None 42 self._ip_address = None 43 self._port = None 44 45 self.tx_cmd_separator = tx_cmd_separator 46 self.rx_cmd_separator = rx_cmd_separator 47 self.prompt = prompt 48 49 def open(self, host, port=23): 50 self._ip_address = host 51 self._port = port 52 if self._tn: 53 self._tn.close() 54 logging.debug("Telnet Server IP = %s" % host) 55 self._tn = telnetlib.Telnet() 56 self._tn.open(host, port, 10) 57 58 def is_open(self): 59 return bool(self._tn) 60 61 def close(self): 62 if self._tn: 63 self._tn.close() 64 self._tn = None 65 66 def diagnose_telnet(self): 67 """Function that diagnoses telnet connections. 68 69 This function diagnoses telnet connections and can be used in case of 70 command failures. The function checks if the devices is still reachable 71 via ping, and whether or not it can close and reopen the telnet 72 connection. 73 74 Returns: 75 False when telnet server is unreachable or unresponsive 76 True when telnet server is reachable and telnet connection has been 77 successfully reopened 78 """ 79 logging.debug('Diagnosing telnet connection') 80 try: 81 job_result = job.run('ping {} -c 5'.format(self._ip_address)) 82 except: 83 logging.error("Unable to ping telnet server.") 84 return False 85 ping_output = job_result.stdout 86 if not re.search(r' 0% packet loss', ping_output): 87 logging.error('Ping Packets Lost. Result: {}'.format(ping_output)) 88 return False 89 try: 90 self.close() 91 except: 92 logging.error('Cannot close telnet connection.') 93 return False 94 try: 95 self.open(self._ip_address, self._port) 96 except: 97 logging.error('Cannot reopen telnet connection.') 98 return False 99 logging.debug('Telnet connection likely recovered') 100 return True 101 102 def cmd(self, cmd_str, wait_ret=True): 103 if not isinstance(cmd_str, str): 104 raise TypeError('Invalid command string', cmd_str) 105 106 if not self.is_open(): 107 raise attenuator.InvalidOperationError( 108 'Telnet connection not open for commands') 109 110 cmd_str.strip(self.tx_cmd_separator) 111 self._tn.read_until(_ascii_string(self.prompt), 2) 112 self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator)) 113 114 if wait_ret is False: 115 return None 116 117 match_idx, match_val, ret_text = self._tn.expect( 118 [_ascii_string('\S+' + self.rx_cmd_separator)], 1) 119 120 if match_idx == -1: 121 logging.debug('Telnet Command: {}'.format(cmd_str)) 122 logging.debug('Telnet Reply: ({},{},{})'.format( 123 match_idx, match_val, ret_text)) 124 self.diagnose_telnet() 125 raise attenuator.InvalidDataError( 126 'Telnet command failed to return valid data') 127 128 ret_text = ret_text.decode() 129 ret_text = ret_text.strip( 130 self.tx_cmd_separator + self.rx_cmd_separator + self.prompt) 131 132 return ret_text