1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import contextlib
18import io
19import time
20from acts import logger
21from acts import utils
22
# Pause, in seconds, passed to time.sleep() between the two continuous-mode
# initializations in BluetestChamber.reset_chamber().
SHORT_SLEEP = 1
# Wait, in seconds, passed to time.sleep() after manual-mode initialization
# of the Bluetest stirrers. The same value is also handed to the chamber's
# chamber_stirring_manual_wait call (presumably a timeout -- TODO confirm).
CHAMBER_SLEEP = 30
25
26
def create(configs):
    """Factory method for OTA chambers.

    Each config's 'model' value is looked up by name among this module's
    globals, and the object found is called with the config dict.

    Args:
        configs: list of dicts with chamber settings. Each dict must contain
            the key 'model': a string naming the chamber class defined in
            this module that should be instantiated.

    Returns:
        A list with one chamber object per entry of configs, in order.

    Raises:
        KeyError: if a config has no 'model' key, or its model does not name
            anything defined in this module.
    """
    objs = []
    for config in configs:
        try:
            chamber_class = globals()[config['model']]
        except KeyError as err:
            # Report only the offending key rather than the whole config:
            # chamber configs in this module may carry credentials.
            raise KeyError(
                'Invalid chamber configuration. '
                'Missing key or unknown model: {}'.format(err)) from err
        objs.append(chamber_class(config))
    return objs
42
43
def destroy(objs):
    """Teardown counterpart to create(); currently does nothing.

    Args:
        objs: list of chamber objects (presumably the list returned by
            create()). Not used.
    """
    return


# 'detroy' is the original, misspelled name of this function. It is kept as
# an alias so that any existing caller keeps working.
detroy = destroy
46
47
class OtaChamber:
    """Common interface for OTA chambers.

    Every operation here raises NotImplementedError; concrete chamber
    classes override the operations they support.
    """

    def reset_chamber(self):
        """Return the chamber to its zero/home state."""
        raise NotImplementedError()

    def set_orientation(self, orientation):
        """Rotate the chamber turn table.

        Args:
            orientation: desired turn table orientation in degrees
        """
        raise NotImplementedError()

    def set_stirrer_pos(self, stirrer_id, position):
        """Move a single stirrer to a given position.

        Args:
            stirrer_id: identifier of the stirrer to move
            position: target position for that stirrer
        """
        raise NotImplementedError()

    def start_continuous_stirrers(self):
        """Start continuous motion of turntables and stirrers."""
        raise NotImplementedError()

    def stop_continuous_stirrers(self):
        """Stop continuous motion of turntables and stirrers."""
        raise NotImplementedError()

    def step_stirrers(self, steps):
        """Advance stepped stirrers to their next step.

        Args:
            steps: step count for the stepped stirring sequence
        """
        raise NotImplementedError()
81
82
class MockChamber(OtaChamber):
    """Chamber stand-in for test development and debug.

    No hardware is touched: every operation only writes an info-level log
    line describing what was requested.
    """
    def __init__(self, config):
        """Copies the config and creates a logger tagged with the device id.

        Args:
            config: dict of chamber settings; must contain 'device_id'.
        """
        self.config = config.copy()
        self.current_mode = None
        self.device_id = self.config['device_id']
        logger_tag = 'OtaChamber|{}'.format(self.device_id)
        self.log = logger.create_tagged_trace_logger(logger_tag)

    def reset_chamber(self):
        """Logs a request to reset the chamber to its home state."""
        self.log.info('Resetting chamber to home state')

    def set_orientation(self, orientation):
        """Logs the requested turn table orientation (in degrees)."""
        message = 'Setting orientation to {} degrees.'.format(orientation)
        self.log.info(message)

    def set_stirrer_pos(self, stirrer_id, position):
        """Logs the requested position for one stirrer."""
        message = 'Setting stirrer {} to {}.'.format(stirrer_id, position)
        self.log.info(message)

    def start_continuous_stirrers(self):
        """Logs a request to start continuous stirrer motion."""
        self.log.info('Starting continuous stirrer motion')

    def stop_continuous_stirrers(self):
        """Logs a request to stop continuous stirrer motion."""
        self.log.info('Stopping continuous stirrer motion')

    def configure_stepped_stirrers(self, steps):
        """Logs a request to configure stepped stirrers; steps is unused."""
        self.log.info('Configuring stepped stirrers')

    def step_stirrers(self, steps):
        """Logs a request to move stirrers to the next step; steps is unused."""
        self.log.info('Moving stirrers to the next step')
117
118
class OctoboxChamber(OtaChamber):
    """Octobox chamber whose turn table is driven by fnPerformaxCmd.

    Turn table commands are run through utils.exe_cmd as sudo invocations of
    the binary at TURNTABLE_FILE_PATH.
    """
    def __init__(self, config):
        """Stores settings and issues the initial turn table command.

        Args:
            config: dict of chamber settings; must contain 'device_id', which
                is passed to the turn table binary as its -d argument.
        """
        self.config = config.copy()
        self.current_mode = None
        self.device_id = self.config['device_id']
        logger_tag = 'OtaChamber|{}'.format(self.device_id)
        self.log = logger.create_tagged_trace_logger(logger_tag)
        self.TURNTABLE_FILE_PATH = '/usr/local/bin/fnPerformaxCmd'
        # Runs the binary with "-i 0" once at construction time.
        init_cmd = 'sudo {} -d {} -i 0'.format(self.TURNTABLE_FILE_PATH,
                                               self.device_id)
        utils.exe_cmd(init_cmd)

    def set_orientation(self, orientation):
        """Sends the requested orientation to the turn table.

        Args:
            orientation: desired turn table orientation in degrees; passed
                to the turn table binary as its -p argument.
        """
        self.log.info('Setting orientation to {} degrees.'.format(orientation))
        move_cmd = 'sudo {} -d {} -p {}'.format(self.TURNTABLE_FILE_PATH,
                                                self.device_id, orientation)
        utils.exe_cmd(move_cmd)

    def reset_chamber(self):
        """Returns the turn table to orientation 0."""
        self.log.info('Resetting chamber to home state')
        self.set_orientation(0)
140
141
class ChamberAutoConnect(object):
    """Wrapper that connects to the chamber before every forwarded call.

    Any attribute that is not found on the wrapper itself is returned as a
    callable. When invoked, that callable first calls connect() on the
    wrapped chamber with the configured credentials and then calls the
    same-named method on the wrapped chamber, returning its result.
    """
    def __init__(self, chamber, chamber_config):
        """Stores the wrapped chamber and its connection settings.

        Args:
            chamber: object exposing connect(ip_address, username, password)
                plus the methods that will be forwarded to it.
            chamber_config: dict containing 'ip_address', 'username' and
                'password'.
        """
        self._chamber = chamber
        self._config = chamber_config

    def __getattr__(self, item):
        """Returns a connect-then-call wrapper for the chamber method `item`.

        Args:
            item: name of the attribute that normal lookup did not find.

        Returns:
            A callable forwarding its arguments to the wrapped chamber's
            method of the same name after connecting.

        Raises:
            AttributeError: for special (dunder) names and for the wrapper's
                own '_chamber'/'_config' fields when they are not set.
        """
        # __getattr__ only runs when normal lookup fails. Protocol probes
        # such as copy.deepcopy looking for __deepcopy__, or hasattr() checks
        # for __setstate__, must see "missing" instead of a callable that
        # would connect to the chamber. The wrapper's own fields reach this
        # point only on a not-yet-initialized instance.
        is_dunder = item.startswith('__') and item.endswith('__')
        if is_dunder or item in ('_chamber', '_config'):
            raise AttributeError(item)

        def chamber_call(*args, **kwargs):
            self._chamber.connect(self._config['ip_address'],
                                  self._config['username'],
                                  self._config['password'])
            # The target method is resolved only after connecting, exactly
            # as before.
            return getattr(self._chamber, item)(*args, **kwargs)

        return chamber_call
155
156
class BluetestChamber(OtaChamber):
    """Class that implements Bluetest chamber.

    Chamber commands are issued through a ChamberAutoConnect wrapper around a
    flow.Flow object, so each command is preceded by a connect() call.
    current_mode records which stirring mode ('manual', 'continuous' or
    'stepped') was initialized last, so a mode is only re-initialized when
    the requested operation needs a different one.
    """
    def __init__(self, config):
        """Stores settings and creates the auto-connecting chamber handle.

        Args:
            config: dict of chamber settings; must contain 'ip_address',
                'username' and 'password'.
        """
        # NOTE(review): imported here rather than at module level,
        # presumably so the module loads without the flow package -- confirm.
        import flow
        self.config = config.copy()
        self.log = logger.create_tagged_trace_logger('OtaChamber|{}'.format(
            self.config['ip_address']))
        self.chamber = ChamberAutoConnect(flow.Flow(), self.config)
        self.stirrer_ids = [0, 1, 2]
        self.current_mode = None
        # Set here so the attribute always exists; _init_stepped_mode resets
        # it whenever stepped mode is (re)initialized.
        self.current_stepped_pos = 0

    @staticmethod
    def _capture_output(func, *args, **kwargs):
        """Calls func and returns whatever it printed to stdout.

        The chamber calls wrapped here are checked for the text "failed" in
        their printed output, so stdout is captured rather than the return
        value.

        Args:
            func: callable to invoke.
            *args: positional arguments passed to func.
            **kwargs: keyword arguments passed to func.

        Returns:
            String with everything written to stdout while func ran. The
            return value of func is discarded.
        """
        f = io.StringIO()
        with contextlib.redirect_stdout(f):
            func(*args, **kwargs)
        output = f.getvalue()
        return output

    def _connect(self):
        """Connects to the chamber using the configured credentials.

        NOTE(review): self.chamber is a ChamberAutoConnect, which already
        calls connect() before forwarding any call, so going through it here
        issues connect() twice.
        """
        self.chamber.connect(self.config['ip_address'],
                             self.config['username'], self.config['password'])

    def _init_manual_mode(self):
        """Initializes manual stirring for every stirrer, then waits.

        Initialization problems are only logged as warnings; the mode is
        recorded as 'manual' regardless.
        """
        self.current_mode = 'manual'
        for stirrer_id in self.stirrer_ids:
            out = self._capture_output(
                self.chamber.chamber_stirring_manual_init, stirrer_id)
            if "failed" in out:
                self.log.warning("Initialization error: {}".format(out))
        time.sleep(CHAMBER_SLEEP)

    def _init_continuous_mode(self):
        """Initializes continuous stirring and records the mode."""
        self.current_mode = 'continuous'
        self.chamber.chamber_stirring_continuous_init()

    def _init_stepped_mode(self, steps):
        """Initializes stepped stirring and resets the step counter.

        Args:
            steps: step count forwarded to the chamber's stepped init call.
        """
        self.current_mode = 'stepped'
        self.current_stepped_pos = 0
        self.chamber.chamber_stirring_stepped_init(steps, False)

    def set_stirrer_pos(self, stirrer_id, position):
        """Moves one stirrer to a position, retrying until it succeeds.

        Manual mode is initialized first if it is not the current mode. When
        the chamber reports a failure, manual mode is re-initialized and the
        move is attempted again; there is no limit on the number of attempts.

        Args:
            stirrer_id: id of the stirrer to move.
            position: target position forwarded to the chamber.
        """
        # Retry loop instead of self-recursion: same sequence of chamber
        # calls and log lines per attempt, but the call stack no longer
        # grows with every failed attempt.
        while True:
            if self.current_mode != 'manual':
                self._init_manual_mode()
            self.log.info('Setting stirrer {} to {}.'.format(
                stirrer_id, position))
            out = self._capture_output(
                self.chamber.chamber_stirring_manual_set_pos, stirrer_id,
                position)
            if "failed" not in out:
                break
            self.log.warning("Bluetest error: {}".format(out))
            self.log.warning("Set position failed. Retrying.")
            # Force manual mode to be re-initialized on the next attempt.
            self.current_mode = None
        self._capture_output(self.chamber.chamber_stirring_manual_wait,
                             CHAMBER_SLEEP)
        # Successful completion is a status message, not a warning.
        self.log.info('Stirrer {} at {}.'.format(stirrer_id, position))

    def set_orientation(self, orientation):
        """Sets the turn table orientation via stirrer 2.

        Args:
            orientation: desired orientation in degrees; rescaled so that
                360 degrees corresponds to position 100.
        """
        self.set_stirrer_pos(2, orientation * 100 / 360)

    def start_continuous_stirrers(self):
        """Starts continuous stirring, initializing the mode if needed."""
        if self.current_mode != 'continuous':
            self._init_continuous_mode()
        self.chamber.chamber_stirring_continuous_start()

    def stop_continuous_stirrers(self):
        """Stops continuous stirring."""
        self.chamber.chamber_stirring_continuous_stop()

    def step_stirrers(self, steps):
        """Moves stepped stirrers to the next position.

        The first call after stepped mode is initialized only advances the
        internal counter and does not ask the chamber to move.

        Args:
            steps: step count used if stepped mode has to be initialized.
        """
        if self.current_mode != 'stepped':
            self._init_stepped_mode(steps)
        if self.current_stepped_pos == 0:
            self.current_stepped_pos += 1
            return
        self.current_stepped_pos += 1
        self.chamber.chamber_stirring_stepped_next_pos()

    def reset_chamber(self):
        """Re-initializes the chamber in its current kind of mode.

        In continuous mode the continuous init is issued twice with a short
        pause in between; in every other case manual mode is initialized.
        """
        if self.current_mode == 'continuous':
            # NOTE(review): the same init call is made on both sides of the
            # pause -- confirm that this is intended.
            self._init_continuous_mode()
            time.sleep(SHORT_SLEEP)
            self._init_continuous_mode()
        else:
            self._init_manual_mode()