#!/usr/bin/env python3
#
#   Copyright 2016- The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from concurrent.futures import ThreadPoolExecutor
import queue
import re
import threading
import time

from acts import logger
from acts.controllers.sl4a_lib import rpc_client


class EventDispatcherError(Exception):
    """The base class for all EventDispatcher exceptions."""


class IllegalStateError(EventDispatcherError):
    """Raise when user tries to put event_dispatcher into an illegal state."""


class DuplicateError(EventDispatcherError):
    """Raise when two event handlers have been assigned to an event name."""


class EventDispatcher:
    """A class for managing the events for an SL4A Session.

    Attributes:
        _serial: The serial of the device.
        _rpc_client: The rpc client for that session.
        _started: A bool that holds whether or not the event dispatcher is
                  running.
        _executor: The thread pool executor for running event handlers and
                   polling.
        _event_dict: A dictionary of str eventName => Queue<Event> eventQueue
        _handlers: A dictionary of str eventName => (lambda, args) handler
        _lock: A lock that prevents multiple reads/writes to the event queues.
        log: The EventDispatcher's logger.
    """

    DEFAULT_TIMEOUT = 60

    def __init__(self, serial, rpc_client):
        """Creates a dispatcher that is not yet polling.

        Args:
            serial: The serial of the device; used in log messages.
            rpc_client: The rpc client for the session. Its `uid`, `is_alive`
                and `eventWait` members are used by this class.
        """
        self._serial = serial
        self._rpc_client = rpc_client
        self._started = False
        self._executor = None
        self._event_dict = {}
        self._handlers = {}
        # Re-entrant so that methods holding the lock can call
        # get_event_q(), which takes it again.
        self._lock = threading.RLock()

        def _log_formatter(message):
            """Defines the formatting used in the logger."""
            return '[E Dispatcher|%s|%s] %s' % (self._serial,
                                                self._rpc_client.uid, message)

        self.log = logger.create_logger(_log_formatter)

    def poll_events(self):
        """Continuously polls all types of events from sl4a.

        Events are sorted by name and stored in separate queues.
        If there are registered handlers, the handlers will be called with
        corresponding event immediately upon event discovery, and the event
        won't be stored. If exceptions occur, stop the dispatcher and return

        Raises:
            rpc_client.Sl4aConnectionError: Re-raised after stopping the
                dispatcher when the connection fails and the rpc client does
                not report itself alive.
        """
        while self._started:
            try:
                # 60000 in ms, timeout in second
                event_obj = self._rpc_client.eventWait(60000, timeout=120)
            except rpc_client.Sl4aConnectionError as e:
                if self._rpc_client.is_alive:
                    self.log.warning('Closing due to closed session.')
                    break
                self.log.warning('Closing due to error: %s.' % e)
                # start() runs this method on an executor worker, so the
                # executor must not be joined from here: joining the
                # current thread raises RuntimeError and would mask `e`.
                self._stop(wait=False)
                raise

            if not event_obj:
                continue
            if 'name' not in event_obj:
                self.log.error('Received Malformed event {}'.format(event_obj))
                continue

            event_name = event_obj['name']
            if event_name == 'EventDispatcherShutdown':
                self.log.debug('Received shutdown signal.')
                # closeSl4aSession has been called, which closes the event
                # dispatcher. Stop execution on this polling thread.
                return

            if event_name in self._handlers:
                # Handlers are stored as (handler, args) tuples; log the
                # callable's name, not an attribute of the tuple.
                handler = self._handlers[event_name][0]
                self.log.debug(
                    'Using handler %s for event: %r' %
                    (getattr(handler, '__name__', repr(handler)), event_obj))
                self.handle_subscribed_event(event_obj, event_name)
            else:
                # No handler registered: cache the event in its queue.
                self.log.debug('Queuing event: %r' % event_obj)
                with self._lock:
                    self.get_event_q(event_name).put(event_obj)

    def register_handler(self, handler, event_name, args):
        """Registers an event handler.

        One type of event can only have one event handler associated with it.

        Args:
            handler: The event handler function to be registered.
            event_name: Name of the event the handler is for.
            args: User arguments to be passed to the handler when it's called.

        Raises:
            IllegalStateError: Raised if attempts to register a handler after
                               the dispatcher starts running.
            DuplicateError: Raised if attempts to register more than one
                            handler for one type of event.
        """
        if self._started:
            raise IllegalStateError('Cannot register service after polling is '
                                    'started.')
        with self._lock:
            if event_name in self._handlers:
                raise DuplicateError(
                    'A handler for {} already exists'.format(event_name))
            self._handlers[event_name] = (handler, args)

    def start(self):
        """Starts the event dispatcher.

        Initiates executor and start polling events.

        Raises:
            IllegalStateError: Can't start a dispatcher again when it's already
                               running.
        """
        if self._started:
            raise IllegalStateError("Dispatcher is already started.")
        self._started = True
        self._executor = ThreadPoolExecutor(max_workers=32)
        self._executor.submit(self.poll_events)

    def close(self):
        """Clean up and release resources.

        This function should only be called after a
        rpc_client.closeSl4aSession() call.

        Does nothing if the dispatcher is not running. Otherwise blocks until
        the executor has shut down, then drops all cached events.
        """
        self._stop(wait=True)

    def _stop(self, wait):
        """Stops polling, shuts the executor down and clears cached events.

        Args:
            wait: Passed to ThreadPoolExecutor.shutdown(). Must be False when
                called from one of the executor's own worker threads.
        """
        if not self._started:
            return
        self._started = False
        self._executor.shutdown(wait=wait)
        self.clear_all_events()

    def pop_event(self, event_name, timeout=DEFAULT_TIMEOUT):
        """Pop an event from its queue.

        Return and remove the oldest entry of an event.
        Block until an event of specified name is available or
        times out if timeout is set.

        Args:
            event_name: Name of the event to be popped.
            timeout: Number of seconds to wait when event is not present.
                0 polls without blocking. Never times out if None.

        Returns:
            event: The oldest entry of the specified event.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                               starts polling.
            queue.Empty: Raised if no event arrived before the timeout.
        """
        if not self._started:
            raise IllegalStateError(
                'Dispatcher needs to be started before popping.')

        e_queue = self.get_event_q(event_name)

        if not e_queue:
            raise IllegalStateError(
                'Failed to get an event queue for {}'.format(event_name))

        try:
            # Block for timeout
            if timeout:
                return e_queue.get(True, timeout)
            # Non-blocking poll for event
            elif timeout == 0:
                return e_queue.get(False)
            else:
                # Block forever on event wait
                return e_queue.get(True)
        except queue.Empty:
            # Re-raise with a message naming the event and the timeout.
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, event_name))

    def wait_for_event(self,
                       event_name,
                       predicate,
                       timeout=DEFAULT_TIMEOUT,
                       *args,
                       **kwargs):
        """Wait for an event that satisfies a predicate to appear.

        Continuously pop events of a particular name and check against the
        predicate until an event that satisfies the predicate is popped or
        timed out. Note this will remove all the events of the same name that
        do not satisfy the predicate in the process.

        Args:
            event_name: Name of the event to be popped.
            predicate: A function that takes an event and returns True if the
                       predicate is satisfied, False otherwise.
            timeout: Number of seconds to wait.
            *args: Optional positional args passed to predicate().
            **kwargs: Optional keyword args passed to predicate().
                consume_ignored_events: Whether or not to consume events while
                    searching for the desired event. Defaults to True if unset.
                    When False, every event popped during the search,
                    including the one that matched, is put back at the end of
                    the queue before returning or raising.

        Returns:
            The event that satisfies the predicate.

        Raises:
            queue.Empty: Raised if no event that satisfies the predicate was
                         found before time out.
        """
        deadline = time.time() + timeout
        ignored_events = []
        consume_events = kwargs.pop('consume_ignored_events', True)
        while True:
            event = None
            try:
                # Poll in 1 second slices so the deadline is re-checked
                # regularly.
                event = self.pop_event(event_name, 1)
                if consume_events:
                    self.log.debug('Consuming event: %r' % event)
                else:
                    self.log.debug('Peeking at event: %r' % event)
                    ignored_events.append(event)
            except queue.Empty:
                pass

            if event and predicate(event, *args, **kwargs):
                self._requeue_events(event_name, ignored_events)
                # Not every callable has __name__ (e.g. functools.partial).
                self.log.debug(
                    'Matched event: %r with %s' %
                    (event, getattr(predicate, '__name__', repr(predicate))))
                return event

            if time.time() > deadline:
                self._requeue_events(event_name, ignored_events)
                raise queue.Empty(
                    'Timeout after {}s waiting for event: {}'.format(
                        timeout, event_name))

    def _requeue_events(self, event_name, events):
        """Puts each of the given events back on the queue for event_name."""
        for event in events:
            self.get_event_q(event_name).put(event)

    def pop_events(self, regex_pattern, timeout, freq=1):
        """Pop events whose names match a regex pattern.

        If such event(s) exist, pop one event from each event queue that
        satisfies the condition. Otherwise, wait for an event that satisfies
        the condition to occur, with timeout.

        Results are sorted by timestamp in ascending order.

        Args:
            regex_pattern: The regular expression pattern that an event name
                           should match in order to be popped.
            timeout: Number of seconds to wait for events in case no event
                     matching the condition exists when the function is
                     called.
            freq: Number of seconds to sleep between checks of the queues.

        Returns:
            results: The popped events, sorted by their 'time' field.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                               starts polling.
            queue.Empty: Raised if no event was found before time out.
        """
        if not self._started:
            raise IllegalStateError(
                "Dispatcher needs to be started before popping.")
        deadline = time.time() + timeout
        while True:
            # TODO: fix the sleep loop
            results = self._match_and_pop(regex_pattern)
            if results or time.time() > deadline:
                break
            time.sleep(freq)
        if not results:
            raise queue.Empty('Timeout after {}s waiting for event: {}'.format(
                timeout, regex_pattern))

        return sorted(results, key=lambda event: event['time'])

    def _match_and_pop(self, regex_pattern):
        """Pop one event from each of the event queues whose names
        match (in a sense of regular expression) regex_pattern.

        Args:
            regex_pattern: Pattern applied to each event name with re.match,
                i.e. anchored at the start of the name.

        Returns:
            A list with at most one event per matching queue; empty queues
            contribute nothing.
        """
        results = []
        # `with` releases the lock even if re.match raises on a bad pattern.
        with self._lock:
            for name, event_queue in self._event_dict.items():
                if not re.match(regex_pattern, name):
                    continue
                if event_queue:
                    try:
                        results.append(event_queue.get(False))
                    except queue.Empty:
                        pass
        return results

    def get_event_q(self, event_name):
        """Obtain the queue storing events of the specified name.

        If no queue exists yet for this name, an empty one is created and
        registered. This call itself never waits for an event.

        Args:
            event_name: Name of the event whose queue is wanted.

        Returns: A queue storing all the events of the specified name.
        """
        # Look up and create under one lock hold so that a concurrent
        # clear_all_events() cannot remove the entry in between.
        with self._lock:
            event_queue = self._event_dict.get(event_name)
            if event_queue is None:
                event_queue = queue.Queue()
                self._event_dict[event_name] = event_queue
            return event_queue

    def handle_subscribed_event(self, event_obj, event_name):
        """Execute the registered handler of an event.

        Retrieve the handler and its arguments, and execute the handler in a
        new thread.

        Args:
            event_obj: Json object of the event.
            event_name: Name of the event to call handler for.
        """
        handler, args = self._handlers[event_name]
        self._executor.submit(handler, event_obj, *args)

    def _handle(self, event_handler, event_name, user_args, event_timeout,
                cond, cond_timeout):
        """Pop an event of specified type and calls its handler on it. If
        condition is not None, block until condition is met or timeout.

        Returns:
            Whatever event_handler returns.
        """
        if cond:
            cond.wait(cond_timeout)
        event = self.pop_event(event_name, event_timeout)
        return event_handler(event, *user_args)

    def handle_event(self,
                     event_handler,
                     event_name,
                     user_args,
                     event_timeout=None,
                     cond=None,
                     cond_timeout=None):
        """Handle events that don't have registered handlers

        In a new thread, poll one event of specified type from its queue and
        execute its handler. If no such event exists, the thread waits until
        one appears.

        Args:
            event_handler: Handler for the event, which should take at least
                           one argument - the event json object.
            event_name: Name of the event to be handled.
            user_args: User arguments for the handler; to be passed in after
                       the event json.
            event_timeout: Number of seconds to wait for the event to come.
            cond: A condition to wait on before executing the handler. Should
                  be a threading.Event object.
            cond_timeout: Number of seconds to wait before the condition times
                          out. Never times out if None.

        Returns:
            worker: A concurrent.Future object associated with the handler.
                    If blocking call worker.result() is triggered, the handler
                    needs to return something to unblock.
        """
        worker = self._executor.submit(self._handle, event_handler, event_name,
                                       user_args, event_timeout, cond,
                                       cond_timeout)
        return worker

    def pop_all(self, event_name):
        """Return and remove all stored events of a specified name.

        Pops all events from their queue. May miss the latest ones.
        If no event is available, return immediately.

        Args:
            event_name: Name of the events to be popped.

        Returns:
            results: List of the desired events. Empty if no queue exists for
                     event_name or the queue is empty.

        Raises:
            IllegalStateError: Raised if pop is called before the dispatcher
                               starts polling.
        """
        if not self._started:
            raise IllegalStateError(("Dispatcher needs to be started before "
                                     "popping."))
        results = []
        with self._lock:
            event_queue = self._event_dict.get(event_name)
            if event_queue is None:
                return results
            # Drain until the queue reports empty.
            while True:
                try:
                    results.append(event_queue.get(block=False))
                except queue.Empty:
                    return results

    def clear_events(self, event_name):
        """Clear all events of a particular name.

        The queue for event_name is created if it does not exist yet.

        Args:
            event_name: Name of the events to be cleared.
        """
        with self._lock:
            self.get_event_q(event_name).queue.clear()

    def clear_all_events(self):
        """Clear all event queues and their cached events."""
        with self._lock:
            self._event_dict.clear()