1#!/usr/bin/env python3
2
3#   Copyright 2016- The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""A helper module to communicate over telnet with AttenuatorInstruments.
17
18User code shouldn't need to directly access this class.
19"""
20
21import logging
22import telnetlib
23import re
24from acts.controllers import attenuator
25from acts.libs.proc import job
26
27
def _ascii_string(uc_string):
    """Return ``uc_string`` as ASCII-encoded bytes.

    The argument is first converted with ``str()``, so non-string objects are
    accepted. Characters outside the ASCII range raise UnicodeEncodeError.
    """
    text = str(uc_string)
    return text.encode('ASCII')
30
31
class _TNHelper(object):
    """An internal helper class for Telnet+SCPI command-based instruments.

    It should only be used by those implementation control libraries and not by
    any user code directly.

    Attributes:
        tx_cmd_separator: String appended to every command that is sent.
        rx_cmd_separator: String expected at the end of every reply.
        prompt: Prompt string waited for before a command is sent.
    """

    def __init__(self, tx_cmd_separator='\n', rx_cmd_separator='\n',
                 prompt=''):
        """Stores the protocol settings; no connection is opened here.

        Args:
            tx_cmd_separator: Terminator appended to transmitted commands.
            rx_cmd_separator: Terminator expected at the end of replies.
            prompt: Prompt to wait for before transmitting a command.
        """
        self._tn = None
        self._ip_address = None
        self._port = None

        self.tx_cmd_separator = tx_cmd_separator
        self.rx_cmd_separator = rx_cmd_separator
        self.prompt = prompt

    def open(self, host, port=23):
        """Opens a telnet connection, replacing any existing one.

        The host and port are remembered so that diagnose_telnet() can
        reconnect later.

        Args:
            host: Address of the telnet server.
            port: TCP port of the telnet server.
        """
        self._ip_address = host
        self._port = port
        self.close()
        logging.debug("Telnet Server IP = %s", host)
        # Only publish the connection once it is actually established, so
        # that is_open() does not report True after a failed connect.
        connection = telnetlib.Telnet()
        connection.open(host, port, 10)
        self._tn = connection

    def is_open(self):
        """Returns True if a telnet connection object is currently held."""
        return bool(self._tn)

    def close(self):
        """Closes the telnet connection if one is held."""
        if self._tn:
            # Drop the reference first so the helper reports closed even if
            # closing the underlying connection raises.
            connection, self._tn = self._tn, None
            connection.close()

    def diagnose_telnet(self):
        """Function that diagnoses telnet connections.

        This function diagnoses telnet connections and can be used in case of
        command failures. The function checks if the device is still reachable
        via ping, and whether or not it can close and reopen the telnet
        connection.

        Returns:
            False when telnet server is unreachable or unresponsive
            True when telnet server is reachable and telnet connection has been
            successfully reopened
        """
        logging.debug('Diagnosing telnet connection')
        try:
            job_result = job.run('ping {} -c 5'.format(self._ip_address))
        except Exception:
            logging.error("Unable to ping telnet server.")
            return False
        ping_output = job_result.stdout
        # Any packet loss at all is treated as an unreachable server.
        if not re.search(r' 0% packet loss', ping_output):
            logging.error('Ping Packets Lost. Result: %s', ping_output)
            return False
        try:
            self.close()
        except Exception:
            logging.error('Cannot close telnet connection.')
            return False
        try:
            self.open(self._ip_address, self._port)
        except Exception:
            logging.error('Cannot reopen telnet connection.')
            return False
        logging.debug('Telnet connection likely recovered')
        return True

    def cmd(self, cmd_str, wait_ret=True):
        """Sends a command over telnet and optionally returns the reply.

        Args:
            cmd_str: The command to send. Leading and trailing
                tx_cmd_separator characters are removed before the separator
                is appended, so the command is terminated exactly once.
            wait_ret: When False, return right after sending without reading
                a reply.

        Returns:
            None when wait_ret is False. Otherwise the decoded reply with any
            leading/trailing separator and prompt characters stripped.

        Raises:
            TypeError: cmd_str is not a str.
            attenuator.InvalidOperationError: No telnet connection is open.
            attenuator.InvalidDataError: No valid reply arrived in time; a
                connection diagnosis is attempted before raising.
        """
        if not isinstance(cmd_str, str):
            raise TypeError('Invalid command string', cmd_str)

        if not self.is_open():
            raise attenuator.InvalidOperationError(
                'Telnet connection not open for commands')

        # str.strip() returns a new string; the result must be kept or a
        # command that already ends with the separator is terminated twice.
        cmd_str = cmd_str.strip(self.tx_cmd_separator)
        # Wait (up to 2 s) for the prompt before transmitting.
        self._tn.read_until(_ascii_string(self.prompt), 2)
        self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))

        if wait_ret is False:
            return None

        # A valid reply is a run of non-whitespace followed by the separator.
        match_idx, match_val, ret_text = self._tn.expect(
            [_ascii_string(r'\S+' + self.rx_cmd_separator)], 1)

        if match_idx == -1:
            logging.debug('Telnet Command: %s', cmd_str)
            logging.debug('Telnet Reply: (%s,%s,%s)',
                          match_idx, match_val, ret_text)
            self.diagnose_telnet()
            raise attenuator.InvalidDataError(
                'Telnet command failed to return valid data')

        ret_text = ret_text.decode()
        # strip() treats its argument as a set of characters to remove from
        # both ends of the reply.
        ret_text = ret_text.strip(
            self.tx_cmd_separator + self.rx_cmd_separator + self.prompt)

        return ret_text