1# Copyright 2018, The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Python client library to write logs to Clearcut. 16 17This class is intended to be general-purpose, usable for any Clearcut LogSource. 18 19 Typical usage example: 20 21 client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE) 22 client.log(my_event) 23 client.flush_events() 24""" 25 26import logging 27import threading 28import time 29 30from urllib.request import urlopen 31from urllib.request import Request 32from urllib.request import HTTPError 33from urllib.request import URLError 34 35from proto import clientanalytics_pb2 36 37_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log' 38_DEFAULT_BUFFER_SIZE = 100 # Maximum number of events to be buffered. 39_DEFAULT_FLUSH_INTERVAL_SEC = 60 # 1 Minute. 40_BUFFER_FLUSH_RATIO = 0.5 # Flush buffer when we exceed this ratio. 41_CLIENT_TYPE = 6 42 43class Clearcut: 44 """Handles logging to Clearcut.""" 45 46 def __init__(self, log_source, url=None, buffer_size=None, 47 flush_interval_sec=None): 48 """Initializes a Clearcut client. 49 50 Args: 51 log_source: The log source. 52 url: The Clearcut url to connect to. 53 buffer_size: The size of the client buffer in number of events. 54 flush_interval_sec: The flush interval in seconds. 55 """ 56 self._clearcut_url = url if url else _CLEARCUT_PROD_URL 57 self._log_source = log_source 58 self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE 59 self._pending_events = [] 60 if flush_interval_sec: 61 self._flush_interval_sec = flush_interval_sec 62 else: 63 self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC 64 self._pending_events_lock = threading.Lock() 65 self._scheduled_flush_thread = None 66 self._scheduled_flush_time = float('inf') 67 self._min_next_request_time = 0 68 69 def log(self, event): 70 """Logs events to Clearcut. 71 72 Logging an event can potentially trigger a flush of queued events. 73 Flushing is triggered when the buffer is more than half full or 74 after the flush interval has passed. 75 76 Args: 77 event: A LogEvent to send to Clearcut. 78 """ 79 self._append_events_to_buffer([event]) 80 81 def flush_events(self): 82 """ Cancel whatever is scheduled and schedule an immediate flush.""" 83 if self._scheduled_flush_thread: 84 self._scheduled_flush_thread.cancel() 85 self._min_next_request_time = 0 86 self._schedule_flush_thread(0) 87 88 def _serialize_events_to_proto(self, events): 89 log_request = clientanalytics_pb2.LogRequest() 90 log_request.request_time_ms = int(time.time() * 1000) 91 # pylint: disable=no-member 92 log_request.client_info.client_type = _CLIENT_TYPE 93 log_request.log_source = self._log_source 94 log_request.log_event.extend(events) 95 return log_request 96 97 def _append_events_to_buffer(self, events, retry=False): 98 with self._pending_events_lock: 99 self._pending_events.extend(events) 100 if len(self._pending_events) > self._buffer_size: 101 index = len(self._pending_events) - self._buffer_size 102 del self._pending_events[:index] 103 self._schedule_flush(retry) 104 105 def _schedule_flush(self, retry): 106 if (not retry 107 and len(self._pending_events) >= int(self._buffer_size * 108 _BUFFER_FLUSH_RATIO) 109 and self._scheduled_flush_time > time.time()): 110 # Cancel whatever is scheduled and schedule an immediate flush. 111 if self._scheduled_flush_thread: 112 self._scheduled_flush_thread.cancel() 113 self._schedule_flush_thread(0) 114 elif self._pending_events and not self._scheduled_flush_thread: 115 # Schedule a flush to run later. 116 self._schedule_flush_thread(self._flush_interval_sec) 117 118 def _schedule_flush_thread(self, time_from_now): 119 min_wait_sec = self._min_next_request_time - time.time() 120 if min_wait_sec > time_from_now: 121 time_from_now = min_wait_sec 122 logging.debug('Scheduling thread to run in %f seconds', time_from_now) 123 self._scheduled_flush_thread = threading.Timer( 124 time_from_now, self._flush) 125 self._scheduled_flush_time = time.time() + time_from_now 126 self._scheduled_flush_thread.start() 127 128 def _flush(self): 129 """Flush buffered events to Clearcut. 130 131 If the sent request is unsuccessful, the events will be appended to 132 buffer and rescheduled for next flush. 133 """ 134 with self._pending_events_lock: 135 self._scheduled_flush_time = float('inf') 136 self._scheduled_flush_thread = None 137 events = self._pending_events 138 self._pending_events = [] 139 if self._min_next_request_time > time.time(): 140 self._append_events_to_buffer(events, retry=True) 141 return 142 log_request = self._serialize_events_to_proto(events) 143 self._send_to_clearcut(log_request.SerializeToString()) 144 145 #pylint: disable=broad-except 146 def _send_to_clearcut(self, data): 147 """Sends a POST request with data as the body. 148 149 Args: 150 data: The serialized proto to send to Clearcut. 151 """ 152 request = Request(self._clearcut_url, data=data) 153 try: 154 response = urlopen(request) 155 msg = response.read() 156 logging.debug('LogRequest successfully sent to Clearcut.') 157 log_response = clientanalytics_pb2.LogResponse() 158 log_response.ParseFromString(msg) 159 # pylint: disable=no-member 160 # Throttle based on next_request_wait_millis value. 161 self._min_next_request_time = (log_response.next_request_wait_millis 162 / 1000 + time.time()) 163 logging.debug('LogResponse: %s', log_response) 164 except HTTPError as e: 165 logging.debug('Failed to push events to Clearcut. Error code: %d', 166 e.code) 167 except URLError: 168 logging.debug('Failed to push events to Clearcut.') 169 except Exception as e: 170 logging.debug(e) 171