1#
2# Copyright (C) 2017 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import logging
18import socket
19import time
20import uuid
21
22import httplib2
23from googleapiclient import errors
24
25from host_controller import invocation_thread
26from host_controller.tradefed import remote_operation
27from host_controller.tfc import command_attempt
28
29
30class HostController(object):
31    """The class that relays commands between a TradeFed host and clusters.
32
33    Attributes:
34        _remote_client: The RemoteClient which runs commands.
35        _tfc_client: The TfcClient from which the command tasks are leased.
36        _hostname: A string, the name of the TradeFed host.
37        _cluster_ids: A list of strings, the cluster IDs for leasing tasks.
38        _invocation_threads: The list of running InvocationThread.
39    """
40
41    def __init__(self, remote_client, tfc_client, hostname, cluster_ids):
42        """Initializes the attributes."""
43        self._remote_client = remote_client
44        self._tfc_client = tfc_client
45        self._hostname = hostname
46        self._cluster_ids = cluster_ids
47        self._invocation_threads = []
48
49    @property
50    def hostname(self):
51        """Returns the name of the host."""
52        return self._hostname
53
54    def _JoinInvocationThreads(self):
55        """Removes terminated threads from _invocation_threads."""
56        alive_threads = []
57        for inv_thread in self._invocation_threads:
58            inv_thread.join(0)
59            if inv_thread.is_alive():
60                alive_threads.append(inv_thread)
61        self._invocation_threads = alive_threads
62
63    def _CreateInvocationThread(self, task):
64        """Creates an invocation thread from a command task.
65
66        Args:
67            task: The CommandTask object.
68
69        Returns:
70            An InvocationThread.
71        """
72        attempt_id = uuid.uuid4()
73        attempt = command_attempt.CommandAttempt(
74                task.task_id, attempt_id,
75                self._hostname, task.device_serials[0])
76        inv_thread = invocation_thread.InvocationThread(
77                self._remote_client, self._tfc_client, attempt,
78                task.command_line.split(), task.device_serials)
79        return inv_thread
80
81    def ListDevices(self):
82        """Lists present devices on the host.
83
84        Returns:
85            A list of DeviceInfo.
86        """
87        devices = self._remote_client.ListDevices()
88        return [dev for dev in devices if not dev.IsStub()]
89
90    def ListAvailableDevices(self):
91        """Lists available devices for command tasks.
92
93        Returns:
94            A list of DeviceInfo.
95        """
96        self._JoinInvocationThreads()
97        allocated_serials = set()
98        for inv_thread in self._invocation_threads:
99            allocated_serials.update(inv_thread.device_serials)
100
101        present_devices = self.ListDevices()
102        return [dev for dev in present_devices if
103                dev.IsAvailable() and
104                dev.device_serial not in allocated_serials]
105
106    def LeaseCommandTasks(self):
107        """Leases command tasks and creates threads to execute them.
108
109        Returns:
110            A list of CommandTask. The leased command tasks.
111        """
112        available_devices = self.ListAvailableDevices()
113        if not available_devices:
114            return []
115
116        tasks = self._tfc_client.LeaseHostTasks(
117                self._cluster_ids[0], self._cluster_ids[1:],
118                self._hostname, available_devices)
119        for task in tasks:
120            inv_thread = self._CreateInvocationThread(task)
121            inv_thread.daemon = True
122            inv_thread.start()
123            self._invocation_threads.append(inv_thread)
124        return tasks
125
126    def Run(self, poll_interval):
127        """Starts polling TFC for tasks.
128
129        Args:
130            poll_interval: The poll interval in seconds.
131        """
132        while True:
133            try:
134                self.LeaseCommandTasks()
135            except (socket.error,
136                    remote_operation.RemoteOperationException,
137                    httplib2.HttpLib2Error,
138                    errors.HttpError) as e:
139                logging.exception(e)
140            time.sleep(poll_interval)
141