#!/usr/bin/env python3
# Copyright © 2020 - 2022 Collabora Ltd.
# Authors:
#   Tomeu Vizoso <[email protected]>
#   David Heidelberg <[email protected]>
#
# For the dependencies, see the requirements.txt
# SPDX-License-Identifier: MIT

"""
Helper script to restrict running only required CI jobs
and show the job(s) logs.
"""

import argparse
import re
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
from subprocess import check_output, CalledProcessError
from typing import Dict, TYPE_CHECKING, Iterable, Literal, Optional, Tuple

import gitlab
import gitlab.v4.objects
from colorama import Fore, Style
from gitlab_common import (
    GITLAB_URL,
    TOKEN_DIR,
    get_gitlab_pipeline_from_url,
    get_gitlab_project,
    get_token_from_default_dir,
    pretty_duration,
    print_once,
    read_token,
    wait_for_pipeline,
)
from gitlab_gql import GitlabGQL, create_job_needs_dag, filter_dag, print_dag

if TYPE_CHECKING:
    from gitlab_gql import Dag

REFRESH_WAIT_LOG = 10
REFRESH_WAIT_JOBS = 6

URL_START = "\033]8;;"
URL_END = "\033]8;;\a"

STATUS_COLORS = {
    "created": "",
    "running": Fore.BLUE,
    "success": Fore.GREEN,
    "failed": Fore.RED,
    "canceled": Fore.MAGENTA,
    "canceling": Fore.MAGENTA,
    "manual": "",
    "pending": "",
    "skipped": "",
}

COMPLETED_STATUSES = frozenset({"success", "failed"})
RUNNING_STATUSES = frozenset({"created", "pending", "running"})


def print_job_status(
    job: gitlab.v4.objects.ProjectPipelineJob,
    new_status: bool = False,
    job_name_field_pad: int = 0,
) -> None:
    """It prints a nice, colored job status with a link to the job."""
    if job.status in {"canceled", "canceling"}:
        return

    if new_status and job.status == "created":
        return

    job_name_field_pad = len(job.name) if job_name_field_pad < 1 else job_name_field_pad

    duration = job_duration(job)

    print_once(
        STATUS_COLORS[job.status]
        + "🞋 job "  # U+1F78B Round target
        + link2print(job.web_url, job.name, job_name_field_pad)
        + (f"has new status: {job.status}" if new_status else f"{job.status}")
        + (f" ({pretty_duration(duration)})" if job.started_at else "")
        + Style.RESET_ALL
    )
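

# Illustrative only: with the colors and the OSC 8 hyperlink escapes from
# link2print() stripped, a status line printed above looks roughly like this
# (job name and duration are made-up examples):
#
#   🞋 job deqp-vk 1/4      running (2m 13s)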


def job_duration(job: gitlab.v4.objects.ProjectPipelineJob) -> float:
    """
    Given a job, report the time elapsed in execution.
    :param job: Pipeline job
    :return: Current time in execution
    """
    if job.duration:
        return job.duration
    elif job.started_at:
        # Use wall-clock time here: time.perf_counter() has an arbitrary
        # reference point and cannot be compared to the epoch-based mktime().
        return time.time() - time.mktime(job.started_at.timetuple())
    return 0.0


def pretty_wait(sec: int) -> None:
    """Show a countdown while waiting for the next poll."""
    for val in range(sec, 0, -1):
        print(f"⏲ {val} seconds", end="\r")  # U+23F2 Timer clock
        time.sleep(1)
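

# A sketch of the return contract of monitor_pipeline() below, as consumed by
# main() (the job id and the `times` mapping are hypothetical placeholders):
#
#   (12345, None, times)  -> a single target is still running: follow its log
#   (None, 1, times)      -> a target failed or was skipped: exit code 1
#   (None, 0, times)      -> all targets finished successfully: exit code 0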


def monitor_pipeline(
    project: gitlab.v4.objects.Project,
    pipeline: gitlab.v4.objects.ProjectPipeline,
    target_jobs_regex: re.Pattern,
    include_stage_regex: re.Pattern,
    exclude_stage_regex: re.Pattern,
    dependencies: set[str],
    stress: int,
) -> tuple[Optional[int], Optional[int], Dict[str, Dict[int, Tuple[float, str, str]]]]:
    """Monitor the pipeline and delegate canceling unwanted jobs."""
    statuses: dict[str, str] = defaultdict(str)
    target_statuses: dict[str, str] = defaultdict(str)
    stress_status_counter: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    execution_times = defaultdict(lambda: defaultdict(tuple))
    target_id: int = -1
    name_field_pad: int = len(max(dependencies, key=len)) + 2
    # In a running pipeline, we can skip following job traces that are in these statuses.
    skip_follow_statuses: frozenset[str] = COMPLETED_STATUSES

    # Pre-populate the stress status counter for already completed target jobs.
    if stress:
        # When stress testing, this information must be collected before the first iteration.
        for job in pipeline.jobs.list(all=True, include_retried=True):
            if target_jobs_regex.fullmatch(job.name) and \
               include_stage_regex.fullmatch(job.stage) and \
               not exclude_stage_regex.fullmatch(job.stage) and \
               job.status in COMPLETED_STATUSES:
                stress_status_counter[job.name][job.status] += 1
                execution_times[job.name][job.id] = (job_duration(job), job.status, job.web_url)

    # jobs_waiting is a list of job names that are waiting for a status update.
    # It occurs when a job that we want to run depends on another job that is not yet finished.
    jobs_waiting: list[str] = []
    # FIXME: This function has too many parameters, consider refactoring.
    enable_job_fn = partial(
        enable_job,
        project=project,
        pipeline=pipeline,
        job_name_field_pad=name_field_pad,
        jobs_waiting=jobs_waiting,
    )
    while True:
        deps_failed = []
        to_cancel = []
        jobs_waiting.clear()
        for job in sorted(pipeline.jobs.list(all=True), key=lambda j: j.name):
            if target_jobs_regex.fullmatch(job.name) and \
               include_stage_regex.fullmatch(job.stage) and \
               not exclude_stage_regex.fullmatch(job.stage):
                target_id = job.id
                target_status = job.status

                if stress and target_status in COMPLETED_STATUSES:
                    if (
                        stress < 0
                        or sum(stress_status_counter[job.name].values()) < stress
                    ):
                        stress_status_counter[job.name][target_status] += 1
                        execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url)
                        job = enable_job_fn(job=job, action_type="retry")
                else:
                    execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url)
                    job = enable_job_fn(job=job, action_type="target")

                print_job_status(job, target_status != target_statuses[job.name], name_field_pad)
                target_statuses[job.name] = target_status
                continue

            # all other non-target jobs
            if job.status != statuses[job.name]:
                print_job_status(job, True, name_field_pad)
                statuses[job.name] = job.status

            # run dependencies and cancel the rest
            if job.name in dependencies:
                job = enable_job_fn(job=job, action_type="dep")
                if job.status == "failed":
                    deps_failed.append(job.name)
            else:
                to_cancel.append(job)

        cancel_jobs(project, to_cancel)

        if stress:
            enough = True
            for job_name, status in sorted(stress_status_counter.items()):
                print(
                    f"* {job_name:{name_field_pad}}succ: {status['success']}; "
                    f"fail: {status['failed']}; "
                    f"total: {sum(status.values())} of {stress}",
                    flush=False,
                )
                if stress < 0 or sum(status.values()) < stress:
                    enough = False

            if not enough:
                pretty_wait(REFRESH_WAIT_JOBS)
                continue

        if jobs_waiting:
            print_once(
                f"{Fore.YELLOW}Waiting for jobs to update status:",
                ", ".join(jobs_waiting),
                Fore.RESET,
            )
            pretty_wait(REFRESH_WAIT_JOBS)
            continue

        if len(target_statuses) == 1 and RUNNING_STATUSES.intersection(
            target_statuses.values()
        ):
            return target_id, None, execution_times

        if (
            {"failed"}.intersection(target_statuses.values())
            and not RUNNING_STATUSES.intersection(target_statuses.values())
        ):
            return None, 1, execution_times

        if (
            {"skipped"}.intersection(target_statuses.values())
            and not RUNNING_STATUSES.intersection(target_statuses.values())
        ):
            print(
                Fore.RED,
                "Target in skipped state, aborting. Failed dependencies:",
                deps_failed,
                Fore.RESET,
            )
            return None, 1, execution_times

        if skip_follow_statuses.issuperset(target_statuses.values()):
            return None, 0, execution_times

        pretty_wait(REFRESH_WAIT_JOBS)
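

# Shape sketch of the execution_times mapping built above, keyed by job name
# and then by job id (all names and values are hypothetical):
#
#   {
#       "deqp-vk": {
#           101: (84.2, "success", "https://gitlab.example.com/.../-/jobs/101"),
#           102: (91.0, "failed",  "https://gitlab.example.com/.../-/jobs/102"),
#       },
#   }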


def get_pipeline_job(
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job_id: int,
) -> gitlab.v4.objects.ProjectPipelineJob:
    pipeline_jobs = pipeline.jobs.list(all=True)
    return [j for j in pipeline_jobs if j.id == job_id][0]


def enable_job(
    project: gitlab.v4.objects.Project,
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job: gitlab.v4.objects.ProjectPipelineJob,
    action_type: Literal["target", "dep", "retry"],
    job_name_field_pad: int = 0,
    jobs_waiting: list[str] = [],
) -> gitlab.v4.objects.ProjectPipelineJob:
    # NOTE: the mutable default above is only a fallback; monitor_pipeline()
    # always passes a shared list through functools.partial.
    # We want to run this job, but it is not ready to run yet, so let's try again in the next
    # iteration.
    if job.status == "created":
        jobs_waiting.append(job.name)
        return job

    if (
        (job.status in COMPLETED_STATUSES and action_type != "retry")
        or job.status in {"skipped"} | RUNNING_STATUSES
    ):
        return job

    pjob = project.jobs.get(job.id, lazy=True)

    if job.status in {"success", "failed", "canceled", "canceling"}:
        new_job = pjob.retry()
        job = get_pipeline_job(pipeline, new_job["id"])
    else:
        pjob.play()
        job = get_pipeline_job(pipeline, pjob.id)

    if action_type == "target":
        jtype = "🞋 target"  # U+1F78B Round target
    elif action_type == "retry":
        jtype = "↻ retrying"  # U+21BB Clockwise open circle arrow
    else:
        jtype = "↪ dependency"  # U+21AA Left Arrow Curving Right

    job_name_field_pad = len(job.name) if job_name_field_pad < 1 else job_name_field_pad
    print(Fore.MAGENTA + f"{jtype} job {job.name:{job_name_field_pad}}manually enabled" + Style.RESET_ALL)

    return job


def cancel_job(
    project: gitlab.v4.objects.Project,
    job: gitlab.v4.objects.ProjectPipelineJob
) -> None:
    """Cancel a GitLab job"""
    if job.status not in RUNNING_STATUSES:
        return
    pjob = project.jobs.get(job.id, lazy=True)
    pjob.cancel()
    print(f"🗙 {job.name}", end=" ")  # U+1F5D9 Cancellation X


def cancel_jobs(
    project: gitlab.v4.objects.Project,
    to_cancel: list
) -> None:
    """Cancel unwanted GitLab jobs"""
    if not to_cancel:
        return

    with ThreadPoolExecutor(max_workers=6) as exe:
        part = partial(cancel_job, project)
        exe.map(part, to_cancel)

    # The canceled jobs are printed without a newline
    print_once()


def print_log(
    project: gitlab.v4.objects.Project,
    job_id: int
) -> None:
    """Print the job log to the output"""
    printed_lines = 0
    while True:
        job = project.jobs.get(job_id)

        # GitLab's REST API doesn't offer pagination for logs, so we have to refetch it all
        lines = job.trace().decode().splitlines()
        for line in lines[printed_lines:]:
            print(line)
        printed_lines = len(lines)

        if job.status in COMPLETED_STATUSES:
            print(Fore.GREEN + f"Job finished: {job.web_url}" + Style.RESET_ALL)
            return
        pretty_wait(REFRESH_WAIT_LOG)
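

# Hypothetical invocations of the options parsed below (job names are examples,
# not real Mesa CI jobs):
#
#   mesa-monitor.py --target 'deqp-vk.*'                  # run matching jobs on HEAD's pipeline
#   mesa-monitor.py --target foo bar --exclude-stage ''   # also consider performance/postmerge stages
#   mesa-monitor.py --target foo --mr 12345 --stress 5    # rerun until 5 total runs in the MR pipeline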


def parse_args() -> argparse.Namespace:
    """Parse args"""
    parser = argparse.ArgumentParser(
        description="Tool to trigger a subset of container jobs "
        + "and monitor the progress of a test job",
        epilog="Example: mesa-monitor.py --rev $(git rev-parse HEAD) "
        + '--target ".*traces" ',
    )
    parser.add_argument(
        "--target",
        metavar="target-job",
        help="Target job regex. For multiple targets, pass multiple values, "
        "e.g. `--target foo bar`. Only jobs in the target stage(s) "
        "supplied, and their dependencies, will be considered.",
        required=True,
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--include-stage",
        metavar="include-stage",
        help="Job stages to include when searching for target jobs. "
        "For multiple stages, pass multiple values, e.g. "
        "`--include-stage foo bar`.",
        default=[".*"],
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--exclude-stage",
        metavar="exclude-stage",
        help="Job stages to exclude when searching for target jobs. "
        "For multiple stages, pass multiple values, e.g. "
        "`--exclude-stage foo bar`. By default, performance and "
        "post-merge jobs are excluded; pass --exclude-stage '' to "
        "include them for consideration.",
        default=["performance", ".*-postmerge"],
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--token",
        metavar="token",
        type=str,
        default=get_token_from_default_dir(),
        help="Use the provided GitLab token or token file, "
        f"otherwise it's read from {TOKEN_DIR / 'gitlab-token'}",
    )
    parser.add_argument(
        "--force-manual", action="store_true",
        help="Deprecated argument; manual jobs are always force-enabled"
    )
    parser.add_argument(
        "--stress",
        default=0,
        type=int,
        help="Stress-test job(s). Specify the number of times to rerun the selected jobs, "
        "or use -1 to rerun indefinitely. Defaults to 0. If jobs have already been executed, "
        "this will ensure the total run count respects the specified number.",
    )
    parser.add_argument(
        "--project",
        default="mesa",
        help="GitLab project in the format <user>/<project> or just <project>",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Exit after printing target jobs and dependencies",
    )

    mutex_group1 = parser.add_mutually_exclusive_group()
    mutex_group1.add_argument(
        "--rev", default="HEAD", metavar="revision", help="repository git revision (default: HEAD)"
    )
    mutex_group1.add_argument(
        "--pipeline-url",
        help="URL of the pipeline to use, instead of auto-detecting it.",
    )
    mutex_group1.add_argument(
        "--mr",
        type=int,
        help="ID of a merge request; the latest pipeline in that MR will be used.",
    )

    args = parser.parse_args()

    # argparse doesn't support groups inside add_mutually_exclusive_group(),
    # which means we can't just put `--project` and `--rev` in a group together;
    # we have to do this by hand instead.
    if args.pipeline_url and args.project != parser.get_default("project"):
        # weird phrasing, but it's the error add_mutually_exclusive_group() gives
        parser.error("argument --project: not allowed with argument --pipeline-url")

    return args
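

# Doctest-style sketch of the implicit `parallel:` suffix that main() appends
# to the target regex; "deqp-vk" is a made-up job name:
#
#   >>> rx = re.compile(r"(deqp-vk)( \d+/\d+)?")
#   >>> bool(rx.fullmatch("deqp-vk")), bool(rx.fullmatch("deqp-vk 3/8"))
#   (True, True)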


def print_detected_jobs(
    target_dep_dag: "Dag",
    dependency_jobs: Iterable[str],
    target_jobs: Iterable[str],
) -> None:
    def print_job_set(color: str, kind: str, job_set: Iterable[str]):
        print(
            color + f"Running {len(job_set)} {kind} jobs: ",
            "\n\t",
            ", ".join(sorted(job_set)),
            Fore.RESET,
            "\n",
        )

    print(Fore.YELLOW + "Detected target job and its dependencies:", "\n")
    print_dag(target_dep_dag)
    print_job_set(Fore.MAGENTA, "dependency", dependency_jobs)
    print_job_set(Fore.BLUE, "target", target_jobs)


def find_dependencies(
    token: str | None,
    target_jobs_regex: re.Pattern,
    include_stage_regex: re.Pattern,
    exclude_stage_regex: re.Pattern,
    project_path: gitlab.v4.objects.Project,
    iid: int
) -> set[str]:
    """
    Find the dependencies of the target jobs in a GitLab pipeline.

    This function uses the GitLab GraphQL API to fetch the job dependency graph
    of a pipeline, filters the graph to only include the target jobs and their
    dependencies, and returns the names of these jobs.

    Args:
        token (str | None): The GitLab API token. If None, the API is accessed without
            authentication.
        target_jobs_regex (re.Pattern): A regex pattern to match the names of the target jobs.
        include_stage_regex (re.Pattern): A regex pattern matching the stages to include.
        exclude_stage_regex (re.Pattern): A regex pattern matching the stages to exclude.
        project_path (gitlab.v4.objects.Project): The GitLab project; its
            path_with_namespace is used in the GraphQL query.
        iid (int): The internal ID of the pipeline.

    Returns:
        set[str]: A set of the names of the target jobs and their dependencies.

    Raises:
        SystemExit: If no target jobs are found in the pipeline.
    """
    gql_instance = GitlabGQL(token=token)
    dag = create_job_needs_dag(
        gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
    )

    target_dep_dag = filter_dag(dag, target_jobs_regex, include_stage_regex, exclude_stage_regex)
    if not target_dep_dag:
        print(Fore.RED + "The job(s) were not found in the pipeline." + Fore.RESET)
        sys.exit(1)

    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
    target_jobs = set(target_dep_dag.keys())
    print_detected_jobs(target_dep_dag, dependency_jobs, target_jobs)
    return target_jobs.union(dependency_jobs)
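

# Shape sketch of the filtered DAG iterated above: each value carries at least
# a "needs" set (other keys elided; names hypothetical):
#
#   {"deqp-vk 1/2": {"needs": {"build-x86_64"}},
#    "deqp-vk 2/2": {"needs": {"build-x86_64"}}}
#
# which yields dependency_jobs == {"build-x86_64"} and the two keys as targets.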


def print_monitor_summary(
    execution_collection: Dict[str, Dict[int, Tuple[float, str, str]]],
    t_start: float,
) -> None:
    """Summary of the test execution"""
    t_end = time.perf_counter()
    spent_minutes = (t_end - t_start) / 60
    print(f"⏲ Duration of script execution: {spent_minutes:0.1f} minutes")  # U+23F2 Timer clock
    if len(execution_collection) == 0:
        return
    print("⏲ Jobs execution times:")  # U+23F2 Timer clock
    job_names = sorted(execution_collection.keys())
    name_field_pad = len(max(job_names, key=len)) + 2
    for name in job_names:
        job_executions = execution_collection[name]
        job_times = ', '.join([__job_duration_record(job_execution)
                               for job_execution in sorted(job_executions.items())])
        print(f"* {name:{name_field_pad}}: ({len(job_executions)}) {job_times}")


def __job_duration_record(dict_item: Tuple[int, Tuple[float, str, str]]) -> str:
    """
    Format one (job id, (duration, status, URL)) pair for printing.
    :param dict_item: an item of execution_collection[name]: Dict[int, Tuple[float, str, str]]
    """
    job_id = f"{dict_item[0]}"  # dictionary key
    job_duration, job_status, job_url = dict_item[1]  # dictionary value, the tuple
    return (f"{STATUS_COLORS[job_status]}"
            f"{link2print(job_url, job_id)}: {pretty_duration(job_duration):>8}"
            f"{Style.RESET_ALL}")


def link2print(url: str, text: str, text_pad: int = 0) -> str:
    text_pad = len(text) if text_pad < 1 else text_pad
    return f"{URL_START}{url}\a{text:{text_pad}}{URL_END}"
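

# What link2print() produces: an OSC 8 terminal hyperlink, with the text padded
# to `text_pad`. URL and text below are placeholders:
#
#   link2print("https://example.com/job/1", "my-job", 8)
#   == "\033]8;;https://example.com/job/1\amy-job  \033]8;;\a"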


def main() -> None:
    try:
        t_start = time.perf_counter()

        args = parse_args()

        token = read_token(args.token)

        gl = gitlab.Gitlab(url=GITLAB_URL,
                           private_token=token,
                           retry_transient_errors=True)

        REV: str = args.rev

        if args.pipeline_url:
            pipe, cur_project = get_gitlab_pipeline_from_url(gl, args.pipeline_url)
            REV = pipe.sha
        else:
            mesa_project = gl.projects.get("mesa/mesa")
            projects = [mesa_project]
            if args.mr:
                REV = mesa_project.mergerequests.get(args.mr).sha
            else:
                REV = check_output(['git', 'rev-parse', REV]).decode('ascii').strip()

                if args.rev == 'HEAD':
                    try:
                        branch_name = check_output([
                            'git', 'symbolic-ref', '-q', 'HEAD',
                        ]).decode('ascii').strip()
                    except CalledProcessError:
                        branch_name = ""

                    # Ignore detached heads
                    if branch_name:
                        tracked_remote = check_output([
                            'git', 'for-each-ref', '--format=%(upstream)',
                            branch_name,
                        ]).decode('ascii').strip()

                        # Ignore local branches that do not track any remote
                        if tracked_remote:
                            remote_rev = check_output([
                                'git', 'rev-parse', tracked_remote,
                            ]).decode('ascii').strip()

                            if REV != remote_rev:
                                print(
                                    f"Local HEAD commit {REV[:10]} is different than "
                                    f"tracked remote HEAD commit {remote_rev[:10]}"
                                )
                                print("Did you forget to `git push` ?")

            projects.append(get_gitlab_project(gl, args.project))
            (pipe, cur_project) = wait_for_pipeline(projects, REV)

        print(f"Revision: {REV}")
        print(f"Pipeline: {pipe.web_url}")

        target = '|'.join(args.target)
        target = target.strip()

        print("🞋 target job: " + Fore.BLUE + target + Style.RESET_ALL)  # U+1F78B Round target

        # Implicitly include `parallel:` jobs
        target = f'({target})' + r'( \d+/\d+)?'

        target_jobs_regex = re.compile(target)

        include_stage = '|'.join(args.include_stage)
        include_stage = include_stage.strip()

        print("🞋 target from stages: " + Fore.BLUE + include_stage + Style.RESET_ALL)  # U+1F78B Round target

        include_stage_regex = re.compile(include_stage)

        exclude_stage = '|'.join(args.exclude_stage)
        exclude_stage = exclude_stage.strip()

        print("🞋 target excluding stages: " + Fore.BLUE + exclude_stage + Style.RESET_ALL)  # U+1F78B Round target

        exclude_stage_regex = re.compile(exclude_stage)

        deps = find_dependencies(
            token=token,
            target_jobs_regex=target_jobs_regex,
            include_stage_regex=include_stage_regex,
            exclude_stage_regex=exclude_stage_regex,
            iid=pipe.iid,
            project_path=cur_project
        )

        if args.dry_run:
            sys.exit(0)

        target_job_id, ret, exec_t = monitor_pipeline(
            cur_project,
            pipe,
            target_jobs_regex,
            include_stage_regex,
            exclude_stage_regex,
            deps,
            args.stress
        )

        if target_job_id:
            print_log(cur_project, target_job_id)

        print_monitor_summary(exec_t, t_start)

        sys.exit(ret)
    except KeyboardInterrupt:
        sys.exit(1)


if __name__ == "__main__":
    main()