# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import flask
import logging
import re
import urllib.parse

from datetime import datetime, timedelta
from common_utils import init_logging, defer, req_async, utc_now_iso, parse_iso_time, SCOPES
from config import DB, GERRIT_HOST, GERRIT_PROJECT, PROJECT
from config import CI_SITE, GERRIT_VOTING_ENABLED, JOB_CONFIGS, LOGS_TTL_DAYS
from config import TRUSTED_EMAILS, GCS_ARTIFACTS, JOB_TIMEOUT_SEC
from config import CL_TIMEOUT_SEC
from functools import wraps
from stackdriver_metrics import STACKDRIVER_METRICS

STACKDRIVER_API = 'https://monitoring.googleapis.com/v3/projects/%s' % PROJECT

SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')
SCOPES.append('https://www.googleapis.com/auth/datastore')
SCOPES.append('https://www.googleapis.com/auth/monitoring')
SCOPES.append('https://www.googleapis.com/auth/monitoring.write')

app = flask.Flask(__name__)

is_handling_route = {}

# ------------------------------------------------------------------------------
# Misc utility functions
# ------------------------------------------------------------------------------


def is_trusted(email):
  return re.match(TRUSTED_EMAILS, email)


def no_concurrency(f):
  route_name = f.__name__
  is_handling_route[route_name] = False

  @wraps(f)
  async def decorated_function(*args, **kwargs):
    if is_handling_route[route_name]:
      return flask.abort(
          423, description='Handler %s already running' % route_name)
    is_handling_route[route_name] = True
    try:
      return await f(*args, **kwargs)
    finally:
      is_handling_route[route_name] = False

  return decorated_function


# ------------------------------------------------------------------------------
# HTTP handlers
# ------------------------------------------------------------------------------


@app.route('/_ah/start', methods=['GET', 'POST'])
async def http_start():
  init_logging()
  await create_stackdriver_metric_definitions()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/tick', methods=['GET', 'POST'])
@no_concurrency
async def http_tick():
  # The tick is invoked by cron.yaml every minute; cron doesn't allow
  # sub-minute jobs. Here we want to poll every 15 seconds to be more
  # responsive, so each tick keeps repeating the polling for just under a
  # minute.
  deadline = datetime.now() + timedelta(seconds=55)
  while datetime.now() < deadline:
    await check_new_cls()
    await check_pending_cls()
    await update_queue_metrics()
    await asyncio.sleep(15)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/queue_postsubmit_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_queue_postsubmit_jobs():
  await queue_postsubmit_jobs('main')
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_stale_jobs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_jobs():
  await delete_stale_jobs()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_stale_workers', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_stale_workers():
  await delete_stale_workers()
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_expired_logs', methods=['GET', 'POST'])
@no_concurrency
async def http_delete_expired_logs():
  await delete_expired_logs(LOGS_TTL_DAYS)
  return 'OK ' + datetime.now().isoformat()


# Endpoints below are only for manual testing & maintenance.


@app.route(
    '/controller/delete_expired_logs/<int:ttl_days>', methods=['GET', 'POST'])
async def http_delete_expired_logs_ttl(ttl_days):
  await delete_expired_logs(ttl_days)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/delete_job_logs/<job_id>', methods=['GET', 'POST'])
async def http_delete_job_logs(job_id):
  await delete_job_logs(job_id)
  return 'OK ' + datetime.now().isoformat()


# This is to test HTTP timeouts.
@app.route('/controller/sleep/<int:sleep_sec>', methods=['GET', 'POST'])
async def http_sleep(sleep_sec):
  await asyncio.sleep(sleep_sec)
  return 'OK ' + datetime.now().isoformat()


@app.route('/controller/sleep_locked/<int:sleep_sec>', methods=['GET', 'POST'])
@no_concurrency
async def http_sleep_locked(sleep_sec):
  await asyncio.sleep(sleep_sec)
  return 'OK ' + datetime.now().isoformat()


# ------------------------------------------------------------------------------
# Deferred jobs
# ------------------------------------------------------------------------------


async def check_new_cls():
  '''Polls for new CLs and asynchronously enqueues jobs for them.'''
  logging.info('Polling for new Gerrit CLs')
  date_limit = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
  url = 'https://%s/a/changes/' % GERRIT_HOST
  url += '?o=CURRENT_REVISION&o=DETAILED_ACCOUNTS&o=LABELS&n=32'
  url += '&q=branch:main+project:%s' % GERRIT_PROJECT
  url += '+is:open+after:%s' % date_limit
  resp = await req_async('GET', url, gerrit=True)
  tasks = []
  for change in (change for change in resp if 'revisions' in change):
    rev_hash = list(change['revisions'].keys())[0]
    rev = change['revisions'][rev_hash]
    owner = rev['uploader']['email']
    prs_ready = change['labels'].get('Presubmit-Ready', {}).get('approved', {})
    prs_owner = prs_ready.get('email', '')
    # Only submit jobs for patchsets that are either uploaded by a trusted
    # account or are marked as Presubmit-Ready by a trusted account.
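    # TRUSTED_EMAILS (from config.py) is a regex checked via re.match() in
    # is_trusted(). For illustration only, it could look something like
    # r'^.*@google\.com$' -- the actual pattern lives in config.py.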
    if not is_trusted(owner) and not is_trusted(prs_owner):
      continue
    tasks.append(
        defer(
            check_new_cl(
                cl=str(change['_number']),
                patchset=str(rev['_number']),
                change_id=change['id'],
                rev_hash=rev_hash,
                ref=rev['ref'],
                wants_vote=True if prs_ready else False)))
  await asyncio.gather(*tasks)


def append_jobs(patch_obj, src, git_ref, now=None):
  '''Creates the worker jobs (defined in config.py) for the given CL.

  Jobs are keyed by timestamp-cl-patchset-config to get a fair schedule
  (workers pull jobs ordered by the key above).
  It doesn't directly write into the DB, it just appends keys to the passed
  |patch_obj|, so the whole set of CL descriptor + jobs can be added atomically
  to the datastore.
  src: is cls/1234/1 (cl and patchset number).
  '''
  logging.info('Enqueueing jobs for cl %s', src)
  timestamp = (now or datetime.utcnow()).strftime('%Y%m%d%H%M%S')
  for cfg_name, env in JOB_CONFIGS.items():
    job_id = '%s--%s--%s' % (timestamp, src.replace('/', '-'), cfg_name)
    logging.info('Enqueueing job %s', job_id)
    patch_obj['jobs/' + job_id] = {
        'src': src,
        'type': cfg_name,
        'env': dict(env, PERFETTO_TEST_GIT_REF=git_ref),
        'status': 'QUEUED',
        'time_queued': utc_now_iso(),
    }
    patch_obj['jobs_queued/' + job_id] = 0
    patch_obj[src]['jobs'][job_id] = 0


async def check_new_cl(change_id: str, rev_hash: str, cl: str, patchset: str,
                       ref: str, wants_vote: bool):
  '''Creates the CL + jobs entries in the DB for the given CL, if not present.

  If the CL already exists, checks whether a Presubmit-Ready label has been
  added since and, if so, updates it with the message + vote.
  '''
  # We want to do two things here:
  # 1) If the CL doesn't exist (hence vote_prop is None) carry on below and
  #    enqueue jobs for it.
  # 2) If the CL exists, we don't need to kick new jobs. However, the user
  #    might have added a Presubmit-Ready label after we created the CL. In
  #    this case update the |wants_vote| flag and return.
  logging.debug('check_new_cl(%s-%s)', cl, patchset)
  vote_prop = await req_async(
      'GET', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset))
  if vote_prop is not None:
    if vote_prop != wants_vote and wants_vote:
      logging.info('Updating wants_vote flag on %s-%s', cl, patchset)
      await req_async(
          'PUT', '%s/cls/%s-%s/wants_vote.json' % (DB, cl, patchset), body=True)
      # If the label is applied after we have finished running all the jobs,
      # jump straight to the voting.
      await check_pending_cl(cl_and_ps='%s-%s' % (cl, patchset))
    logging.debug('check_new_cl(%s-%s): already queued', cl, patchset)
    return

  # This is the first time we see this patchset; enqueue jobs for it.

  # Dequeue jobs for older patchsets, if any.
  await cancel_older_jobs(cl=cl, patchset=patchset)

  src = 'cls/%s-%s' % (cl, patchset)
  # Enqueue jobs for the latest patchset.
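  # For illustration, with cl=1234, patchset=5 and a hypothetical
  # 'linux-clang-debug' entry in JOB_CONFIGS, the PATCH body built below
  # (after append_jobs()) would look roughly like:
  #   {
  #     'cls_pending/1234-5': 0,
  #     'cls/1234-5': {'change_id': ..., 'jobs': {...}, ...},
  #     'jobs/20190101120000--cls-1234-5--linux-clang-debug': {...},
  #     'jobs_queued/20190101120000--cls-1234-5--linux-clang-debug': 0,
  #   }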
  patch_obj = {}
  patch_obj['cls_pending/%s-%s' % (cl, patchset)] = 0
  patch_obj[src] = {
      'change_id': change_id,
      'revision_id': rev_hash,
      'time_queued': utc_now_iso(),
      'jobs': {},
      'wants_vote': wants_vote,
  }
  append_jobs(patch_obj, src, ref)
  logging.debug('check_new_cl(%s-%s): queueing jobs', cl, patchset)
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def cancel_older_jobs(cl: str, patchset: str):
  first_key = '%s-0' % cl
  last_key = '%s-z' % cl
  filt = 'orderBy="$key"&startAt="%s"&endAt="%s"' % (first_key, last_key)
  cl_objs = await req_async('GET', '%s/cls.json?%s' % (DB, filt)) or {}
  tasks = []
  for cl_and_ps, cl_obj in cl_objs.items():
    ps = int(cl_and_ps.split('-')[-1])
    if cl_obj.get('time_ended') or ps >= int(patchset):
      continue
    logging.info('Cancelling jobs for previous patchset %s', cl_and_ps)
    for job_id in cl_obj['jobs'].keys():
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)


async def check_pending_cls():
  # Check if any pending CL has completed (all jobs are done). If so, publish
  # the comment and vote on the CL.
  pending_cls = await req_async('GET', '%s/cls_pending.json' % DB) or {}
  tasks = []
  for cl_and_ps, _ in pending_cls.items():
    tasks.append(defer(check_pending_cl(cl_and_ps=cl_and_ps)))
  await asyncio.gather(*tasks)


async def check_pending_cl(cl_and_ps: str):
  # This function can be called twice on the same CL, e.g., in the case when
  # the Presubmit-Ready label is applied after we have finished running all
  # the jobs (we run presubmit regardless, only the voting is conditioned by
  # the label).
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))
  all_jobs = cl_obj.get('jobs', {}).keys()
  pending_jobs = []
  interrupted_jobs = []
  for job_id in all_jobs:
    job_status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
    pending_jobs += [job_id] if job_status in ('QUEUED', 'STARTED') else []
    interrupted_jobs += [job_id] if job_status == 'INTERRUPTED' else []

  # Interrupted jobs are due to VMs being shut down (usually a scale-down).
  # Automatically re-queue them so they get picked up by some other VM.
  await asyncio.gather(*[requeue_job(job_id) for job_id in interrupted_jobs])

  if pending_jobs:
    # If the CL has been pending for too long, cancel all its jobs. Upon the
    # next scan it will be deleted and optionally voted on.
    t_queued = parse_iso_time(cl_obj['time_queued'])
    age_sec = (datetime.utcnow() - t_queued).total_seconds()
    if age_sec > CL_TIMEOUT_SEC:
      logging.warning('Canceling %s, it has been pending for too long (%s sec)',
                      cl_and_ps, int(age_sec))
      tasks = [defer(cancel_job(job_id)) for job_id in pending_jobs]
      await asyncio.gather(*tasks)

  if pending_jobs or interrupted_jobs:
    return
  logging.info('All jobs completed for CL %s', cl_and_ps)

  # Remove the CL from the pending queue and update the end time.
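  # Note on the '# = DELETE' convention used here and below: the Realtime
  # Database prunes empty objects, so patching a key to {} in a multi-location
  # update effectively deletes that key.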
  patch_obj = {
      'cls_pending/%s' % cl_and_ps: {},  # = DELETE
      'cls/%s/time_ended' % cl_and_ps: cl_obj.get('time_ended', utc_now_iso()),
  }
  await req_async('PATCH', '%s.json' % DB, body=patch_obj)
  await update_cl_metrics(src='cls/' + cl_and_ps)
  tasks = [defer(update_job_metrics(job_id)) for job_id in all_jobs]
  await asyncio.gather(*tasks)
  if cl_obj.get('wants_vote'):
    await comment_and_vote_cl(cl_and_ps=cl_and_ps)


async def comment_and_vote_cl(cl_and_ps: str):
  cl_obj = await req_async('GET', '%s/cls/%s.json' % (DB, cl_and_ps))

  if cl_obj.get('voted'):
    logging.error('Already voted on CL %s', cl_and_ps)
    return

  if not cl_obj['wants_vote'] or not GERRIT_VOTING_ENABLED:
    logging.info('Skipping voting on CL %s', cl_and_ps)
    return

  cl_vote = 1
  passed_jobs = []
  failed_jobs = {}
  ui_links = []
  cancelled = False
  for job_id in cl_obj['jobs'].keys():
    job_obj = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
    job_config = JOB_CONFIGS.get(job_obj['type'], {})
    if job_obj['status'] == 'CANCELLED':
      cancelled = True
    if '-ui-' in job_id:
      ui_links.append('https://storage.googleapis.com/%s/%s/ui/index.html' %
                      (GCS_ARTIFACTS, job_id))
      ui_links.append(
          'https://storage.googleapis.com/%s/%s/ui-test-artifacts/index.html' %
          (GCS_ARTIFACTS, job_id))
    if job_obj['status'] == 'COMPLETED':
      passed_jobs.append(job_id)
    elif not job_config.get('SKIP_VOTING', False):
      cl_vote = -1
      failed_jobs[job_id] = job_obj['status']

  msg = ''
  if cancelled:
    msg += 'Some jobs in this CI run were cancelled. This likely happened '
    msg += 'because a new patchset has been uploaded. Skipping vote.\n'
  log_url = CI_SITE + '/#!/logs'
  if failed_jobs:
    msg += 'FAIL:\n'
    msg += ''.join([
        '- %s/%s (%s)\n' % (log_url, job_id, status)
        for (job_id, status) in failed_jobs.items()
    ])
  if passed_jobs:
    msg += '#\nPASS:\n'
    msg += ''.join(['- %s/%s\n' % (log_url, job_id) for job_id in passed_jobs])
  if ui_links:
    msg += '\nArtifacts:\n' + ''.join('- %s\n' % link for link in ui_links)
  msg += 'CI page for this CL:\n'
  msg += '- https://ci.perfetto.dev/#!/cls/%s\n' % cl_and_ps.split('-')[0]
  body = {'labels': {}, 'message': msg}
  if not cancelled:
    body['labels']['Code-Review'] = cl_vote
  logging.info('Posting results for CL %s', cl_and_ps)
  url = 'https://%s/a/changes/%s/revisions/%s/review' % (
      GERRIT_HOST, cl_obj['change_id'], cl_obj['revision_id'])
  await req_async('POST', url, body=body, gerrit=True)
  await req_async('PUT', '%s/cls/%s/voted.json' % (DB, cl_and_ps), body=True)


async def queue_postsubmit_jobs(branch: str, revision: str = None):
  '''Creates the job entries in the DB for the given branch or revision.

  Can be called in two modes:
  1. ?branch=main: retrieves the SHA1 at the head of main and re-invokes
     itself as per mode (2).
  2. ?branch=main&rev=deadbeef1234: queues jobs for the given revision.
  '''
  prj = urllib.parse.quote(GERRIT_PROJECT, '')
  assert branch

  if not revision:
    # Get the commit SHA1 of the head of the branch.
    url = 'https://%s/a/projects/%s/branches/%s' % (GERRIT_HOST, prj, branch)
    revision = (await req_async('GET', url, gerrit=True))['revision']
    assert revision
    # If the latest entry matches the revision, quit without queueing another
    # set of jobs for the same revision.
    # This is an optimization to avoid wasting compute over the weekend
    # rebuilding the same revision every hour.
    filt = 'orderBy="$key"&limitToLast=1'
    cl_objs = await req_async('GET', '%s/branches.json?%s' % (DB, filt)) or {}
    if cl_objs and next(iter(cl_objs.values())).get('rev') == revision:
      logging.debug('Skipping postsubmits for %s: already run', revision)
      return
    await queue_postsubmit_jobs(branch=branch, revision=revision)
    return

  # Get the committer datetime for the given revision.
  url = 'https://%s/a/projects/%s/commits/%s' % (GERRIT_HOST, prj, revision)
  commit_info = await req_async('GET', url, gerrit=True)
  time_committed = commit_info['committer']['date'].split('.')[0]
  time_committed = datetime.strptime(time_committed, '%Y-%m-%d %H:%M:%S')

  # Enqueue jobs.
  src = 'branches/%s-%s' % (branch, time_committed.strftime('%Y%m%d%H%M%S'))
  now = datetime.utcnow()
  patch_obj = {
      src: {
          'rev': revision,
          'subject': commit_info['subject'][:100],
          'author': commit_info['author'].get('email', 'N/A'),
          'time_committed': utc_now_iso(time_committed),
          'time_queued': utc_now_iso(),
          'jobs': {},
      }
  }
  ref = 'refs/heads/' + branch
  append_jobs(patch_obj, src, ref, now)
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def delete_expired_logs(ttl_days=LOGS_TTL_DAYS):
  url = '%s/logs.json?limitToFirst=1000&shallow=true' % DB
  logs = await req_async('GET', url) or {}
  tasks = []
  logging.debug('delete_expired_logs: got %d keys', len(logs.keys()))
  for job_id in logs.keys():
    age_days = (datetime.now() - datetime.strptime(job_id[:8], '%Y%m%d')).days
    if age_days > ttl_days:
      logging.debug('Delete log %s', job_id)
      tasks.append(defer(delete_job_logs(job_id=job_id)))
  await asyncio.gather(*tasks)


async def delete_stale_jobs():
  '''Deletes jobs that are left in the running queue for too long.

  This is usually due to a crash in the VM that handles them.
  '''
  running_jobs = await req_async(
      'GET', '%s/jobs_running.json?shallow=true' % DB) or {}
  tasks = []
  for job_id in running_jobs.keys():
    job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
    time_started = parse_iso_time(job.get('time_started', utc_now_iso()))
    age = (datetime.now() - time_started).total_seconds()
    if age > JOB_TIMEOUT_SEC * 2:
      tasks.append(defer(cancel_job(job_id=job_id)))
  await asyncio.gather(*tasks)


async def delete_stale_workers():
  '''Deletes workers that have been inactive for too long.

  This is usually due to a crash in the VM that runs them.
  '''
  workers = await req_async('GET', '%s/workers.json' % DB) or {}
  patch_obj = {}
  for worker_id, worker in workers.items():
    last_update = parse_iso_time(worker.get('last_update', utc_now_iso()))
    age = (datetime.now() - last_update).total_seconds()
    if age > 60 * 60 * 12:
      patch_obj['workers/' + worker_id] = {}  # = DELETE
  if len(patch_obj) == 0:
    return
  logging.info('Purging %d inactive workers', len(patch_obj))
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def cancel_job(job_id: str):
  '''Cancels a job if not completed or failed.

  This function is racy: workers can complete the queued jobs while we mark
  them as cancelled. The result of such a race is still acceptable.
  '''
  status = await req_async('GET', '%s/jobs/%s/status.json' % (DB, job_id))
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: {},  # = DELETE
  }
  if status in ('QUEUED', 'STARTED'):
    patch_obj['jobs/%s/status' % job_id] = 'CANCELLED'
    patch_obj['jobs/%s/time_ended' % job_id] = utc_now_iso()
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def requeue_job(job_id: str):
  '''Re-queues a job that was previously interrupted due to a VM shutdown.'''
  logging.info('Requeuing interrupted job %s', job_id)
  patch_obj = {
      'jobs_running/%s' % job_id: {},  # = DELETE
      'jobs_queued/%s' % job_id: 0,
      'jobs/%s/status' % job_id: 'QUEUED',
      'jobs/%s/time_queued' % job_id: utc_now_iso(),
      'jobs/%s/time_started' % job_id: {},  # = DELETE
      'jobs/%s/time_ended' % job_id: {},  # = DELETE
      'jobs/%s/worker' % job_id: {},  # = DELETE
  }
  await req_async('PATCH', DB + '.json', body=patch_obj)


async def delete_job_logs(job_id: str):
  await req_async('DELETE',
                  '%s/logs/%s.json?writeSizeLimit=unlimited' % (DB, job_id))


async def update_cl_metrics(src: str):
  cl_obj = await req_async('GET', '%s/%s.json' % (DB, src))
  t_queued = parse_iso_time(cl_obj['time_queued'])
  t_ended = parse_iso_time(cl_obj['time_ended'])
  await write_metrics({
      'ci_cl_completion_time': {
          'l': {},
          'v': int((t_ended - t_queued).total_seconds())
      }
  })


async def update_job_metrics(job_id: str):
  job = await req_async('GET', '%s/jobs/%s.json' % (DB, job_id))
  metrics = {}
  if 'time_queued' in job and 'time_started' in job:
    t_queued = parse_iso_time(job['time_queued'])
    t_started = parse_iso_time(job['time_started'])
    metrics['ci_job_queue_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_started - t_queued).total_seconds())
    }
  if 'time_ended' in job and 'time_started' in job:
    t_started = parse_iso_time(job['time_started'])
    t_ended = parse_iso_time(job['time_ended'])
    metrics['ci_job_run_time'] = {
        'l': {
            'job_type': job['type']
        },
        'v': int((t_ended - t_started).total_seconds())
    }
  if metrics:
    await write_metrics(metrics)


async def update_queue_metrics():
  # Update the Stackdriver metric that will drive the autoscaler.
  queued = await req_async('GET', DB + '/jobs_queued.json?shallow=true') or {}
  running = await req_async('GET', DB + '/jobs_running.json?shallow=true') or {}
  logging.debug('ci_job_queue_len: %d + %d', len(queued), len(running))
  await write_metrics({'ci_job_queue_len': {'v': len(queued) + len(running)}})


async def create_stackdriver_metric_definitions():
  logging.info('Creating Stackdriver metric definitions')
  for name, metric in STACKDRIVER_METRICS.items():
    logging.info('Creating metric %s', name)
    await req_async('POST', STACKDRIVER_API + '/metricDescriptors', body=metric)


async def write_metrics(metric_dict):
  now = utc_now_iso()
  desc = {'timeSeries': []}
  for key, spec in metric_dict.items():
    desc['timeSeries'] += [{
        'metric': {
            'type': STACKDRIVER_METRICS[key]['type'],
            'labels': spec.get('l', {})
        },
        'resource': {
            'type': 'global'
        },
        'points': [{
            'interval': {
                'endTime': now
            },
            'value': {
                'int64Value': str(spec['v'])
            }
        }]
    }]
  try:
    await req_async('POST', STACKDRIVER_API + '/timeSeries', body=desc)
  except Exception as e:
    # Metric updates can easily fail due to Stackdriver API limitations.
    msg = str(e)
    if 'written more frequently than the maximum sampling' not in msg:
      logging.error('Metrics update failed: %s', msg)
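

# ------------------------------------------------------------------------------
# Local development (sketch)
# ------------------------------------------------------------------------------
# A minimal sketch for exercising the handlers above by hand; it is not the
# production entry point. It assumes Flask's async-view support is installed
# (pip install "flask[async]") and that config.py points at a test project/DB.
if __name__ == '__main__':
  init_logging()
  # e.g.: curl http://127.0.0.1:8080/controller/sleep/3
  app.run(host='127.0.0.1', port=8080, debug=True)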