xref: /aosp_15_r20/external/perfetto/infra/ci/controller/main.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import flask
import logging
import re
import urllib.parse

from datetime import datetime, timedelta
from common_utils import init_logging, defer, req_async, utc_now_iso, parse_iso_time, SCOPES
from config import DB, GERRIT_HOST, GERRIT_PROJECT, PROJECT
from config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
from config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
from config import CL_TIMEOUT_SEC
from functools import wraps
from stackdriver_metrics import STACKDRIVER_METRICS

STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/%s' % PROJECT

SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
SCOPES.append('https://www.googleapis.com/auth/datastore')
SCOPES.append('https://www.googleapis.com/auth/monitoring')
SCOPES.append('https://www.googleapis.com/auth/monitoring.write')

app = flask.Flask(__name__)

is_handling_route = {}

# ------------------------------------------------------------------------------
# Misc utility functions
# ------------------------------------------------------------------------------


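# TRUSTED_EMAILS (imported from config.py) is a regex of email addresses whose
# CLs may trigger CI jobs. Illustrative pattern only (the real one lives in
# config.py): r'^.*@google\.com$' would trust every @google.com account.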
def is_trusted(email):
  return re.match(TRUSTED_EMAILS, email)


def no_concurrency(f):
  '''Decorator that rejects concurrent invocations of a route handler.

  While a handler is still running, further requests to the same route are
  answered with HTTP 423 (Locked) instead of piling up.
  '''
  route_name = f.__name__
  is_handling_route[route_name] = False

  @wraps(f)
  async def decorated_function(*args, **kwargs):
    if is_handling_route[route_name]:
      return flask.abort(
          423, description='Handler %s already running' % route_name)
    is_handling_route[route_name] = True
    try:
      return await f(*args, **kwargs)
    finally:
      is_handling_route[route_name] = False

  return decorated_function


# ------------------------------------------------------------------------------
# HTTP handlers
# ------------------------------------------------------------------------------


@app.route('/_ah/start', methods=['GET', 'POST'])
async def http_start():
  init_logging()
  await create_stackdriver_metric_definitions()
  return 'OK ' + datetime.now().isoformat()


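# For reference, the cron job that drives this handler is expected to look
# roughly like the following cron.yaml entry (illustrative, not verified
# against the deployed config):
#
#   cron:
#   - url: /controller/tick
#     schedule: every 1 minutes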
@app.route('/controller/tick', methods=['GET', 'POST'])
@no_concurrency
async def http_tick():
  # The tick is invoked by cron.yaml every minute; cron doesn't allow
  # sub-minute jobs. Here we want to poll every 15 seconds to be more
  # responsive, so each tick keeps repeating the polling for (almost) a minute.
  deadline = datetime.now() + timedelta(seconds=55)
  while datetime.now() < deadline:
    await check_new_cls()
    await check_pending_cls()
    await update_queue_metrics()
    await asyncio.sleep(15)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/queue_postsubmit_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_queue_postsubmit_jobs():
  await queue_postsubmit_jobs('main')
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_stale_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_jobs():
  await delete_stale_jobs()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_stale_workers', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_workers():
  await delete_stale_workers()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_expired_logs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_expired_logs():
  await delete_expired_logs(LOGS_TTL_DAYS)
  return 'OK ' + datetime.now().isoformat()


# Endpoints below are only for manual testing & maintenance.


@app.route(
    '/controller/delete_expired_logs/<int:ttl_days>', methods=['GET', 'POST'])
async def http_delete_expired_logs_ttl(ttl_days):
  await delete_expired_logs(ttl_days)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_job_logs/<job_id>', methods=['GET', 'POST'])
async def http_delete_job_logs(job_id):
  await delete_job_logs(job_id)
  return 'OK ' + datetime.now().isoformat()


# This is to test HTTP timeouts
@app.route('/controller/sleep/<int:sleep_sec>', methods=['GET', 'POST'])
async def http_sleep(sleep_sec):
  await asyncio.sleep(sleep_sec)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/sleep_locked/<int:sleep_sec>', methods=['GET', 'POST'])
@no_concurrency
async def http_sleep_locked(sleep_sec):
  await asyncio.sleep(sleep_sec)
  return 'OK ' + datetime.now().isoformat()


# ------------------------------------------------------------------------------
# Deferred jobs
# ------------------------------------------------------------------------------


async def check_new_cls():
  '''Poll for new CLs and asynchronously enqueue jobs for them.'''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=32'
  url += '&q=branch:main+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = await req_async('GET', url, gerrit=True)
  tasks = []
  for change in (change for change in resp if 'revisions' in change):
    rev_hash = list(change['revisions'].keys())[0]
    rev = change['revisions'][rev_hash]
    owner = rev['uploader']['email']
    prs_ready = change['labels'].get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    tasks.append(
        defer(
            check_new_cl(
                cl=str(change['_number']),
                patchset=str(rev['_number']),
                change_id=change['id'],
                rev_hash=rev_hash,
                ref=rev['ref'],
                wants_vote=True if prs_ready else False)))
  await asyncio.gather(*tasks)


def append_jobs(patch_obj, src, git_ref, now=None):
  '''Creates the worker jobs (defined in config.py) for the given CL.

  Jobs are keyed by timestamp-cl-patchset-config to get a fair schedule
  (workers pull jobs ordered by the key above).
  It doesn't directly write into the DB, it just appends keys to the passed
  |patch_obj|, so the whole set of CL descriptor + jobs can be added atomically
  to the datastore.
  src: e.g., 'cls/1234-1' (CL and patchset number).
  '''
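  # Illustrative example (with a hypothetical 'linux-clang-x86_64-debug'
  # config): for src='cls/1234-1' queued at 2019-08-13 12:30:00 UTC this adds
  #   jobs/20190813123000--cls-1234-1--linux-clang-x86_64-debug
  # plus the matching jobs_queued/... and cls/1234-1/jobs/... entries.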
  logging.info('Enqueueing jobs for cl %s', src)
  timestamp = (now or datetime.utcnow()).strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.items():
    job_id = '%s--%s--%s' % (timestamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    patch_obj['jobs/' + job_id] = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0


async def check_new_cl(change_id: str, rev_hash: str, cl: str, patchset: str,
                       ref: str, wants_vote: bool):
  '''Creates the CL + jobs entries in the DB for the given CL if it doesn't exist.

  If it exists, checks whether a Presubmit-Ready label has been added and, if
  so, updates it with the message + vote.
  '''
  # We want to do two things here:
  # 1) If the CL doesn't exist (hence vote_prop is None) carry on below and
  #    enqueue jobs for it.
  # 2) If the CL exists, we don't need to kick new jobs. However, the user
  #    might have added a Presubmit-Ready label after we created the CL. In
  #    this case update the |wants_vote| flag and return.
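  # E.g. this reads <DB>/cls/1234-1/wants_vote.json below, which returns the
  # boolean previously stored by this handler, or None if the CL is unknown.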
  logging.debug('check_new_cl(%s-%s)', cl, patchset)
  vote_prop = await req_async(
      'GET', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset))
  if vote_prop is not None:
    if vote_prop != wants_vote and wants_vote:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      await req_async(
          'PUT', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset), body=True)
      # If the label is applied after we have finished running all the jobs just
      # jump straight to the voting.
      await check_pending_cl(cl_and_ps='%s-%s' % (cl, patchset))
    logging.debug('check_new_cl(%s-%s): already queued', cl, patchset)
    return

  # This is the first time we see this patchset, enqueue jobs for it.

  # Dequeue jobs for older patchsets, if any.
  await cancel_older_jobs(cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  # Enqueue jobs for the latest patchset.
  patch_obj = {}
  patch_obj['cls_pending/%s-%s' % (cl, patchset)] = 0
  patch_obj[src] = {
      'change_id': change_id,
      'revision_id': rev_hash,
      'time_queued': utc_now_iso(),
      'jobs': {},
      'wants_vote': wants_vote,
  }
  append_jobs(patch_obj, src, ref)
  logging.debug('check_new_cl(%s-%s): queueing jobs', cl, patchset)
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def cancel_older_jobs(cl: str, patchset: str):
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
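  # Firebase REST filtering: order by key and restrict to the ['<cl>-0',
  # '<cl>-z'] range, which selects every '<cl>-<patchset>' entry stored for
  # this CL.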
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = await req_async('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  tasks = []
  for cl_and_ps, cl_obj in cl_objs.items():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= int(patchset):
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    for job_id in cl_obj['jobs'].keys():
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)


async def check_pending_cls():
  # Check if any pending CL has completed (all jobs are done). If so, publish
  # the comment and vote on the CL.
  pending_cls = await req_async('GET', '%s/cls_pending.json' % DB) or {}
  tasks = []
  for cl_and_ps, _ in pending_cls.items():
    tasks.append(defer(check_pending_cl(cl_and_ps=cl_and_ps)))
  await asyncio.gather(*tasks)


async def check_pending_cl(cl_and_ps: str):
  # This function can be called twice on the same CL, e.g., in the case when the
  # Presubmit-Ready label is applied after we have finished running all the
  # jobs (we run presubmit regardless, only the voting is conditioned by PR).
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  interrupted_jobs = []
  for job_id in all_jobs:
    job_status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    if job_status in ('QUEUED', 'STARTED'):
      pending_jobs.append(job_id)
    elif job_status == 'INTERRUPTED':
      interrupted_jobs.append(job_id)

  # Interrupted jobs are due to VMs being shut down (usually due to a
  # scale-down). Automatically re-queue them so they get picked up by some
  # other VM.
  await asyncio.gather(*[requeue_job(job_id) for job_id in interrupted_jobs])

  if pending_jobs:
    # If the CL has been pending for too long, cancel all its jobs. Upon the
    # next scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Canceling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      tasks = [defer(cancel_job(job_id)) for job_id in pending_jobs]
      await asyncio.gather(*tasks)

  if pending_jobs or interrupted_jobs:
    return
  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update end time.
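  # Note: the Firebase Realtime Database does not store empty objects, so
  # PATCHing a key to {} effectively deletes it (hence the '= DELETE' comments
  # that accompany this idiom throughout this file).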
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  await req_async('PATCH', '%s.json' % DB, body=patch_obj)
  await update_cl_metrics(src='cls/' + cl_and_ps)
  tasks = [defer(update_job_metrics(job_id)) for job_id in all_jobs]
  await asyncio.gather(*tasks)
  if cl_obj.get('wants_vote'):
    await comment_and_vote_cl(cl_and_ps=cl_and_ps)


async def comment_and_vote_cl(cl_and_ps: str):
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj['wants_vote'] or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj['jobs'].keys():
    job_obj = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
      ui_links.append(
          'https://storage.googleapis.com/%s/%s/ui-test-artifacts/index.html' %
          (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        '- %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.items()
    ])
  if passed_jobs:
    msg += '#\nPASS:\n'
    msg += ''.join(['- %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += '\nArtifacts:\n' + ''.join('- %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  msg += '- https://ci.perfetto.dev/#!/cls/%s\n' % cl_and_ps.split('-')[0]
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  await req_async('POST', url, body=body, gerrit=True)
  await req_async('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)


async def queue_postsubmit_jobs(branch: str, revision: str = None):
  '''Creates the job entries in the DB for the given branch or revision.

  Can be called in two modes:
    1. ?branch=main: retrieves the SHA1 of main's head and re-invokes itself
       in mode 2 with that revision.
    2. ?branch=main&rev=deadbeef1234: queues jobs for the given revision.
  '''
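  # E.g. queue_postsubmit_jobs(branch='main') resolves the branch head first,
  # while queue_postsubmit_jobs(branch='main', revision='deadbeef1234') queues
  # jobs for that revision directly.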
  prj = urllib.parse.quote(GERRIT_PROJECT, '')
  assert (branch)

  if not revision:
    # Get the commit SHA1 of the head of the branch.
    url = 'https://%s/a/projects/%s/branches/%s' % (GERRIT_HOST, prj, branch)
    revision = (await req_async('GET', url, gerrit=True))['revision']
    assert (revision)
    # If the latest entry matches the revision, quit without queueing another
    # set of jobs for the same revision. This is an optimization to avoid
    # wasting compute over the weekend to rebuild the same revision every hour.
    filt = 'orderBy="$key"&limitToLast=1'
    cl_objs = await req_async('GET', '%s/branches.json?%s' % (DB, filt)) or {}
    if cl_objs and next(iter(cl_objs.values())).get('rev') == revision:
      logging.debug('Skipping postsubmits for %s: already run', revision)
      return
    await queue_postsubmit_jobs(branch=branch, revision=revision)
    return

  # Get the committer datetime for the given revision.
  url = 'https://%s/a/projects/%s/commits/%s' % (GERRIT_HOST, prj, revision)
  commit_info = await req_async('GET', url, gerrit=True)
  time_committed = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(time_committed, '%Y-%m-%d %H:%M:%S')

  # Enqueue jobs.
  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  now = datetime.utcnow()
  patch_obj = {
      src: {
          'rev': revision,
          'subject': commit_info['subject'][:100],
          'author': commit_info['author'].get('email', 'N/A'),
          'time_committed': utc_now_iso(time_committed),
          'time_queued': utc_now_iso(),
          'jobs': {},
      }
  }
  ref = 'refs/heads/' + branch
  append_jobs(patch_obj, src, ref, now)
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def delete_expired_logs(ttl_days=LOGS_TTL_DAYS):
  url = '%s/logs.json?limitToFirst=1000&shallow=true' % (DB)
  logs = await req_async('GET', url) or {}
  tasks = []
  logging.debug('delete_expired_logs: got %d keys', len(logs.keys()))
  for job_id in logs.keys():
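    # Job ids start with the YYYYMMDDHHMMSS timestamp assigned in append_jobs,
    # so the first 8 characters encode the date the job was queued.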
    age_days = (datetime.now() - datetime.strptime(job_id[:8], '%Y%m%d')).days
    if age_days > ttl_days:
      logging.debug('Delete log %s', job_id)
      tasks.append(defer(delete_job_logs(job_id=job_id)))
  await asyncio.gather(*tasks)


async def delete_stale_jobs():
  '''Deletes jobs that are left in the running queue for too long

  This is usually due to a crash in the VM that handles them.
  '''
  running_jobs = await req_async('GET', '%s/jobs_running.json?shallow=true' %
                                 (DB)) or {}
  tasks = []
  for job_id in running_jobs.keys():
    job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
    time_started = parse_iso_time(job.get('time_started', utc_now_iso()))
    age = (datetime.now() - time_started).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)


async def delete_stale_workers():
  '''Deletes workers that have been inactive for too long

  This is usually due to a crash in the VM that handles them.
  '''
  workers = await req_async('GET', '%s/workers.json' % (DB)) or {}
  patch_obj = {}
  for worker_id, worker in workers.items():
    last_update = parse_iso_time(worker.get('last_update', utc_now_iso()))
    age = (datetime.now() - last_update).total_seconds()
    if age > 60 * 60 * 12:
      patch_obj['workers/' + worker_id] = {}  # DELETE
  if len(patch_obj) == 0:
    return
  logging.info('Purging %d inactive workers', len(patch_obj))
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def cancel_job(job_id: str):
  '''Cancels a job if not completed or failed.

  This function is racy: workers can complete the queued jobs while we mark them
  as cancelled. The result of such a race is still acceptable.'''
  status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE,
      'jobs_queued/%s' % job_id: {},  # = DELETE,
  }
  if status in ('QUEUED', 'STARTED'):
    patch_obj['jobs/%s/status' % job_id] = 'CANCELLED'
    patch_obj['jobs/%s/time_ended' % job_id] = utc_now_iso()
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def requeue_job(job_id: str):
  '''Re-queues a job that was previously interrupted due to a VM shutdown.'''
  logging.info('Requeuing interrupted job %s', job_id)
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE,
      'jobs_queued/%s' % job_id: 0,
      'jobs/%s/status' % job_id: 'QUEUED',
      'jobs/%s/time_queued' % job_id: utc_now_iso(),
      'jobs/%s/time_started' % job_id: {},  # = DELETE
      'jobs/%s/time_ended' % job_id: {},  # = DELETE
      'jobs/%s/worker' % job_id: {},  # = DELETE
  }
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def delete_job_logs(job_id: str):
  await req_async('DELETE',
                  '%s/logs/%s.json?writeSizeLimit=unlimited' % (DB, job_id))


async def update_cl_metrics(src: str):
  cl_obj = await req_async('GET', '%s/%s.json' % (DB, src))
  t_queued = parse_iso_time(cl_obj['time_queued'])
  t_ended = parse_iso_time(cl_obj['time_ended'])
  await write_metrics({
      'ci_cl_completion_time': {
          'l': {},
          'v': int((t_ended - t_queued).total_seconds())
      }
  })


async def update_job_metrics(job_id: str):
  job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
  metrics = {}
  if 'time_queued' in job and 'time_started' in job:
    t_queued = parse_iso_time(job['time_queued'])
    t_started = parse_iso_time(job['time_started'])
    metrics['ci_job_queue_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_started - t_queued).total_seconds())
    }
  if 'time_ended' in job and 'time_started' in job:
    t_started = parse_iso_time(job['time_started'])
    t_ended = parse_iso_time(job['time_ended'])
    metrics['ci_job_run_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_ended - t_started).total_seconds())
    }
  if metrics:
    await write_metrics(metrics)


async def update_queue_metrics():
  # Update the stackdriver metric that will drive the autoscaler.
  queued = await req_async('GET', DB + '/jobs_queued.json?shallow=true') or {}
  running = await req_async('GET', DB + '/jobs_running.json?shallow=true') or {}
  logging.debug('ci_job_queue_len: %d + %d', len(queued), len(running))
  await write_metrics({'ci_job_queue_len': {'v': len(queued) + len(running)}})


async def create_stackdriver_metric_definitions():
  logging.info('Creating Stackdriver metric definitions')
  for name, metric in STACKDRIVER_METRICS.items():
    logging.info('Creating metric %s', name)
    await req_async('POST', STACKDRIVER_API + '/metricDescriptors', body=metric)


async def write_metrics(metric_dict):
  now = utc_now_iso()
  desc = {'timeSeries': []}
  for key, spec in metric_dict.items():
    desc['timeSeries'] += [{
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {})
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': now
            },
            'value': {
                'int64Value': str(spec['v'])
            }
        }]
    }]
  try:
    await req_async('POST', STACKDRIVER_API + '/timeSeries', body=desc)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations.
    msg = str(e)
    if 'written more frequently than the maximum sampling' not in msg:
      logging.error('Metrics update failed: %s', msg)