#!/usr/bin/python # Copyright (C) 2014 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import fcntl import logging logging.getLogger().setLevel(logging.ERROR) import os.path import select import stat import struct import sys import time import collections import socket import glob import signal import serial # http://pyserial.sourceforge.net/ #Set to True if you want log output to go to screen: LOG_TO_SCREEN = False TIMEOUT_SERIAL = 1 #seconds #ignore SIG CONTINUE signals for signum in [signal.SIGCONT]: signal.signal(signum, signal.SIG_IGN) try: from . import Abstract_Power_Monitor except: sys.exit("You cannot run 'monsoon.py' directly. Run 'execut_power_tests.py' instead.") class Power_Monitor(Abstract_Power_Monitor): """ Provides a simple class to use the power meter, e.g. mon = monsoon.Power_Monitor() mon.SetVoltage(3.7) mon.StartDataCollection() mydata = [] while len(mydata) < 1000: mydata.extend(mon.CollectData()) mon.StopDataCollection() """ _do_log = False @staticmethod def lock( device ): tmpname = "/tmp/monsoon.%s.%s" % ( os.uname()[0], os.path.basename(device)) lockfile = open(tmpname, "w") try: # use a lockfile to ensure exclusive access fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) logging.debug("Locked device %s"%device) except IOError as e: self.log("device %s is in use" % dev) sys.exit('device in use') return lockfile def to_string(self): return self._devicename def __init__(self, device = None, wait = False, log_file_id= None ): """ Establish a connection to a Power_Monitor. By default, opens the first available port, waiting if none are ready. A particular port can be specified with "device". With wait=0, IOError is thrown if a device is not immediately available. """ self._lockfile = None self._logfile = None self.ser = None for signum in [signal.SIGALRM, signal.SIGHUP, signal.SIGINT, signal.SIGILL, signal.SIGQUIT, signal.SIGTRAP,signal.SIGABRT, signal.SIGIOT, signal.SIGBUS, signal.SIGFPE, signal.SIGSEGV, signal.SIGUSR2, signal.SIGPIPE, signal.SIGTERM]: signal.signal(signum, self.handle_signal) self._coarse_ref = self._fine_ref = self._coarse_zero = self._fine_zero = 0 self._coarse_scale = self._fine_scale = 0 self._last_seq = 0 self.start_voltage = 0 if device: if isinstance( device, serial.Serial ): self.ser = device else: device_list = None while not device_list: device_list = Power_Monitor.Discover() if not device_list and wait: time.sleep(1.0) logging.info("No power monitor serial devices found. Retrying...") elif not device_list and not wait: logging.error("No power monitor serial devices found. Exiting") self.Close() sys.exit("No power monitor serial devices found") if device_list: if len(device_list) > 1: logging.error("=======================================") logging.error("More than one power monitor discovered!") logging.error("Test may not execute properly.Aborting test.") logging.error("=======================================") sys.exit("More than one power monitor connected.") device = device_list[0].to_string() # choose the first one if len(device_list) > 1: logging.info("More than one device found. Using %s"%device) else: logging.info("Power monitor @ %s"%device) else: raise IOError("No device found") self._lockfile = Power_Monitor.lock( device ) if log_file_id is not None: self._logfilename = "/tmp/monsoon_%s_%s.%s.log" % (os.uname()[0], os.path.basename(device), log_file_id) self._logfile = open(self._logfilename,'a') else: self._logfile = None try: self.ser = serial.Serial(device, timeout= TIMEOUT_SERIAL) except Exception as e: self.log( "error opening device %s: %s" % (dev, e)) self._lockfile.close() raise logging.debug("Setting up power monitor...") self._devicename = device #just in case, stop any active data collection on monsoon self._dataCollectionActive = True self.StopDataCollection() logging.debug("Flushing input...") self._FlushInput() # discard stale input logging.debug("Getting status....") status = self.GetStatus() if not status: self.log( "no response from device %s" % device) self._lockfile.close() raise IOError("Failed to get status from device") self.start_voltage = status["voltage1"] def __del__(self): self.Close() def Close(self): if self._logfile: print("=============\n"+\ "Power Monitor log file can be found at '%s'"%self._logfilename + "=============\n") self._logfile.close() self._logfile = None if (self.ser): #self.StopDataCollection() self.ser.flush() self.ser.close() self.ser = None if self._lockfile: self._lockfile.close() def log(self, msg , debug = False): if self._logfile: self._logfile.write( msg + "\n") if not debug and LOG_TO_SCREEN: logging.error( msg ) else: logging.debug(msg) def handle_signal( self, signum, frame): if self.ser: self.ser.flush() self.ser.close() self.ser = None self.log("Got signal %d"%signum) sys.exit("\nGot signal %d\n"%signum) @staticmethod def Discover(): monsoon_list = [] elapsed = 0 logging.info("Discovering power monitor(s)...") ser_device_list = glob.glob("/dev/ttyACM*") logging.info("Seeking devices %s"%ser_device_list) for dev in ser_device_list: try: lockfile = Power_Monitor.lock( dev ) except: logging.info( "... device %s in use, skipping"%dev) continue tries = 0 ser = None while ser is None and tries < 100: try: # try to open the device ser = serial.Serial( dev, timeout=TIMEOUT_SERIAL) except Exception as e: logging.error( "error opening device %s: %s" % (dev, e) ) tries += 1 time.sleep(2); ser = None logging.info("... found device %s"%dev) lockfile.close()#will be re-locked once monsoon instance created logging.debug("unlocked") if not ser: continue if ser is not None: try: monsoon = Power_Monitor(device = dev) status = monsoon.GetStatus() if not status: monsoon.log("... no response from device %s, skipping") continue else: logging.info("... found power monitor @ %s"%dev) monsoon_list.append( monsoon ) except: import traceback traceback.print_exc() logging.error("... %s appears to not be a monsoon device"%dev) logging.debug("Returning list of %s"%monsoon_list) return monsoon_list def GetStatus(self): """ Requests and waits for status. Returns status dictionary. """ # status packet format self.log("Getting status...", debug = True) STATUS_FORMAT = ">BBBhhhHhhhHBBBxBbHBHHHHBbbHHBBBbbbbbbbbbBH" STATUS_FIELDS = [ "packetType", "firmwareVersion", "protocolVersion", "mainFineCurrent", "usbFineCurrent", "auxFineCurrent", "voltage1", "mainCoarseCurrent", "usbCoarseCurrent", "auxCoarseCurrent", "voltage2", "outputVoltageSetting", "temperature", "status", "leds", "mainFineResistor", "serialNumber", "sampleRate", "dacCalLow", "dacCalHigh", "powerUpCurrentLimit", "runTimeCurrentLimit", "powerUpTime", "usbFineResistor", "auxFineResistor", "initialUsbVoltage", "initialAuxVoltage", "hardwareRevision", "temperatureLimit", "usbPassthroughMode", "mainCoarseResistor", "usbCoarseResistor", "auxCoarseResistor", "defMainFineResistor", "defUsbFineResistor", "defAuxFineResistor", "defMainCoarseResistor", "defUsbCoarseResistor", "defAuxCoarseResistor", "eventCode", "eventData", ] self._SendStruct("BBB", 0x01, 0x00, 0x00) while True: # Keep reading, discarding non-status packets bytes = self._ReadPacket() if not bytes: return None if len(bytes) != struct.calcsize(STATUS_FORMAT) or bytes[0] != "\x10": self.log("wanted status, dropped type=0x%02x, len=%d" % ( ord(bytes[0]), len(bytes))) continue status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, bytes))) assert status["packetType"] == 0x10 for k in status.keys(): if k.endswith("VoltageSetting"): status[k] = 2.0 + status[k] * 0.01 elif k.endswith("FineCurrent"): pass # needs calibration data elif k.endswith("CoarseCurrent"): pass # needs calibration data elif k.startswith("voltage") or k.endswith("Voltage"): status[k] = status[k] * 0.000125 elif k.endswith("Resistor"): status[k] = 0.05 + status[k] * 0.0001 if k.startswith("aux") or k.startswith("defAux"): status[k] += 0.05 elif k.endswith("CurrentLimit"): status[k] = 8 * (1023 - status[k]) / 1023.0 #self.log( "Returning requested status: \n %s"%(status), debug = True) return status def RampVoltage(self, start, end): v = start if v < 3.0: v = 3.0 # protocol doesn't support lower than this while (v < end): self.SetVoltage(v) v += .1 time.sleep(.1) self.SetVoltage(end) def SetVoltage(self, v): """ Set the output voltage, 0 to disable. """ self.log("Setting voltage to %s..."%v, debug = True) if v == 0: self._SendStruct("BBB", 0x01, 0x01, 0x00) else: self._SendStruct("BBB", 0x01, 0x01, int((v - 2.0) * 100)) self.log("...Set voltage", debug = True) def SetMaxCurrent(self, i): """Set the max output current.""" assert i >= 0 and i <= 8 self.log("Setting max current to %s..."%i, debug = True) val = 1023 - int((i/8)*1023) self._SendStruct("BBB", 0x01, 0x0a, val & 0xff) self._SendStruct("BBB", 0x01, 0x0b, val >> 8) self.log("...Set max current.", debug = True) def SetUsbPassthrough(self, val): """ Set the USB passthrough mode: 0 = off, 1 = on, 2 = auto. """ self._SendStruct("BBB", 0x01, 0x10, val) def StartDataCollection(self): """ Tell the device to start collecting and sending measurement data. """ self.log("Starting data collection...", debug = True) self._SendStruct("BBB", 0x01, 0x1b, 0x01) # Mystery command self._SendStruct("BBBBBBB", 0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8) self.log("...started", debug = True) self._dataCollectionActive = True def StopDataCollection(self): """ Tell the device to stop collecting measurement data. """ self._SendStruct("BB", 0x03, 0x00) # stop if self._dataCollectionActive: while self.CollectData(False) is not None: pass self._dataCollectionActive = False def CollectData(self, verbose = True): """ Return some current samples. Call StartDataCollection() first. """ #self.log("Collecting data ...", debug = True) while True: # loop until we get data or a timeout bytes = self._ReadPacket(verbose) if not bytes: return None if len(bytes) < 4 + 8 + 1 or bytes[0] < "\x20" or bytes[0] > "\x2F": if verbose: self.log( "wanted data, dropped type=0x%02x, len=%d" % ( ord(bytes[0]), len(bytes)), debug=verbose) continue seq, type, x, y = struct.unpack("BBBB", bytes[:4]) data = [struct.unpack(">hhhh", bytes[x:x+8]) for x in range(4, len(bytes) - 8, 8)] if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF: self.log( "data sequence skipped, lost packet?" ) self._last_seq = seq if type == 0: if not self._coarse_scale or not self._fine_scale: self.log("waiting for calibration, dropped data packet") continue out = [] for main, usb, aux, voltage in data: if main & 1: out.append(((main & ~1) - self._coarse_zero) * self._coarse_scale) else: out.append((main - self._fine_zero) * self._fine_scale) #self.log("...Collected %d samples"%(len(out)), debug = True) return out elif type == 1: self._fine_zero = data[0][0] self._coarse_zero = data[1][0] elif type == 2: self._fine_ref = data[0][0] self._coarse_ref = data[1][0] else: self.log( "discarding data packet type=0x%02x" % type) continue if self._coarse_ref != self._coarse_zero: self._coarse_scale = 2.88 / (self._coarse_ref - self._coarse_zero) if self._fine_ref != self._fine_zero: self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero) def _SendStruct(self, fmt, *args): """ Pack a struct (without length or checksum) and send it. """ data = struct.pack(fmt, *args) data_len = len(data) + 1 checksum = (data_len + sum(struct.unpack("B" * len(data), data))) % 256 out = struct.pack("B", data_len) + data + struct.pack("B", checksum) self.ser.write(out) self.ser.flush() def _ReadPacket(self, verbose = True): """ Read a single data record as a string (without length or checksum). """ len_char = self.ser.read(1) if not len_char: if verbose: self.log( "timeout reading from serial port" ) return None data_len = struct.unpack("B", len_char) data_len = ord(len_char) if not data_len: return "" result = self.ser.read(data_len) if len(result) != data_len: return None body = result[:-1] checksum = (data_len + sum(struct.unpack("B" * len(body), body))) % 256 if result[-1] != struct.pack("B", checksum): self.log( "Invalid checksum from serial port" ) return None return result[:-1] def _FlushInput(self): """ Flush all read data until no more available. """ self.ser.flushInput() flushed = 0 self.log("Flushing input...", debug = True) while True: ready_r, ready_w, ready_x = select.select([self.ser], [], [self.ser], 0) if len(ready_x) > 0: self.log( "exception from serial port" ) return None elif len(ready_r) > 0: flushed += 1 self.ser.read(1) # This may cause underlying buffering. self.ser.flush() # Flush the underlying buffer too. else: break if flushed > 0: self.log( "flushed >%d bytes" % flushed, debug = True )