1# 2# Copyright (C) 2017 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import logging 18import socket 19import threading 20 21import httplib2 22from googleapiclient import errors 23 24from host_controller.tfc import command_attempt 25from host_controller.tradefed import remote_operation 26 27 28class InvocationThread(threading.Thread): 29 """The thread that remotely executes a command task. 30 31 Attributes: 32 _remote_client: The RemoteClient which executes the command. 33 _tfc_client: The TfcClient to which the command events are sent. 34 _attempt: The CommandAttempt whose events are sent to TFC. 35 _command: A list of strings, the command and arguments. 36 device_serials: A list of strings, the serial numbers of the devices 37 which need to be allocated to the task. 38 _allocated_serials: A list of strings, the serial numbers of the devices 39 which are successfully allocated. 40 _tfc_heartbeat_interval: The interval of TestRunInProgress events in 41 seconds. 42 """ 43 44 def __init__(self, 45 remote_client, 46 tfc_client, 47 attempt, 48 command, 49 device_serials, 50 tfc_heartbeat_interval=5 * 60): 51 """Initializes the attributes.""" 52 super(InvocationThread, self).__init__() 53 self._remote_client = remote_client 54 self._tfc_client = tfc_client 55 self._attempt = attempt 56 self._command = command 57 self.device_serials = device_serials 58 self._allocated_serials = None 59 # The value in Java implementation is 5 minutes. 60 self._tfc_heartbeat_interval = tfc_heartbeat_interval 61 62 def _AllocateDevices(self): 63 """Allocates all of device_serial.""" 64 for serial in self.device_serials: 65 self._remote_client.SendOperation( 66 remote_operation.AllocateDevice(serial)) 67 self._allocated_serials.append(serial) 68 69 def _StartInvocation(self): 70 """Starts executing command and sends the event to TFC.""" 71 self._remote_client.SendOperation( 72 remote_operation.ExecuteCommand(self.device_serials[0], 73 *self._command)) 74 event = self._attempt.CreateCommandEvent( 75 command_attempt.EventType.INVOCATION_STARTED) 76 self._tfc_client.SubmitCommandEvents([event]) 77 78 def _WaitForCommandResult(self): 79 """Waits for command result and keeps sending heartbeat to TFC 80 81 Returns: 82 A JSON object returned from TradeFed remote manager. 83 """ 84 while True: 85 result = self._remote_client.WaitForCommandResult( 86 self.device_serials[0], self._tfc_heartbeat_interval) 87 if result: 88 return result 89 event = self._attempt.CreateCommandEvent( 90 command_attempt.EventType.TEST_RUN_IN_PROGRESS) 91 self._tfc_client.SubmitCommandEvents([event]) 92 93 def _CompleteInvocation(self, result): 94 """Sends InvocationCompleted event according to the result. 95 96 Args: 97 result: A JSON object returned from TradeFed remote manager. 98 """ 99 if result["status"] == "INVOCATION_SUCCESS": 100 event = self._attempt.CreateInvocationCompletedEvent( 101 str(result), 1, 0) 102 else: 103 event = self._attempt.CreateInvocationCompletedEvent( 104 str(result), 1, 1, error=str(result)) 105 self._tfc_client.SubmitCommandEvents([event]) 106 107 def _FreeAllocatedDevices(self): 108 """Frees allocated devices and tolerates RemoteOperationException.""" 109 for serial in self._allocated_serials: 110 try: 111 self._remote_client.SendOperation( 112 remote_operation.FreeDevice(serial)) 113 except remote_operation.RemoteOperationException as e: 114 logging.exception(e) 115 except socket.error as e: 116 logging.exception(e) 117 break 118 self._allocated_serials = [] 119 120 def _SubmitErrorEvent(self, event_type, error_msg): 121 """Submits an error event and tolerates http exceptions. 122 123 Args: 124 event_type: A string, the type of the command event. 125 error_msg: A string, the error message. 126 """ 127 try: 128 self._tfc_client.SubmitCommandEvents( 129 [self._attempt.CreateCommandEvent(event_type, error_msg)]) 130 except (httplib2.HttpLib2Error, errors.HttpError) as e: 131 logging.exception(e) 132 133 # @Override 134 def run(self): 135 """Executes a command task with exception handling.""" 136 self._allocated_serials = [] 137 last_error = None 138 error_event = command_attempt.EventType.ALLOCATION_FAILED 139 try: 140 self._AllocateDevices() 141 error_event = command_attempt.EventType.EXECUTE_FAILED 142 self._StartInvocation() 143 result = self._WaitForCommandResult() 144 self._CompleteInvocation(result) 145 error_event = None 146 except errors.HttpError as e: 147 logging.exception(e) 148 last_error = e 149 except remote_operation.RemoteOperationException as e: 150 logging.exception(e) 151 last_error = e 152 # ConfigurationException on TradeFed remote manager. 153 if str(e).startswith("Config error: "): 154 error_event = command_attempt.EventType.CONFIGURATION_ERROR 155 except httplib2.HttpLib2Error as e: 156 logging.exception("Cannot communicate with TradeFed cluster: %s\n" 157 "Skip submitting event %s.", e, error_event) 158 last_error = e 159 error_event = None 160 except socket.error as e: 161 logging.exception("Cannot communicate with TradeFed remote " 162 "manager: %s\nSkip freeing devices %s.", 163 e, self._allocated_serials) 164 last_error = e 165 self._allocated_serials = [] 166 finally: 167 if error_event: 168 self._SubmitErrorEvent(error_event, str(last_error)) 169 self._FreeAllocatedDevices() 170