1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import contextlib 18import io 19import time 20from acts import logger 21from acts import utils 22 23SHORT_SLEEP = 1 24CHAMBER_SLEEP = 30 25 26 27def create(configs): 28 """Factory method for OTA chambers. 29 30 Args: 31 configs: list of dicts with chamber settings. settings must contain the 32 following: type (string denoting type of chamber) 33 """ 34 objs = [] 35 for config in configs: 36 try: 37 chamber_class = globals()[config['model']] 38 except KeyError: 39 raise KeyError('Invalid chamber configuration.') 40 objs.append(chamber_class(config)) 41 return objs 42 43 44def detroy(objs): 45 return 46 47 48class OtaChamber(object): 49 """Base class implementation for OTA chamber. 50 51 Base class provides functions whose implementation is shared by all 52 chambers. 53 """ 54 def reset_chamber(self): 55 """Resets the chamber to its zero/home state.""" 56 raise NotImplementedError 57 58 def set_orientation(self, orientation): 59 """Set orientation for turn table in OTA chamber. 60 61 Args: 62 angle: desired turn table orientation in degrees 63 """ 64 raise NotImplementedError 65 66 def set_stirrer_pos(self, stirrer_id, position): 67 """Starts turntables and stirrers in OTA chamber.""" 68 raise NotImplementedError 69 70 def start_continuous_stirrers(self): 71 """Starts turntables and stirrers in OTA chamber.""" 72 raise NotImplementedError 73 74 def stop_continuous_stirrers(self): 75 """Stops turntables and stirrers in OTA chamber.""" 76 raise NotImplementedError 77 78 def step_stirrers(self, steps): 79 """Move stepped stirrers in OTA chamber to next step.""" 80 raise NotImplementedError 81 82 83class MockChamber(OtaChamber): 84 """Class that implements mock chamber for test development and debug.""" 85 def __init__(self, config): 86 self.config = config.copy() 87 self.device_id = self.config['device_id'] 88 self.log = logger.create_tagged_trace_logger('OtaChamber|{}'.format( 89 self.device_id)) 90 self.current_mode = None 91 92 def set_orientation(self, orientation): 93 self.log.info('Setting orientation to {} degrees.'.format(orientation)) 94 95 def reset_chamber(self): 96 self.log.info('Resetting chamber to home state') 97 98 def set_stirrer_pos(self, stirrer_id, position): 99 """Starts turntables and stirrers in OTA chamber.""" 100 self.log.info('Setting stirrer {} to {}.'.format(stirrer_id, position)) 101 102 def start_continuous_stirrers(self): 103 """Starts turntables and stirrers in OTA chamber.""" 104 self.log.info('Starting continuous stirrer motion') 105 106 def stop_continuous_stirrers(self): 107 """Stops turntables and stirrers in OTA chamber.""" 108 self.log.info('Stopping continuous stirrer motion') 109 110 def configure_stepped_stirrers(self, steps): 111 """Programs parameters for stepped stirrers in OTA chamber.""" 112 self.log.info('Configuring stepped stirrers') 113 114 def step_stirrers(self, steps): 115 """Move stepped stirrers in OTA chamber to next step.""" 116 self.log.info('Moving stirrers to the next step') 117 118 119class OctoboxChamber(OtaChamber): 120 """Class that implements Octobox chamber.""" 121 def __init__(self, config): 122 self.config = config.copy() 123 self.device_id = self.config['device_id'] 124 self.log = logger.create_tagged_trace_logger('OtaChamber|{}'.format( 125 self.device_id)) 126 self.TURNTABLE_FILE_PATH = '/usr/local/bin/fnPerformaxCmd' 127 utils.exe_cmd('sudo {} -d {} -i 0'.format(self.TURNTABLE_FILE_PATH, 128 self.device_id)) 129 self.current_mode = None 130 131 def set_orientation(self, orientation): 132 self.log.info('Setting orientation to {} degrees.'.format(orientation)) 133 utils.exe_cmd('sudo {} -d {} -p {}'.format(self.TURNTABLE_FILE_PATH, 134 self.device_id, 135 orientation)) 136 137 def reset_chamber(self): 138 self.log.info('Resetting chamber to home state') 139 self.set_orientation(0) 140 141 142class ChamberAutoConnect(object): 143 def __init__(self, chamber, chamber_config): 144 self._chamber = chamber 145 self._config = chamber_config 146 147 def __getattr__(self, item): 148 def chamber_call(*args, **kwargs): 149 self._chamber.connect(self._config['ip_address'], 150 self._config['username'], 151 self._config['password']) 152 return getattr(self._chamber, item)(*args, **kwargs) 153 154 return chamber_call 155 156 157class BluetestChamber(OtaChamber): 158 """Class that implements Octobox chamber.""" 159 def __init__(self, config): 160 import flow 161 self.config = config.copy() 162 self.log = logger.create_tagged_trace_logger('OtaChamber|{}'.format( 163 self.config['ip_address'])) 164 self.chamber = ChamberAutoConnect(flow.Flow(), self.config) 165 self.stirrer_ids = [0, 1, 2] 166 self.current_mode = None 167 168 # Capture print output decorator 169 @staticmethod 170 def _capture_output(func, *args, **kwargs): 171 """Creates a decorator to capture stdout from bluetest module""" 172 f = io.StringIO() 173 with contextlib.redirect_stdout(f): 174 func(*args, **kwargs) 175 output = f.getvalue() 176 return output 177 178 def _connect(self): 179 self.chamber.connect(self.config['ip_address'], 180 self.config['username'], self.config['password']) 181 182 def _init_manual_mode(self): 183 self.current_mode = 'manual' 184 for stirrer_id in self.stirrer_ids: 185 out = self._capture_output( 186 self.chamber.chamber_stirring_manual_init, stirrer_id) 187 if "failed" in out: 188 self.log.warning("Initialization error: {}".format(out)) 189 time.sleep(CHAMBER_SLEEP) 190 191 def _init_continuous_mode(self): 192 self.current_mode = 'continuous' 193 self.chamber.chamber_stirring_continuous_init() 194 195 def _init_stepped_mode(self, steps): 196 self.current_mode = 'stepped' 197 self.current_stepped_pos = 0 198 self.chamber.chamber_stirring_stepped_init(steps, False) 199 200 def set_stirrer_pos(self, stirrer_id, position): 201 if self.current_mode != 'manual': 202 self._init_manual_mode() 203 self.log.info('Setting stirrer {} to {}.'.format(stirrer_id, position)) 204 out = self._capture_output( 205 self.chamber.chamber_stirring_manual_set_pos, stirrer_id, position) 206 if "failed" in out: 207 self.log.warning("Bluetest error: {}".format(out)) 208 self.log.warning("Set position failed. Retrying.") 209 self.current_mode = None 210 self.set_stirrer_pos(stirrer_id, position) 211 else: 212 self._capture_output(self.chamber.chamber_stirring_manual_wait, 213 CHAMBER_SLEEP) 214 self.log.warning('Stirrer {} at {}.'.format(stirrer_id, position)) 215 216 def set_orientation(self, orientation): 217 self.set_stirrer_pos(2, orientation * 100 / 360) 218 219 def start_continuous_stirrers(self): 220 if self.current_mode != 'continuous': 221 self._init_continuous_mode() 222 self.chamber.chamber_stirring_continuous_start() 223 224 def stop_continuous_stirrers(self): 225 self.chamber.chamber_stirring_continuous_stop() 226 227 def step_stirrers(self, steps): 228 if self.current_mode != 'stepped': 229 self._init_stepped_mode(steps) 230 if self.current_stepped_pos == 0: 231 self.current_stepped_pos += 1 232 return 233 self.current_stepped_pos += 1 234 self.chamber.chamber_stirring_stepped_next_pos() 235 236 def reset_chamber(self): 237 if self.current_mode == 'continuous': 238 self._init_continuous_mode() 239 time.sleep(SHORT_SLEEP) 240 self._init_continuous_mode() 241 else: 242 self._init_manual_mode()