# Copyright 2018, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Python client library to write logs to Clearcut.

This class is intended to be general-purpose, usable for any Clearcut LogSource.

    Typical usage example:

    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
    client.log(my_event)
    client.flush_events()
"""

import logging
import ssl
import threading
import time

try:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.request import HTTPError
    from urllib.request import URLError
except ImportError:
    # for compatibility of asuite_metrics_lib_tests and asuite_cc_lib_tests.
    from urllib2 import Request
    from urllib2 import urlopen
    from urllib2 import HTTPError
    from urllib2 import URLError

from atest.proto import clientanalytics_pb2

_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'
_DEFAULT_BUFFER_SIZE = 100  # Maximum number of events to be buffered.
_DEFAULT_FLUSH_INTERVAL_SEC = 60  # 1 Minute.
_BUFFER_FLUSH_RATIO = 0.5  # Flush buffer when we reach this ratio.
_CLIENT_TYPE = 6


class Clearcut:
    """Handles logging to Clearcut.

    Events are buffered in memory and pushed to the Clearcut endpoint from a
    `threading.Timer` callback. A flush is scheduled either after the flush
    interval or immediately once the buffer reaches `_BUFFER_FLUSH_RATIO` of
    its configured size.
    """

    def __init__(
        self, log_source, url=None, buffer_size=None, flush_interval_sec=None
    ):
        """Initializes a Clearcut client.

        Falsy values (including 0) for the optional arguments select the
        module-level defaults.

        Args:
            log_source: The log source.
            url: The Clearcut url to connect to.
            buffer_size: The size of the client buffer in number of events.
            flush_interval_sec: The flush interval in seconds.
        """
        self._clearcut_url = url if url else _CLEARCUT_PROD_URL
        self._log_source = log_source
        self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
        self._pending_events = []
        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
        else:
            self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
        # Guards the pending-event buffer and the scheduled-flush bookkeeping.
        self._pending_events_lock = threading.Lock()
        self._scheduled_flush_thread = None
        # float('inf') means "no flush is currently scheduled".
        self._scheduled_flush_time = float('inf')
        # Earliest wall-clock time (seconds) at which the next request may go
        # out; updated from the server's throttling hint.
        self._min_next_request_time = 0

    def log(self, event):
        """Logs events to Clearcut.

        Logging an event can potentially trigger a flush of queued events.
        Flushing is triggered when the buffer is at least half full or
        after the flush interval has passed.

        Args:
            event: A LogEvent to send to Clearcut.
        """
        self._append_events_to_buffer([event])

    def flush_events(self):
        """Cancel whatever is scheduled and schedule an immediate flush.

        Also resets the throttling deadline so the flush is not delayed.
        """
        # Hold the lock so the timer thread running _flush() cannot reset
        # _scheduled_flush_thread to None between the check and the cancel().
        with self._pending_events_lock:
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._min_next_request_time = 0
            self._schedule_flush_thread(0)

    def _serialize_events_to_proto(self, events):
        """Wraps events in a LogRequest stamped with the current time.

        Args:
            events: The LogEvents to place in the request.

        Returns:
            A clientanalytics_pb2.LogRequest containing the events.
        """
        log_request = clientanalytics_pb2.LogRequest()
        log_request.request_time_ms = int(time.time() * 1000)
        # pylint: disable=no-member
        log_request.client_info.client_type = _CLIENT_TYPE
        log_request.log_source = self._log_source
        log_request.log_event.extend(events)
        return log_request

    def _append_events_to_buffer(self, events, retry=False):
        """Adds events to the buffer and makes sure a flush is scheduled.

        When the buffer would exceed its configured size, entries at the
        front of the buffer are discarded to make room.

        Args:
            events: The LogEvents to buffer.
            retry: True when re-queueing events that could not be sent yet;
              this suppresses the immediate-flush path.
        """
        with self._pending_events_lock:
            self._pending_events.extend(events)
            if len(self._pending_events) > self._buffer_size:
                index = len(self._pending_events) - self._buffer_size
                del self._pending_events[:index]
            self._schedule_flush(retry)

    def _schedule_flush(self, retry):
        """Decides whether to flush now, later, or leave the schedule alone.

        Called with self._pending_events_lock held.

        Args:
            retry: True when called for re-queued events; never flushes
              immediately in that case.
        """
        if (
            not retry
            and len(self._pending_events)
            >= int(self._buffer_size * _BUFFER_FLUSH_RATIO)
            and self._scheduled_flush_time > time.time()
        ):
            # Cancel whatever is scheduled and schedule an immediate flush.
            if self._scheduled_flush_thread:
                self._scheduled_flush_thread.cancel()
            self._schedule_flush_thread(0)
        elif self._pending_events and not self._scheduled_flush_thread:
            # Schedule a flush to run later.
            self._schedule_flush_thread(self._flush_interval_sec)

    def _schedule_flush_thread(self, time_from_now):
        """Starts a timer that runs _flush after the requested delay.

        The delay is extended when needed so the flush does not run before
        self._min_next_request_time.

        Args:
            time_from_now: Requested delay in seconds.
        """
        min_wait_sec = self._min_next_request_time - time.time()
        if min_wait_sec > time_from_now:
            time_from_now = min_wait_sec
        logging.debug('Scheduling thread to run in %f seconds', time_from_now)
        self._scheduled_flush_thread = threading.Timer(
            time_from_now, self._flush
        )
        self._scheduled_flush_time = time.time() + time_from_now
        self._scheduled_flush_thread.start()

    def _flush(self):
        """Flush buffered events to Clearcut.

        If requests are still being throttled, the events are put back in the
        buffer and a later flush is scheduled. Events whose request fails are
        not re-queued; the failure is only logged by _send_to_clearcut.
        """
        with self._pending_events_lock:
            self._scheduled_flush_time = float('inf')
            self._scheduled_flush_thread = None
            events = self._pending_events
            self._pending_events = []
        if not events:
            # Nothing buffered: do not send an empty LogRequest.
            return
        if self._min_next_request_time > time.time():
            self._append_events_to_buffer(events, retry=True)
            return
        log_request = self._serialize_events_to_proto(events)
        self._send_to_clearcut(log_request.SerializeToString())

    # pylint: disable=broad-except
    # pylint: disable=protected-access
    def _send_to_clearcut(self, data):
        """Sends a POST request with data as the body.

        Failures are logged as warnings and never raised to the caller.

        Args:
            data: The serialized proto to send to Clearcut.
        """
        request = Request(self._clearcut_url, data=data)
        try:
            # Certificate verification is skipped for this request only.
            # Passing a context avoids overwriting
            # ssl._create_default_https_context, which would disable
            # verification for every HTTPS request in the process.
            context = ssl._create_unverified_context()
            response = urlopen(request, context=context)
            try:
                msg = response.read()
            finally:
                # Always release the underlying connection.
                response.close()
            logging.debug('LogRequest successfully sent to Clearcut.')
            log_response = clientanalytics_pb2.LogResponse()
            log_response.ParseFromString(msg)
            # pylint: disable=no-member
            # Throttle based on next_request_wait_millis value. Divide by a
            # float so sub-second waits survive under Python 2 as well.
            self._min_next_request_time = (
                log_response.next_request_wait_millis / 1000.0 + time.time()
            )
            logging.debug('LogResponse: %s', log_response)
        except HTTPError as e:
            logging.warning(
                'Failed to push events to Clearcut. Error code: %d', e.code
            )
        except URLError as e:
            logging.warning('Failed to push events to Clearcut. Reason: %s', e)
        except Exception as e:
            logging.warning(e)