1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Python client library to write logs to Clearcut.
16
17This class is intended to be general-purpose, usable for any Clearcut LogSource.
18
19    Typical usage example:
20
21    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
22    client.log(my_event)
23    client.flush_events()
24"""
25
26import logging
27import threading
28import time
29
30from urllib.request import urlopen
31from urllib.request import Request
32from urllib.request import HTTPError
33from urllib.request import URLError
34
35from proto import clientanalytics_pb2
36
# Endpoint used when Clearcut() is constructed without an explicit url.
_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'

# Default capacity of the in-memory event buffer; once exceeded, the
# earliest-buffered events are discarded.
_DEFAULT_BUFFER_SIZE = 100

# Default delay, in seconds, before a scheduled (non-immediate) flush runs.
_DEFAULT_FLUSH_INTERVAL_SEC = 1 * 60

# An immediate flush is triggered once the buffer holds at least this
# fraction of its capacity.
_BUFFER_FLUSH_RATIO = 1 / 2

# Value written to LogRequest.client_info.client_type on every request.
_CLIENT_TYPE = 6
42
class Clearcut:
    """Handles logging to Clearcut.

    Events passed to log() are kept in an in-memory buffer of at most
    buffer_size entries (the earliest-buffered events are dropped when it
    overflows). Buffered events are sent in one LogRequest from a
    threading.Timer thread: immediately once the buffer reaches
    _BUFFER_FLUSH_RATIO of its capacity, otherwise after flush_interval_sec.
    The next_request_wait_millis value from the last server response is
    honored as a minimum delay before the next request.
    """

    # Seconds to wait for the Clearcut server before abandoning a request.
    # Without a timeout, urlopen may block the timer thread indefinitely; the
    # timers are not marked daemon here, so that could delay process exit.
    _REQUEST_TIMEOUT_SEC = 30

    def __init__(self, log_source, url=None, buffer_size=None,
                 flush_interval_sec=None):
        """Initializes a Clearcut client.

        Args:
            log_source: The log source written to every LogRequest.
            url: The Clearcut url to connect to. Falsy values select
                _CLEARCUT_PROD_URL.
            buffer_size: The size of the client buffer in number of events.
                Falsy values select _DEFAULT_BUFFER_SIZE.
            flush_interval_sec: The flush interval in seconds. Falsy values
                select _DEFAULT_FLUSH_INTERVAL_SEC.
        """
        self._clearcut_url = url if url else _CLEARCUT_PROD_URL
        self._log_source = log_source
        self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
        self._pending_events = []
        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
        else:
            self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
        # Guards _pending_events and the scheduled-flush bookkeeping below.
        self._pending_events_lock = threading.Lock()
        self._scheduled_flush_thread = None
        # Absolute time at which the scheduled flush fires; inf = none pending.
        self._scheduled_flush_time = float('inf')
        # Earliest time (time.time() units) the next request may be sent.
        self._min_next_request_time = 0

    def log(self, event):
        """Logs events to Clearcut.

        Logging an event can potentially trigger a flush of queued events.
        Flushing is triggered when the buffer is at least half full or
        after the flush interval has passed.

        Args:
          event: A LogEvent to send to Clearcut.
        """
        self._append_events_to_buffer([event])

    def flush_events(self):
        """Cancels whatever is scheduled and schedules an immediate flush.

        This also clears the server-requested wait time, so the flush is not
        delayed by a previous next_request_wait_millis.
        """
        # Hold the lock so this cannot interleave with _schedule_flush() or
        # _flush() and leave two live timers behind.
        with self._pending_events_lock:
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._min_next_request_time = 0
            self._schedule_flush_thread(0)

    def _serialize_events_to_proto(self, events):
        """Builds a LogRequest carrying `events` for this client's log source."""
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        # pylint: disable=no-member
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = self._log_source
        log_request.log_event.extend(events)
        return log_request

    def _append_events_to_buffer(self, events, retry=False):
        """Adds events to the buffer, trims it to capacity, and schedules a flush.

        Args:
            events: LogEvents to append.
            retry: True when re-buffering events that could not be sent yet;
                such appends never trigger an immediate flush.
        """
        with self._pending_events_lock:
            self._pending_events.extend(events)
            if len(self._pending_events) > self._buffer_size:
                # Drop the earliest-buffered events to stay within capacity.
                index = len(self._pending_events) - self._buffer_size
                del self._pending_events[:index]
            self._schedule_flush(retry)

    def _schedule_flush(self, retry):
        """Schedules an immediate or delayed flush as appropriate.

        Callers must hold _pending_events_lock.
        """
        if (not retry
                and len(self._pending_events) >= int(self._buffer_size *
                                                     _BUFFER_FLUSH_RATIO)
                and self._scheduled_flush_time > time.time()):
            # Cancel whatever is scheduled and schedule an immediate flush.
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._schedule_flush_thread(0)
        elif self._pending_events and not self._scheduled_flush_thread:
            # Schedule a flush to run later.
            self._schedule_flush_thread(self._flush_interval_sec)

    def _schedule_flush_thread(self, time_from_now):
        """Starts a timer that runs _flush() after `time_from_now` seconds.

        The delay is raised, if needed, so the flush does not run before
        _min_next_request_time. Callers hold _pending_events_lock.
        """
        min_wait_sec = self._min_next_request_time - time.time()
        if min_wait_sec > time_from_now:
            time_from_now = min_wait_sec
        logging.debug('Scheduling thread to run in %f seconds', time_from_now)
        self._scheduled_flush_thread = threading.Timer(
            time_from_now, self._flush)
        self._scheduled_flush_time = time.time() + time_from_now
        self._scheduled_flush_thread.start()

    def _flush(self):
        """Flush buffered events to Clearcut.

        Runs on a timer thread. If the server-requested wait time has not
        elapsed yet, the events are appended back to the buffer and a later
        flush is scheduled. If the request itself fails, the failure is logged
        at debug level and the events are dropped.
        """
        with self._pending_events_lock:
            self._scheduled_flush_time = float('inf')
            self._scheduled_flush_thread = None
            events = self._pending_events
            self._pending_events = []
        if not events:
            # Nothing to send (e.g. flush_events() on an empty buffer); skip
            # the network round-trip.
            return
        if self._min_next_request_time > time.time():
            self._append_events_to_buffer(events, retry=True)
            return
        log_request = self._serialize_events_to_proto(events)
        self._send_to_clearcut(log_request.SerializeToString())

    #pylint: disable=broad-except
    def _send_to_clearcut(self, data):
        """Sends a POST request with data as the body.

        On success, records the server's next_request_wait_millis as the
        earliest time for the next request. All errors are logged at debug
        level and otherwise ignored.

        Args:
            data: The serialized proto to send to Clearcut.
        """
        request = Request(self._clearcut_url, data=data)
        try:
            # Bound the wait and make sure the connection is closed.
            with urlopen(request, timeout=self._REQUEST_TIMEOUT_SEC) as response:
                msg = response.read()
            logging.debug('LogRequest successfully sent to Clearcut.')
            log_response = clientanalytics_pb2.LogResponse()
            log_response.ParseFromString(msg)
            # pylint: disable=no-member
            # Throttle based on next_request_wait_millis value.
            self._min_next_request_time = (log_response.next_request_wait_millis
                                           / 1000 + time.time())
            logging.debug('LogResponse: %s', log_response)
        except HTTPError as e:
            logging.debug('Failed to push events to Clearcut. Error code: %d',
                          e.code)
        except URLError:
            logging.debug('Failed to push events to Clearcut.')
        except Exception as e:
            logging.debug(e)
171