1# -*- coding: utf-8 -*- 2# Copyright 2014 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Wrapper for inframon's command-line flag based configuration.""" 7 8from __future__ import print_function 9 10import argparse 11import contextlib 12import multiprocessing 13import os 14import socket 15import signal 16import time 17 18from six.moves import queue as Queue 19 20import six 21 22from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging 23from autotest_lib.utils.frozen_chromite.lib import metrics 24from autotest_lib.utils.frozen_chromite.lib import parallel 25 26try: 27 from infra_libs.ts_mon import config 28 from infra_libs.ts_mon import BooleanField 29 from infra_libs.ts_mon import IntegerField 30 from infra_libs.ts_mon import StringField 31 import googleapiclient.discovery 32except (ImportError, RuntimeError) as e: 33 config = None 34 logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e) 35 36 37_WasSetup = False 38_CommonMetricFields = {} 39 40FLUSH_INTERVAL = 60 41 42 43@contextlib.contextmanager 44def TrivialContextManager(): 45 """Context manager with no side effects.""" 46 yield 47 48 49def GetMetricFieldSpec(fields=None): 50 """Return the corresponding field_spec for metric fields. 51 52 Args: 53 fields: Dictionary containing metric fields. 54 55 Returns: 56 field_spec: List containing any *Field object associated with metric. 57 """ 58 field_spec = [] 59 if fields: 60 for key, val in fields.items(): 61 if isinstance(val, bool): 62 field_spec.append(BooleanField(key)) 63 elif isinstance(val, int): 64 field_spec.append(IntegerField(key)) 65 elif isinstance(val, six.string_types): 66 field_spec.append(StringField(key)) 67 else: 68 logging.error("Couldn't classify the metric field %s:%s", 69 key, val) 70 71 return field_spec 72 73def AddCommonFields(fields=None, field_spec=None): 74 """Add cbuildbot-wide common fields to a given field set. 75 76 Args: 77 fields: Dictionary containing metric fields to which common metric fields 78 will be added. 79 field_spec: List containing any *Field object associated with metric. 80 81 Returns: 82 Dictionary containing complete set of metric fields to be applied to 83 metric and a list of corresponding field_spec. 84 """ 85 metric_fields = (dict(_CommonMetricFields) if _CommonMetricFields 86 else {}) 87 88 if metric_fields: 89 metric_fields.update(fields or {}) 90 return metric_fields, GetMetricFieldSpec(metric_fields) 91 else: 92 return fields, field_spec 93 94 95def SetupTsMonGlobalState(service_name, 96 indirect=False, 97 suppress_exception=True, 98 short_lived=False, 99 auto_flush=True, 100 common_metric_fields=None, 101 debug_file=None, 102 task_num=0): 103 """Uses a dummy argument parser to get the default behavior from ts-mon. 104 105 Args: 106 service_name: The name of the task we are sending metrics from. 107 indirect: Whether to create a metrics.METRICS_QUEUE object and a separate 108 process for indirect metrics flushing. Useful for forking, 109 because forking would normally create a duplicate ts_mon thread. 110 suppress_exception: True to silence any exception during the setup. Default 111 is set to True. 112 short_lived: Whether this process is short-lived and should use the autogen 113 hostname prefix. 114 auto_flush: Whether to create a thread to automatically flush metrics every 115 minute. 116 common_metric_fields: Dictionary containing the metric fields that will be 117 added to all metrics. 118 debug_file: If non-none, send metrics to this path instead of to PubSub. 119 task_num: (Default 0) The task_num target field of the metrics to emit. 120 """ 121 if not config: 122 return TrivialContextManager() 123 124 # The flushing subprocess calls .flush manually. 125 if indirect: 126 auto_flush = False 127 128 if common_metric_fields: 129 _CommonMetricFields.update(common_metric_fields) 130 131 # google-api-client has too much noisey logging. 132 options = _GenerateTsMonArgparseOptions( 133 service_name, short_lived, auto_flush, debug_file, task_num) 134 135 if indirect: 136 return _CreateTsMonFlushingProcess(options) 137 else: 138 _SetupTsMonFromOptions(options, suppress_exception) 139 return TrivialContextManager() 140 141 142def _SetupTsMonFromOptions(options, suppress_exception): 143 """Sets up ts-mon global state given parsed argparse options. 144 145 Args: 146 options: An argparse options object containing ts-mon flags. 147 suppress_exception: True to silence any exception during the setup. Default 148 is set to True. 149 """ 150 googleapiclient.discovery.logger.setLevel(logging.WARNING) 151 try: 152 config.process_argparse_options(options) 153 logging.notice('ts_mon was set up.') 154 global _WasSetup # pylint: disable=global-statement 155 _WasSetup = True 156 except Exception as e: 157 logging.warning('Failed to configure ts_mon, monitoring is disabled: %s', e, 158 exc_info=True) 159 if not suppress_exception: 160 raise 161 162 163def _GenerateTsMonArgparseOptions(service_name, short_lived, 164 auto_flush, debug_file, task_num): 165 """Generates an arg list for ts-mon to consume. 166 167 Args: 168 service_name: The name of the task we are sending metrics from. 169 short_lived: Whether this process is short-lived and should use the autogen 170 hostname prefix. 171 auto_flush: Whether to create a thread to automatically flush metrics every 172 minute. 173 debug_file: If non-none, send metrics to this path instead of to PubSub. 174 task_num: Override the default task num of 0. 175 """ 176 parser = argparse.ArgumentParser() 177 config.add_argparse_options(parser) 178 179 args = [ 180 '--ts-mon-target-type', 'task', 181 '--ts-mon-task-service-name', service_name, 182 '--ts-mon-task-job-name', service_name, 183 ] 184 185 if debug_file: 186 args.extend(['--ts-mon-endpoint', 'file://' + debug_file]) 187 188 # Short lived processes will have autogen: prepended to their hostname and 189 # use task-number=PID to trigger shorter retention policies under 190 # chrome-infra@, and used by a Monarch precomputation to group across the 191 # task number. 192 # Furthermore, we assume they manually call ts_mon.Flush(), because the 193 # ts_mon thread will drop messages if the process exits before it flushes. 194 if short_lived: 195 auto_flush = False 196 fqdn = socket.getfqdn().lower() 197 host = fqdn.split('.')[0] 198 args.extend(['--ts-mon-task-hostname', 'autogen:' + host, 199 '--ts-mon-task-number', str(os.getpid())]) 200 elif task_num: 201 args.extend(['--ts-mon-task-number', str(task_num)]) 202 203 args.extend(['--ts-mon-flush', 'auto' if auto_flush else 'manual']) 204 return parser.parse_args(args=args) 205 206 207@contextlib.contextmanager 208def _CreateTsMonFlushingProcess(options): 209 """Creates a separate process to flush ts_mon metrics. 210 211 Useful for multiprocessing scenarios where we don't want multiple ts-mon 212 threads send contradictory metrics. Instead, functions in 213 chromite.lib.metrics will send their calls to a Queue, which is consumed by a 214 dedicated flushing process. 215 216 Args: 217 options: An argparse options object to configure ts-mon with. 218 219 Side effects: 220 Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions 221 to send their calls to the Queue instead of creating the metrics. 222 """ 223 # If this is nested, we don't need to create another queue and another 224 # message consumer. Do nothing to continue to use the existing queue. 225 if metrics.MESSAGE_QUEUE or metrics.FLUSHING_PROCESS: 226 return 227 228 with parallel.Manager() as manager: 229 message_q = manager.Queue() 230 231 metrics.FLUSHING_PROCESS = multiprocessing.Process( 232 target=lambda: _SetupAndConsumeMessages(message_q, options)) 233 metrics.FLUSHING_PROCESS.start() 234 235 # this makes the chromite.lib.metric functions use the queue. 236 # note - we have to do this *after* forking the ConsumeMessages process. 237 metrics.MESSAGE_QUEUE = message_q 238 239 try: 240 yield message_q 241 finally: 242 _CleanupMetricsFlushingProcess() 243 244 245def _CleanupMetricsFlushingProcess(): 246 """Sends sentinal value to flushing process and .joins it.""" 247 # Now that there is no longer a process to listen to the Queue, re-set it 248 # to None so that any future metrics are created within this process. 249 message_q = metrics.MESSAGE_QUEUE 250 flushing_process = metrics.FLUSHING_PROCESS 251 metrics.MESSAGE_QUEUE = None 252 metrics.FLUSHING_PROCESS = None 253 254 # If the process has already died, we don't need to try to clean it up. 255 if not flushing_process.is_alive(): 256 return 257 258 # Send the sentinal value for "flush one more time and exit". 259 try: 260 message_q.put(None) 261 # If the flushing process quits, the message Queue can become full. 262 except IOError: 263 if not flushing_process.is_alive(): 264 return 265 266 logging.info('Waiting for ts_mon flushing process to finish...') 267 flushing_process.join(timeout=FLUSH_INTERVAL*2) 268 if flushing_process.is_alive(): 269 flushing_process.terminate() 270 if flushing_process.exitcode: 271 logging.warning('ts_mon_config flushing process did not exit cleanly.') 272 logging.info('Finished waiting for ts_mon process.') 273 274 275def _SetupAndConsumeMessages(message_q, options): 276 """Sets up ts-mon, and starts a MetricConsumer loop. 277 278 Args: 279 message_q: The metric multiprocessing.Queue to read from. 280 options: An argparse options object to configure ts-mon with. 281 """ 282 # Configure ts-mon, but don't start up a sending thread. 283 _SetupTsMonFromOptions(options, suppress_exception=True) 284 if not _WasSetup: 285 return 286 287 return MetricConsumer(message_q).Consume() 288 289 290class MetricConsumer(object): 291 """Configures ts_mon and gets metrics from a message queue. 292 293 This class is meant to be used in a subprocess. It configures itself 294 to receive a SIGHUP signal when the parent process dies, and catches the 295 signal in order to have a chance to flush any pending metrics one more time 296 before quitting. 297 """ 298 def __init__(self, message_q): 299 # If our parent dies, finish flushing before exiting. 300 self.reset_after_flush = [] 301 self.last_flush = 0 302 self.pending = False 303 self.message_q = message_q 304 305 if parallel.ExitWithParent(signal.SIGHUP): 306 signal.signal(signal.SIGHUP, lambda _sig, _stack: self._WaitToFlush()) 307 308 309 def Consume(self): 310 """Emits metrics from self.message_q, flushing periodically. 311 312 The loop is terminated by a None entry on the Queue, which is a friendly 313 signal from the parent process that it's time to shut down. Before 314 returning, we wait to flush one more time to make sure that all the 315 metrics were sent. 316 """ 317 message = self.message_q.get() 318 while message: 319 self._CallMetric(message) 320 message = self._WaitForNextMessage() 321 322 if self.pending: 323 self._WaitToFlush() 324 325 326 def _CallMetric(self, message): 327 """Calls the metric method from |message|, ignoring exceptions.""" 328 try: 329 cls = getattr(metrics, message.metric_name) 330 message.method_kwargs.setdefault('fields', {}) 331 message.metric_kwargs.setdefault('field_spec', []) 332 message.method_kwargs['fields'], message.metric_kwargs['field_spec'] = ( 333 AddCommonFields(message.method_kwargs['fields'], 334 message.metric_kwargs['field_spec'])) 335 metric = cls(*message.metric_args, **message.metric_kwargs) 336 if message.reset_after: 337 self.reset_after_flush.append(metric) 338 getattr(metric, message.method)( 339 *message.method_args, 340 **message.method_kwargs) 341 self.pending = True 342 except Exception: 343 logging.exception('Caught an exception while running %s', 344 _MethodCallRepr(message)) 345 346 347 def _WaitForNextMessage(self): 348 """Waits for a new message, flushing every |FLUSH_INTERVAL| seconds.""" 349 while True: 350 time_delta = self._FlushIfReady() 351 try: 352 timeout = FLUSH_INTERVAL - time_delta 353 message = self.message_q.get(timeout=timeout) 354 return message 355 except Queue.Empty: 356 pass 357 358 359 def _WaitToFlush(self): 360 """Sleeps until the next time we can call metrics.Flush(), then flushes.""" 361 time_delta = time.time() - self.last_flush 362 time.sleep(max(0, FLUSH_INTERVAL - time_delta)) 363 metrics.Flush(reset_after=self.reset_after_flush) 364 365 366 def _FlushIfReady(self): 367 """Call metrics.Flush() if we are ready and have pending metrics. 368 369 This allows us to only call flush every FLUSH_INTERVAL seconds. 370 """ 371 now = time.time() 372 time_delta = now - self.last_flush 373 if time_delta > FLUSH_INTERVAL: 374 self.last_flush = now 375 time_delta = 0 376 metrics.Flush(reset_after=self.reset_after_flush) 377 self.pending = False 378 return time_delta 379 380 381def _MethodCallRepr(message): 382 """Gives a string representation of |obj|.|method|(*|args|, **|kwargs|) 383 384 Args: 385 message: A MetricCall object. 386 """ 387 if not message: 388 return repr(message) 389 obj = message.metric_name 390 method = message.method 391 args = message.method_args 392 kwargs = message.method_kwargs 393 394 args_strings = ([repr(x) for x in args] + 395 [(str(k) + '=' + repr(v)) 396 for k, v in kwargs.items()]) 397 return '%s.%s(%s)' % (repr(obj), method, ', '.join(args_strings)) 398