1#!/usr/bin/env python3
2#
3#   Copyright 2016- The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from concurrent.futures import ThreadPoolExecutor
18import queue
19import re
20import threading
21import time
22
23from acts import logger
24from acts.controllers.sl4a_lib import rpc_client
25
26
27class EventDispatcherError(Exception):
28    """The base class for all EventDispatcher exceptions."""
29
30
31class IllegalStateError(EventDispatcherError):
32    """Raise when user tries to put event_dispatcher into an illegal state."""
33
34
35class DuplicateError(EventDispatcherError):
36    """Raise when two event handlers have been assigned to an event name."""
37
38
39class EventDispatcher:
40    """A class for managing the events for an SL4A Session.
41
42    Attributes:
43        _serial: The serial of the device.
44        _rpc_client: The rpc client for that session.
45        _started: A bool that holds whether or not the event dispatcher is
46                  running.
47        _executor: The thread pool executor for running event handlers and
48                   polling.
49        _event_dict: A dictionary of str eventName = Queue<Event> eventQueue
50        _handlers: A dictionary of str eventName => (lambda, args) handler
51        _lock: A lock that prevents multiple reads/writes to the event queues.
52        log: The EventDispatcher's logger.
53    """
54
55    DEFAULT_TIMEOUT = 60
56
57    def __init__(self, serial, rpc_client):
58        self._serial = serial
59        self._rpc_client = rpc_client
60        self._started = False
61        self._executor = None
62        self._event_dict = {}
63        self._handlers = {}
64        self._lock = threading.RLock()
65
66        def _log_formatter(message):
67            """Defines the formatting used in the logger."""
68            return '[E Dispatcher|%s|%s] %s' % (self._serial,
69                                                self._rpc_client.uid, message)
70
71        self.log = logger.create_logger(_log_formatter)
72
73    def poll_events(self):
74        """Continuously polls all types of events from sl4a.
75
76        Events are sorted by name and store in separate queues.
77        If there are registered handlers, the handlers will be called with
78        corresponding event immediately upon event discovery, and the event
79        won't be stored. If exceptions occur, stop the dispatcher and return
80        """
81        while self._started:
82            try:
83                # 60000 in ms, timeout in second
84                event_obj = self._rpc_client.eventWait(60000, timeout=120)
85            except rpc_client.Sl4aConnectionError as e:
86                if self._rpc_client.is_alive:
87                    self.log.warning('Closing due to closed session.')
88                    break
89                else:
90                    self.log.warning('Closing due to error: %s.' % e)
91                    self.close()
92                    raise e
93            if not event_obj:
94                continue
95            elif 'name' not in event_obj:
96                self.log.error('Received Malformed event {}'.format(event_obj))
97                continue
98            else:
99                event_name = event_obj['name']
100            # if handler registered, process event
101            if event_name == 'EventDispatcherShutdown':
102                self.log.debug('Received shutdown signal.')
103                # closeSl4aSession has been called, which closes the event
104                # dispatcher. Stop execution on this polling thread.
105                return
106            if event_name in self._handlers:
107                self.log.debug(
108                    'Using handler %s for event: %r' %
109                    (self._handlers[event_name].__name__, event_obj))
110                self.handle_subscribed_event(event_obj, event_name)
111            else:
112                self.log.debug('Queuing event: %r' % event_obj)
113                self._lock.acquire()
114                if event_name in self._event_dict:  # otherwise, cache event
115                    self._event_dict[event_name].put(event_obj)
116                else:
117                    q = queue.Queue()
118                    q.put(event_obj)
119                    self._event_dict[event_name] = q
120                self._lock.release()
121
122    def register_handler(self, handler, event_name, args):
123        """Registers an event handler.
124
125        One type of event can only have one event handler associated with it.
126
127        Args:
128            handler: The event handler function to be registered.
129            event_name: Name of the event the handler is for.
130            args: User arguments to be passed to the handler when it's called.
131
132        Raises:
133            IllegalStateError: Raised if attempts to register a handler after
134                the dispatcher starts running.
135            DuplicateError: Raised if attempts to register more than one
136                handler for one type of event.
137        """
138        if self._started:
139            raise IllegalStateError('Cannot register service after polling is '
140                                    'started.')
141        self._lock.acquire()
142        try:
143            if event_name in self._handlers:
144                raise DuplicateError(
145                    'A handler for {} already exists'.format(event_name))
146            self._handlers[event_name] = (handler, args)
147        finally:
148            self._lock.release()
149
150    def start(self):
151        """Starts the event dispatcher.
152
153        Initiates executor and start polling events.
154
155        Raises:
156            IllegalStateError: Can't start a dispatcher again when it's already
157                running.
158        """
159        if not self._started:
160            self._started = True
161            self._executor = ThreadPoolExecutor(max_workers=32)
162            self._executor.submit(self.poll_events)
163        else:
164            raise IllegalStateError("Dispatcher is already started.")
165
166    def close(self):
167        """Clean up and release resources.
168
169        This function should only be called after a
170        rpc_client.closeSl4aSession() call.
171        """
172        if not self._started:
173            return
174        self._started = False
175        self._executor.shutdown(wait=True)
176        self.clear_all_events()
177
178    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
179        """Pop an event from its queue.
180
181        Return and remove the oldest entry of an event.
182        Block until an event of specified name is available or
183        times out if timeout is set.
184
185        Args:
186            event_name: Name of the event to be popped.
187            timeout: Number of seconds to wait when event is not present.
188                Never times out if None.
189
190        Returns:
191            event: The oldest entry of the specified event. None if timed out.
192
193        Raises:
194            IllegalStateError: Raised if pop is called before the dispatcher
195                starts polling.
196        """
197        if not self._started:
198            raise IllegalStateError(
199                'Dispatcher needs to be started before popping.')
200
201        e_queue = self.get_event_q(event_name)
202
203        if not e_queue:
204            raise IllegalStateError(
205                'Failed to get an event queue for {}'.format(event_name))
206
207        try:
208            # Block for timeout
209            if timeout:
210                return e_queue.get(True, timeout)
211            # Non-blocking poll for event
212            elif timeout == 0:
213                return e_queue.get(False)
214            else:
215                # Block forever on event wait
216                return e_queue.get(True)
217        except queue.Empty:
218            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
219                timeout, event_name))
220
221    def wait_for_event(self,
222                       event_name,
223                       predicate,
224                       timeout=DEFAULT_TIMEOUT,
225                       *args,
226                       **kwargs):
227        """Wait for an event that satisfies a predicate to appear.
228
229        Continuously pop events of a particular name and check against the
230        predicate until an event that satisfies the predicate is popped or
231        timed out. Note this will remove all the events of the same name that
232        do not satisfy the predicate in the process.
233
234        Args:
235            event_name: Name of the event to be popped.
236            predicate: A function that takes an event and returns True if the
237                predicate is satisfied, False otherwise.
238            timeout: Number of seconds to wait.
239            *args: Optional positional args passed to predicate().
240            **kwargs: Optional keyword args passed to predicate().
241                consume_ignored_events: Whether or not to consume events while
242                    searching for the desired event. Defaults to True if unset.
243
244        Returns:
245            The event that satisfies the predicate.
246
247        Raises:
248            queue.Empty: Raised if no event that satisfies the predicate was
249                found before time out.
250        """
251        deadline = time.time() + timeout
252        ignored_events = []
253        consume_events = kwargs.pop('consume_ignored_events', True)
254        while True:
255            event = None
256            try:
257                event = self.pop_event(event_name, 1)
258                if consume_events:
259                    self.log.debug('Consuming event: %r' % event)
260                else:
261                    self.log.debug('Peeking at event: %r' % event)
262                    ignored_events.append(event)
263            except queue.Empty:
264                pass
265
266            if event and predicate(event, *args, **kwargs):
267                for ignored_event in ignored_events:
268                    self.get_event_q(event_name).put(ignored_event)
269                self.log.debug('Matched event: %r with %s' %
270                               (event, predicate.__name__))
271                return event
272
273            if time.time() > deadline:
274                for ignored_event in ignored_events:
275                    self.get_event_q(event_name).put(ignored_event)
276                raise queue.Empty(
277                    'Timeout after {}s waiting for event: {}'.format(
278                        timeout, event_name))
279
280    def pop_events(self, regex_pattern, timeout, freq=1):
281        """Pop events whose names match a regex pattern.
282
283        If such event(s) exist, pop one event from each event queue that
284        satisfies the condition. Otherwise, wait for an event that satisfies
285        the condition to occur, with timeout.
286
287        Results are sorted by timestamp in ascending order.
288
289        Args:
290            regex_pattern: The regular expression pattern that an event name
291                should match in order to be popped.
292            timeout: Number of seconds to wait for events in case no event
293                matching the condition exits when the function is called.
294
295        Returns:
296            results: Pop events whose names match a regex pattern.
297                Empty if none exist and the wait timed out.
298
299        Raises:
300            IllegalStateError: Raised if pop is called before the dispatcher
301                starts polling.
302            queue.Empty: Raised if no event was found before time out.
303        """
304        if not self._started:
305            raise IllegalStateError(
306                "Dispatcher needs to be started before popping.")
307        deadline = time.time() + timeout
308        while True:
309            # TODO: fix the sleep loop
310            results = self._match_and_pop(regex_pattern)
311            if len(results) != 0 or time.time() > deadline:
312                break
313            time.sleep(freq)
314        if len(results) == 0:
315            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
316                timeout, regex_pattern))
317
318        return sorted(results, key=lambda event: event['time'])
319
320    def _match_and_pop(self, regex_pattern):
321        """Pop one event from each of the event queues whose names
322        match (in a sense of regular expression) regex_pattern.
323        """
324        results = []
325        self._lock.acquire()
326        for name in self._event_dict.keys():
327            if re.match(regex_pattern, name):
328                q = self._event_dict[name]
329                if q:
330                    try:
331                        results.append(q.get(False))
332                    except queue.Empty:
333                        pass
334        self._lock.release()
335        return results
336
337    def get_event_q(self, event_name):
338        """Obtain the queue storing events of the specified name.
339
340        If no event of this name has been polled, wait for one to.
341
342        Returns: A queue storing all the events of the specified name.
343        """
344        self._lock.acquire()
345        if (event_name not in self._event_dict
346                or self._event_dict[event_name] is None):
347            self._event_dict[event_name] = queue.Queue()
348        self._lock.release()
349
350        event_queue = self._event_dict[event_name]
351        return event_queue
352
353    def handle_subscribed_event(self, event_obj, event_name):
354        """Execute the registered handler of an event.
355
356        Retrieve the handler and its arguments, and execute the handler in a
357            new thread.
358
359        Args:
360            event_obj: Json object of the event.
361            event_name: Name of the event to call handler for.
362        """
363        handler, args = self._handlers[event_name]
364        self._executor.submit(handler, event_obj, *args)
365
366    def _handle(self, event_handler, event_name, user_args, event_timeout,
367                cond, cond_timeout):
368        """Pop an event of specified type and calls its handler on it. If
369        condition is not None, block until condition is met or timeout.
370        """
371        if cond:
372            cond.wait(cond_timeout)
373        event = self.pop_event(event_name, event_timeout)
374        return event_handler(event, *user_args)
375
376    def handle_event(self,
377                     event_handler,
378                     event_name,
379                     user_args,
380                     event_timeout=None,
381                     cond=None,
382                     cond_timeout=None):
383        """Handle events that don't have registered handlers
384
385        In a new thread, poll one event of specified type from its queue and
386        execute its handler. If no such event exists, the thread waits until
387        one appears.
388
389        Args:
390            event_handler: Handler for the event, which should take at least
391                one argument - the event json object.
392            event_name: Name of the event to be handled.
393            user_args: User arguments for the handler; to be passed in after
394                the event json.
395            event_timeout: Number of seconds to wait for the event to come.
396            cond: A condition to wait on before executing the handler. Should
397                be a threading.Event object.
398            cond_timeout: Number of seconds to wait before the condition times
399                out. Never times out if None.
400
401        Returns:
402            worker: A concurrent.Future object associated with the handler.
403                If blocking call worker.result() is triggered, the handler
404                needs to return something to unblock.
405        """
406        worker = self._executor.submit(self._handle, event_handler, event_name,
407                                       user_args, event_timeout, cond,
408                                       cond_timeout)
409        return worker
410
411    def pop_all(self, event_name):
412        """Return and remove all stored events of a specified name.
413
414        Pops all events from their queue. May miss the latest ones.
415        If no event is available, return immediately.
416
417        Args:
418            event_name: Name of the events to be popped.
419
420        Returns:
421           results: List of the desired events.
422
423        Raises:
424            IllegalStateError: Raised if pop is called before the dispatcher
425                starts polling.
426        """
427        if not self._started:
428            raise IllegalStateError(("Dispatcher needs to be started before "
429                                     "popping."))
430        results = []
431        try:
432            self._lock.acquire()
433            while True:
434                e = self._event_dict[event_name].get(block=False)
435                results.append(e)
436        except (queue.Empty, KeyError):
437            return results
438        finally:
439            self._lock.release()
440
441    def clear_events(self, event_name):
442        """Clear all events of a particular name.
443
444        Args:
445            event_name: Name of the events to be popped.
446        """
447        self._lock.acquire()
448        try:
449            q = self.get_event_q(event_name)
450            q.queue.clear()
451        except queue.Empty:
452            return
453        finally:
454            self._lock.release()
455
456    def clear_all_events(self):
457        """Clear all event queues and their cached events."""
458        self._lock.acquire()
459        self._event_dict.clear()
460        self._lock.release()
461