xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/lib/ts_mon_config.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# -*- coding: utf-8 -*-
2# Copyright 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Wrapper for inframon's command-line flag based configuration."""
7
8from __future__ import print_function
9
10import argparse
11import contextlib
12import multiprocessing
13import os
14import socket
15import signal
16import time
17
18from six.moves import queue as Queue
19
20import six
21
22from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
23from autotest_lib.utils.frozen_chromite.lib import metrics
24from autotest_lib.utils.frozen_chromite.lib import parallel
25
26try:
27  from infra_libs.ts_mon import config
28  from infra_libs.ts_mon import BooleanField
29  from infra_libs.ts_mon import IntegerField
30  from infra_libs.ts_mon import StringField
31  import googleapiclient.discovery
32except (ImportError, RuntimeError) as e:
33  config = None
34  logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)
35
36
# Set to True by _SetupTsMonFromOptions once ts_mon has been configured
# successfully in this process.
_WasSetup = False
# Metric fields that AddCommonFields merges into every metric; extended by
# SetupTsMonGlobalState(common_metric_fields=...).
_CommonMetricFields = {}

# Seconds between periodic metrics.Flush() calls made by MetricConsumer in the
# flushing process; also scales the shutdown join timeout.
FLUSH_INTERVAL = 60
41
42
@contextlib.contextmanager
def TrivialContextManager():
  """A do-nothing context manager.

  SetupTsMonGlobalState hands this back whenever there is nothing to tear
  down, so its result can always be used in a `with` statement. The managed
  value is None and exceptions raised in the body propagate unchanged.
  """
  yield None
47
48
def GetMetricFieldSpec(fields=None):
  """Return the corresponding field_spec for metric fields.

  Each field is mapped to a ts_mon Field class chosen by the type of its
  value: bool -> BooleanField, int -> IntegerField, string -> StringField.
  bool is tested before int because bool is a subclass of int. A value of any
  other type is logged as an error and left out of the spec.

  Args:
    fields: Dictionary containing metric fields (None or empty is allowed).

  Returns:
    field_spec: List containing any *Field object associated with metric.
  """
  if not fields:
    return []

  spec = []
  for name, value in fields.items():
    if isinstance(value, bool):
      field_cls = BooleanField
    elif isinstance(value, int):
      field_cls = IntegerField
    elif isinstance(value, six.string_types):
      field_cls = StringField
    else:
      logging.error("Couldn't classify the metric field %s:%s",
                    name, value)
      continue
    spec.append(field_cls(name))
  return spec
72
def AddCommonFields(fields=None, field_spec=None):
  """Add cbuildbot-wide common fields to a given field set.

  If no common fields have been registered (see the common_metric_fields
  argument of SetupTsMonGlobalState), |fields| and |field_spec| are returned
  as-is. Otherwise a fresh dict is built from the common fields with |fields|
  layered on top, so explicit fields win on key collisions. The field spec is
  then recomputed from that merged dict, and the |field_spec| argument is
  ignored.

  Args:
    fields: Dictionary containing metric fields to which common metric fields
            will be added.
    field_spec: List containing any *Field object associated with metric.

  Returns:
    Dictionary containing complete set of metric fields to be applied to
    metric and a list of corresponding field_spec.
  """
  if not _CommonMetricFields:
    return fields, field_spec

  merged_fields = dict(_CommonMetricFields)
  merged_fields.update(fields or {})
  return merged_fields, GetMetricFieldSpec(merged_fields)
93
94
def SetupTsMonGlobalState(service_name,
                          indirect=False,
                          suppress_exception=True,
                          short_lived=False,
                          auto_flush=True,
                          common_metric_fields=None,
                          debug_file=None,
                          task_num=0):
  """Uses a dummy argument parser to get the default behavior from ts-mon.

  Args:
    service_name: The name of the task we are sending metrics from.
    indirect: Whether to create a metrics.METRICS_QUEUE object and a separate
              process for indirect metrics flushing. Useful for forking,
              because forking would normally create a duplicate ts_mon thread.
    suppress_exception: True to silence any exception during the setup. Default
              is set to True.
    short_lived: Whether this process is short-lived and should use the autogen
              hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
              minute.
    common_metric_fields: Dictionary containing the metric fields that will be
              added to all metrics.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: (Default 0) The task_num target field of the metrics to emit.

  Returns:
    A context manager. In indirect mode it owns the flushing process; in every
    other case (including when ts_mon could not be imported) it does nothing.
  """
  # ts_mon failed to import, so monitoring is disabled: hand back a no-op.
  if not config:
    return TrivialContextManager()

  if common_metric_fields:
    _CommonMetricFields.update(common_metric_fields)

  # In indirect mode the flushing subprocess calls .flush manually, so the
  # automatic flushing thread must not be requested.
  effective_auto_flush = False if indirect else auto_flush
  options = _GenerateTsMonArgparseOptions(
      service_name, short_lived, effective_auto_flush, debug_file, task_num)

  if not indirect:
    _SetupTsMonFromOptions(options, suppress_exception)
    return TrivialContextManager()
  return _CreateTsMonFlushingProcess(options)
140
141
def _SetupTsMonFromOptions(options, suppress_exception):
  """Configures ts_mon's global state from parsed argparse options.

  On success the module-level _WasSetup flag is set to True. On failure a
  warning (with traceback) is logged, and the exception is then either
  swallowed or re-raised depending on |suppress_exception|.

  Args:
    options: An argparse options object containing ts-mon flags.
    suppress_exception: If True, a configuration failure is only logged;
                        otherwise it is re-raised after logging.
  """
  global _WasSetup  # pylint: disable=global-statement

  # google-api-client logs too noisily; only let warnings and above through.
  googleapiclient.discovery.logger.setLevel(logging.WARNING)

  try:
    config.process_argparse_options(options)
    logging.notice('ts_mon was set up.')
    _WasSetup = True
  except Exception as e:
    logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e,
                    exc_info=True)
    if suppress_exception:
      return
    raise
161
162
def _GenerateTsMonArgparseOptions(service_name, short_lived,
                                  auto_flush, debug_file, task_num):
  """Generates an arg list for ts-mon to consume.

  Args:
    service_name: The name of the task we are sending metrics from.
    short_lived: Whether this process is short-lived and should use the autogen
                 hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
                minute. Ignored (treated as False) when short_lived is set.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: Override the default task num of 0. Ignored when short_lived is
              set, because the PID is used as the task number instead.

  Returns:
    The namespace produced by parsing the generated flags with a parser that
    has ts-mon's argparse options registered.
  """
  parser = argparse.ArgumentParser()
  config.add_argparse_options(parser)

  argv = ['--ts-mon-target-type', 'task',
          '--ts-mon-task-service-name', service_name,
          '--ts-mon-task-job-name', service_name]

  if debug_file:
    argv += ['--ts-mon-endpoint', 'file://' + debug_file]

  if short_lived:
    # Short lived processes get autogen: prepended to their hostname and use
    # task-number=PID to trigger shorter retention policies under
    # chrome-infra@; a Monarch precomputation groups across the task number.
    # They are also assumed to call ts_mon.Flush() themselves, because the
    # ts_mon thread drops messages if the process exits before it flushes.
    hostname = socket.getfqdn().lower().split('.')[0]
    argv += ['--ts-mon-task-hostname', 'autogen:' + hostname,
             '--ts-mon-task-number', str(os.getpid())]
    flush_mode = 'manual'
  else:
    if task_num:
      argv += ['--ts-mon-task-number', str(task_num)]
    flush_mode = 'auto' if auto_flush else 'manual'

  argv += ['--ts-mon-flush', flush_mode]
  return parser.parse_args(args=argv)
205
206
@contextlib.contextmanager
def _CreateTsMonFlushingProcess(options):
  """Creates a separate process to flush ts_mon metrics.

  Useful for multiprocessing scenarios where we don't want multiple ts-mon
  threads sending contradictory metrics. Instead, functions in
  chromite.lib.metrics will send their calls to a Queue, which is consumed by a
  dedicated flushing process.

  If a queue or flushing process already exists (nested use), no new process
  is started. The existing queue is yielded and left untouched on exit, so the
  outermost context stays responsible for cleanup.

  Args:
    options: An argparse options object to configure ts-mon with.

  Yields:
    The message queue consumed by the flushing process.

  Side effects:
    Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
    to send their calls to the Queue instead of creating the metrics.
  """
  # If this is nested, we don't need to create another queue and another
  # message consumer; keep using the existing queue. A generator-based context
  # manager must still yield exactly once. A bare `return` here would make
  # entering the `with` statement raise RuntimeError("generator didn't yield").
  if metrics.MESSAGE_QUEUE or metrics.FLUSHING_PROCESS:
    yield metrics.MESSAGE_QUEUE
    return

  with parallel.Manager() as manager:
    message_q = manager.Queue()

    # Pass the target and its arguments separately rather than through a
    # lambda: a lambda target cannot be pickled, which the 'spawn' start
    # method requires.
    metrics.FLUSHING_PROCESS = multiprocessing.Process(
        target=_SetupAndConsumeMessages, args=(message_q, options))
    metrics.FLUSHING_PROCESS.start()

    # this makes the chromite.lib.metric functions use the queue.
    # note - we have to do this *after* forking the ConsumeMessages process.
    metrics.MESSAGE_QUEUE = message_q

    try:
      yield message_q
    finally:
      _CleanupMetricsFlushingProcess()
242      _CleanupMetricsFlushingProcess()
243
244
245def _CleanupMetricsFlushingProcess():
246  """Sends sentinal value to flushing process and .joins it."""
247  # Now that there is no longer a process to listen to the Queue, re-set it
248  # to None so that any future metrics are created within this process.
249  message_q = metrics.MESSAGE_QUEUE
250  flushing_process = metrics.FLUSHING_PROCESS
251  metrics.MESSAGE_QUEUE = None
252  metrics.FLUSHING_PROCESS = None
253
254  # If the process has already died, we don't need to try to clean it up.
255  if not flushing_process.is_alive():
256    return
257
258  # Send the sentinal value for "flush one more time and exit".
259  try:
260    message_q.put(None)
261  # If the flushing process quits, the message Queue can become full.
262  except IOError:
263    if not flushing_process.is_alive():
264      return
265
266  logging.info('Waiting for ts_mon flushing process to finish...')
267  flushing_process.join(timeout=FLUSH_INTERVAL*2)
268  if flushing_process.is_alive():
269    flushing_process.terminate()
270  if flushing_process.exitcode:
271    logging.warning('ts_mon_config flushing process did not exit cleanly.')
272  logging.info('Finished waiting for ts_mon process.')
273
274
def _SetupAndConsumeMessages(message_q, options):
  """Configures ts_mon in this process, then replays metrics from |message_q|.

  This is the entry point of the flushing process. If ts_mon cannot be
  configured, the failure is logged (never raised) and nothing is consumed.

  Args:
    message_q: The metric multiprocessing.Queue to read from.
    options: An argparse options object to configure ts-mon with. Callers are
             expected to request manual flushing (SetupTsMonGlobalState does
             so in indirect mode), so no ts_mon sending thread is started.
  """
  _SetupTsMonFromOptions(options, suppress_exception=True)
  if _WasSetup:
    return MetricConsumer(message_q).Consume()
  return None
288
289
class MetricConsumer(object):
  """Replays metric calls read from a message queue and flushes them.

  This class is meant to be used in a subprocess. It configures itself
  to receive a SIGHUP signal when the parent process dies, and catches the
  signal in order to have a chance to flush any pending metrics one more time
  before quitting.
  """
  def __init__(self, message_q):
    # Metric objects passed as reset_after to every metrics.Flush() call.
    # NOTE(review): this list only grows (one entry per reset_after message);
    # confirm that repeated entries are harmless for metrics.Flush.
    self.reset_after_flush = []
    # time.time() of the most recent flush; 0 until the first flush.
    self.last_flush = 0
    # True when a metric call has run since the last flush.
    self.pending = False
    self.message_q = message_q

    # If our parent dies, finish flushing before exiting.
    if parallel.ExitWithParent(signal.SIGHUP):
      signal.signal(signal.SIGHUP, lambda _sig, _stack: self._WaitToFlush())


  def Consume(self):
    """Emits metrics from self.message_q, flushing periodically.

    The loop is terminated by a None entry on the Queue, which is a friendly
    signal from the parent process that it's time to shut down. Before
    returning, if any metric calls have not been flushed yet, we wait until
    the next allowed flush time and flush one more time.
    """
    message = self.message_q.get()
    while message:
      self._CallMetric(message)
      message = self._WaitForNextMessage()

    if self.pending:
      self._WaitToFlush()


  def _CallMetric(self, message):
    """Calls the metric method from |message|, ignoring exceptions.

    The common metric fields are merged into the call's 'fields' (and the
    field_spec recomputed) before the metric is constructed. Any exception is
    logged and swallowed so that one bad message cannot stop the consumer.
    """
    try:
      cls = getattr(metrics, message.metric_name)
      message.method_kwargs.setdefault('fields', {})
      message.metric_kwargs.setdefault('field_spec', [])
      message.method_kwargs['fields'], message.metric_kwargs['field_spec'] = (
          AddCommonFields(message.method_kwargs['fields'],
                          message.metric_kwargs['field_spec']))
      metric = cls(*message.metric_args, **message.metric_kwargs)
      if message.reset_after:
        self.reset_after_flush.append(metric)
      getattr(metric, message.method)(
          *message.method_args,
          **message.method_kwargs)
      self.pending = True
    except Exception:
      logging.exception('Caught an exception while running %s',
                        _MethodCallRepr(message))


  def _WaitForNextMessage(self):
    """Waits for a new message, flushing every |FLUSH_INTERVAL| seconds.

    Returns:
      The next item read from the queue. None is the shutdown sentinel.
    """
    while True:
      time_delta = self._FlushIfReady()
      try:
        timeout = FLUSH_INTERVAL - time_delta
        message = self.message_q.get(timeout=timeout)
        return message
      except Queue.Empty:
        pass


  def _WaitToFlush(self):
    """Sleeps until the next time we can call metrics.Flush(), then flushes.

    Updates last_flush and pending the same way _FlushIfReady does. This way a
    flush triggered here (e.g. from the SIGHUP handler) also delays the next
    periodic flush by a full FLUSH_INTERVAL.
    """
    time_delta = time.time() - self.last_flush
    time.sleep(max(0, FLUSH_INTERVAL - time_delta))
    self.last_flush = time.time()
    metrics.Flush(reset_after=self.reset_after_flush)
    self.pending = False


  def _FlushIfReady(self):
    """Calls metrics.Flush() if more than FLUSH_INTERVAL seconds have passed.

    This limits flushing to at most once every FLUSH_INTERVAL seconds. The
    flush happens whether or not new metric calls are pending.

    Returns:
      Seconds elapsed since the last flush, or 0 if a flush just happened.
    """
    now = time.time()
    time_delta = now - self.last_flush
    if time_delta > FLUSH_INTERVAL:
      self.last_flush = now
      time_delta = 0
      metrics.Flush(reset_after=self.reset_after_flush)
      self.pending = False
    return time_delta
379
380
def _MethodCallRepr(message):
  """Renders |message| as `'metric_name'.method(arg, ..., key=value, ...)`.

  Used to describe a metric call in log messages. Falsy messages (e.g. the
  None sentinel) are rendered with plain repr().

  Args:
    message: A MetricCall object.

  Returns:
    A string describing the call.
  """
  if not message:
    return repr(message)

  rendered = [repr(arg) for arg in message.method_args]
  rendered.extend(str(key) + '=' + repr(value)
                  for key, value in message.method_kwargs.items())
  return '%s.%s(%s)' % (repr(message.metric_name), message.method,
                        ', '.join(rendered))
397  return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings))
398