1#
2# Copyright (C) 2017 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import logging
18import socket
19import threading
20
21import httplib2
22from googleapiclient import errors
23
24from host_controller.tfc import command_attempt
25from host_controller.tradefed import remote_operation
26
27
28class InvocationThread(threading.Thread):
29    """The thread that remotely executes a command task.
30
31    Attributes:
32        _remote_client: The RemoteClient which executes the command.
33        _tfc_client: The TfcClient to which the command events are sent.
34        _attempt: The CommandAttempt whose events are sent to TFC.
35        _command: A list of strings, the command and arguments.
36        device_serials: A list of strings, the serial numbers of the devices
37                        which need to be allocated to the task.
38        _allocated_serials: A list of strings, the serial numbers of the devices
39                            which are successfully allocated.
40        _tfc_heartbeat_interval: The interval of TestRunInProgress events in
41                                 seconds.
42    """
43
44    def __init__(self,
45                 remote_client,
46                 tfc_client,
47                 attempt,
48                 command,
49                 device_serials,
50                 tfc_heartbeat_interval=5 * 60):
51        """Initializes the attributes."""
52        super(InvocationThread, self).__init__()
53        self._remote_client = remote_client
54        self._tfc_client = tfc_client
55        self._attempt = attempt
56        self._command = command
57        self.device_serials = device_serials
58        self._allocated_serials = None
59        # The value in Java implementation is 5 minutes.
60        self._tfc_heartbeat_interval = tfc_heartbeat_interval
61
62    def _AllocateDevices(self):
63        """Allocates all of device_serial."""
64        for serial in self.device_serials:
65            self._remote_client.SendOperation(
66                    remote_operation.AllocateDevice(serial))
67            self._allocated_serials.append(serial)
68
69    def _StartInvocation(self):
70        """Starts executing command and sends the event to TFC."""
71        self._remote_client.SendOperation(
72                remote_operation.ExecuteCommand(self.device_serials[0],
73                                                *self._command))
74        event = self._attempt.CreateCommandEvent(
75                command_attempt.EventType.INVOCATION_STARTED)
76        self._tfc_client.SubmitCommandEvents([event])
77
78    def _WaitForCommandResult(self):
79        """Waits for command result and keeps sending heartbeat to TFC
80
81        Returns:
82            A JSON object returned from TradeFed remote manager.
83        """
84        while True:
85            result = self._remote_client.WaitForCommandResult(
86                    self.device_serials[0], self._tfc_heartbeat_interval)
87            if result:
88                return result
89            event = self._attempt.CreateCommandEvent(
90                    command_attempt.EventType.TEST_RUN_IN_PROGRESS)
91            self._tfc_client.SubmitCommandEvents([event])
92
93    def _CompleteInvocation(self, result):
94        """Sends InvocationCompleted event according to the result.
95
96        Args:
97            result: A JSON object returned from TradeFed remote manager.
98        """
99        if result["status"] == "INVOCATION_SUCCESS":
100            event = self._attempt.CreateInvocationCompletedEvent(
101                    str(result), 1, 0)
102        else:
103            event = self._attempt.CreateInvocationCompletedEvent(
104                    str(result), 1, 1, error=str(result))
105        self._tfc_client.SubmitCommandEvents([event])
106
107    def _FreeAllocatedDevices(self):
108        """Frees allocated devices and tolerates RemoteOperationException."""
109        for serial in self._allocated_serials:
110            try:
111                self._remote_client.SendOperation(
112                        remote_operation.FreeDevice(serial))
113            except remote_operation.RemoteOperationException as e:
114                logging.exception(e)
115            except socket.error as e:
116                logging.exception(e)
117                break
118        self._allocated_serials = []
119
120    def _SubmitErrorEvent(self, event_type, error_msg):
121        """Submits an error event and tolerates http exceptions.
122
123        Args:
124            event_type: A string, the type of the command event.
125            error_msg: A string, the error message.
126        """
127        try:
128            self._tfc_client.SubmitCommandEvents(
129                [self._attempt.CreateCommandEvent(event_type, error_msg)])
130        except (httplib2.HttpLib2Error, errors.HttpError) as e:
131            logging.exception(e)
132
133    # @Override
134    def run(self):
135        """Executes a command task with exception handling."""
136        self._allocated_serials = []
137        last_error = None
138        error_event = command_attempt.EventType.ALLOCATION_FAILED
139        try:
140            self._AllocateDevices()
141            error_event = command_attempt.EventType.EXECUTE_FAILED
142            self._StartInvocation()
143            result = self._WaitForCommandResult()
144            self._CompleteInvocation(result)
145            error_event = None
146        except errors.HttpError as e:
147            logging.exception(e)
148            last_error = e
149        except remote_operation.RemoteOperationException as e:
150            logging.exception(e)
151            last_error = e
152            # ConfigurationException on TradeFed remote manager.
153            if str(e).startswith("Config error: "):
154                error_event = command_attempt.EventType.CONFIGURATION_ERROR
155        except httplib2.HttpLib2Error as e:
156            logging.exception("Cannot communicate with TradeFed cluster: %s\n"
157                              "Skip submitting event %s.", e, error_event)
158            last_error = e
159            error_event = None
160        except socket.error as e:
161            logging.exception("Cannot communicate with TradeFed remote "
162                              "manager: %s\nSkip freeing devices %s.",
163                              e, self._allocated_serials)
164            last_error = e
165            self._allocated_serials = []
166        finally:
167            if error_event:
168                self._SubmitErrorEvent(error_event, str(last_error))
169            self._FreeAllocatedDevices()
170