xref: /aosp_15_r20/tools/asuite/atest/metrics/clearcut_client.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
1# Copyright 2018, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Python client library to write logs to Clearcut.
16
17This class is intended to be general-purpose, usable for any Clearcut LogSource.
18
19    Typical usage example:
20
21    client = clearcut.Clearcut(clientanalytics_pb2.LogRequest.MY_LOGSOURCE)
22    client.log(my_event)
23    client.flush_events()
24"""
25
26import logging
27import ssl
28import threading
29import time
30
31try:
32  from urllib.request import urlopen
33  from urllib.request import Request
34  from urllib.request import HTTPError
35  from urllib.request import URLError
36except ImportError:
37  # for compatibility of asuite_metrics_lib_tests and asuite_cc_lib_tests.
38  from urllib2 import Request
39  from urllib2 import urlopen
40  from urllib2 import HTTPError
41  from urllib2 import URLError
42
43from atest.proto import clientanalytics_pb2
44
45_CLEARCUT_PROD_URL = 'https://play.googleapis.com/log'
46_DEFAULT_BUFFER_SIZE = 100  # Maximum number of events to be buffered.
47_DEFAULT_FLUSH_INTERVAL_SEC = 60  # 1 Minute.
48_BUFFER_FLUSH_RATIO = 0.5  # Flush buffer when we exceed this ratio.
49_CLIENT_TYPE = 6
50
51
52class Clearcut:
53  """Handles logging to Clearcut."""
54
55  def __init__(
56      self, log_source, url=None, buffer_size=None, flush_interval_sec=None
57  ):
58    """Initializes a Clearcut client.
59
60    Args:
61        log_source: The log source.
62        url: The Clearcut url to connect to.
63        buffer_size: The size of the client buffer in number of events.
64        flush_interval_sec: The flush interval in seconds.
65    """
66    self._clearcut_url = url if url else _CLEARCUT_PROD_URL
67    self._log_source = log_source
68    self._buffer_size = buffer_size if buffer_size else _DEFAULT_BUFFER_SIZE
69    self._pending_events = []
70    if flush_interval_sec:
71      self._flush_interval_sec = flush_interval_sec
72    else:
73      self._flush_interval_sec = _DEFAULT_FLUSH_INTERVAL_SEC
74    self._pending_events_lock = threading.Lock()
75    self._scheduled_flush_thread = None
76    self._scheduled_flush_time = float('inf')
77    self._min_next_request_time = 0
78
79  def log(self, event):
80    """Logs events to Clearcut.
81
82    Logging an event can potentially trigger a flush of queued events.
83    Flushing is triggered when the buffer is more than half full or
84    after the flush interval has passed.
85
86    Args:
87      event: A LogEvent to send to Clearcut.
88    """
89    self._append_events_to_buffer([event])
90
91  def flush_events(self):
92    """Cancel whatever is scheduled and schedule an immediate flush."""
93    if self._scheduled_flush_thread:
94      self._scheduled_flush_thread.cancel()
95    self._min_next_request_time = 0
96    self._schedule_flush_thread(0)
97
98  def _serialize_events_to_proto(self, events):
99    log_request = clientanalytics_pb2.LogRequest()
100    log_request.request_time_ms = int(time.time() * 1000)
101    # pylint: disable=no-member
102    log_request.client_info.client_type = _CLIENT_TYPE
103    log_request.log_source = self._log_source
104    log_request.log_event.extend(events)
105    return log_request
106
107  def _append_events_to_buffer(self, events, retry=False):
108    with self._pending_events_lock:
109      self._pending_events.extend(events)
110      if len(self._pending_events) > self._buffer_size:
111        index = len(self._pending_events) - self._buffer_size
112        del self._pending_events[:index]
113      self._schedule_flush(retry)
114
115  def _schedule_flush(self, retry):
116    if (
117        not retry
118        and len(self._pending_events)
119        >= int(self._buffer_size * _BUFFER_FLUSH_RATIO)
120        and self._scheduled_flush_time > time.time()
121    ):
122      # Cancel whatever is scheduled and schedule an immediate flush.
123      if self._scheduled_flush_thread:
124        self._scheduled_flush_thread.cancel()
125      self._schedule_flush_thread(0)
126    elif self._pending_events and not self._scheduled_flush_thread:
127      # Schedule a flush to run later.
128      self._schedule_flush_thread(self._flush_interval_sec)
129
130  def _schedule_flush_thread(self, time_from_now):
131    min_wait_sec = self._min_next_request_time - time.time()
132    if min_wait_sec > time_from_now:
133      time_from_now = min_wait_sec
134    logging.debug('Scheduling thread to run in %f seconds', time_from_now)
135    self._scheduled_flush_thread = threading.Timer(time_from_now, self._flush)
136    self._scheduled_flush_time = time.time() + time_from_now
137    self._scheduled_flush_thread.start()
138
139  def _flush(self):
140    """Flush buffered events to Clearcut.
141
142    If the sent request is unsuccessful, the events will be appended to
143    buffer and rescheduled for next flush.
144    """
145    with self._pending_events_lock:
146      self._scheduled_flush_time = float('inf')
147      self._scheduled_flush_thread = None
148      events = self._pending_events
149      self._pending_events = []
150    if self._min_next_request_time > time.time():
151      self._append_events_to_buffer(events, retry=True)
152      return
153    log_request = self._serialize_events_to_proto(events)
154    self._send_to_clearcut(log_request.SerializeToString())
155
156  # pylint: disable=broad-except
157  # pylint: disable=protected-access
158  def _send_to_clearcut(self, data):
159    """Sends a POST request with data as the body.
160
161    Args:
162        data: The serialized proto to send to Clearcut.
163    """
164    request = Request(self._clearcut_url, data=data)
165    try:
166      ssl._create_default_https_context = ssl._create_unverified_context
167      response = urlopen(request)
168      msg = response.read()
169      logging.debug('LogRequest successfully sent to Clearcut.')
170      log_response = clientanalytics_pb2.LogResponse()
171      log_response.ParseFromString(msg)
172      # pylint: disable=no-member
173      # Throttle based on next_request_wait_millis value.
174      self._min_next_request_time = (
175          log_response.next_request_wait_millis / 1000 + time.time()
176      )
177      logging.debug('LogResponse: %s', log_response)
178    except HTTPError as e:
179      logging.warning(
180          'Failed to push events to Clearcut. Error code: %d', e.code
181      )
182    except URLError as e:
183      logging.warning('Failed to push events to Clearcut. Reason: %s', e)
184    except Exception as e:
185      logging.warning(e)
186