#!/usr/bin/env python3
# Copyright © 2020 - 2022 Collabora Ltd.
# Authors:
#   Tomeu Vizoso <[email protected]>
#   David Heidelberg <[email protected]>
#
# For the dependencies, see the requirements.txt
# SPDX-License-Identifier: MIT

10"""
11Helper script to restrict running only required CI jobs
12and show the job(s) logs.
13"""

import argparse
import re
import sys
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
from subprocess import check_output, CalledProcessError
from typing import Dict, TYPE_CHECKING, Iterable, Literal, Optional, Tuple

import gitlab
import gitlab.v4.objects
from colorama import Fore, Style
from gitlab_common import (
    GITLAB_URL,
    TOKEN_DIR,
    get_gitlab_pipeline_from_url,
    get_gitlab_project,
    get_token_from_default_dir,
    pretty_duration,
    print_once,
    read_token,
    wait_for_pipeline,
)
from gitlab_gql import GitlabGQL, create_job_needs_dag, filter_dag, print_dag

if TYPE_CHECKING:
    from gitlab_gql import Dag

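# Polling intervals, in seconds, for the wait loops below
# (log refresh and job status refresh, respectively).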
REFRESH_WAIT_LOG = 10
REFRESH_WAIT_JOBS = 6

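# OSC 8 terminal hyperlink escapes: link2print() emits
# "\033]8;;<url>\a<text>\033]8;;\a", which supporting terminals
# render as clickable text.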
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"

STATUS_COLORS = {
    "created": "",
    "running": Fore.BLUE,
    "success": Fore.GREEN,
    "failed": Fore.RED,
    "canceled": Fore.MAGENTA,
    "canceling": Fore.MAGENTA,
    "manual": "",
    "pending": "",
    "skipped": "",
}

COMPLETED_STATUSES = frozenset({"success", "failed"})
RUNNING_STATUSES = frozenset({"created", "pending", "running"})


def print_job_status(
    job: gitlab.v4.objects.ProjectPipelineJob,
    new_status: bool = False,
    job_name_field_pad: int = 0,
) -> None:
    """Print a colored job status line with a link to the job."""
    if job.status in {"canceled", "canceling"}:
        return

    if new_status and job.status == "created":
        return

    job_name_field_pad = len(job.name) if job_name_field_pad < 1 else job_name_field_pad

    duration = job_duration(job)

    print_once(
        STATUS_COLORS[job.status]
        + "🞋 job "  # U+1F78B Round target
        + link2print(job.web_url, job.name, job_name_field_pad)
        + (f"has new status: {job.status}" if new_status else f"{job.status}")
        + (f" ({pretty_duration(duration)})" if job.started_at else "")
        + Style.RESET_ALL
    )


def job_duration(job: gitlab.v4.objects.ProjectPipelineJob) -> float:
    """
    Given a job, report the elapsed execution time.
    :param job: Pipeline job
    :return: Elapsed execution time, in seconds
    """
    if job.duration:
        return job.duration
    elif job.started_at:
        # started_at is an epoch-based timestamp, so compare against wall-clock
        # time rather than the monotonic perf counter.
        return time.time() - time.mktime(job.started_at.timetuple())
    return 0.0


def pretty_wait(sec: int) -> None:
    """Show a per-second countdown while waiting."""
    for val in range(sec, 0, -1):
        print(f"⏲  {val} seconds", end="\r")  # U+23F2 Timer clock
        time.sleep(1)


def monitor_pipeline(
    project: gitlab.v4.objects.Project,
    pipeline: gitlab.v4.objects.ProjectPipeline,
    target_jobs_regex: re.Pattern,
    include_stage_regex: re.Pattern,
    exclude_stage_regex: re.Pattern,
    dependencies: set[str],
    stress: int,
) -> tuple[Optional[int], Optional[int], Dict[str, Dict[int, Tuple[float, str, str]]]]:
    """Monitor the pipeline and delegate job canceling.

    Returns a tuple of (job id to follow or None, exit code or None,
    execution times collected per job name and job id).
    """
    statuses: dict[str, str] = defaultdict(str)
    target_statuses: dict[str, str] = defaultdict(str)
    stress_status_counter: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    execution_times = defaultdict(lambda: defaultdict(tuple))
    target_id: int = -1
    name_field_pad: int = len(max(dependencies, key=len)) + 2
    # In a running pipeline, we can skip following job traces that are in these statuses.
    skip_follow_statuses: frozenset[str] = COMPLETED_STATUSES

    # Pre-populate the stress status counter for already completed target jobs.
    if stress:
        # In stress mode, this information has to be collected before starting.
        for job in pipeline.jobs.list(all=True, include_retried=True):
            if target_jobs_regex.fullmatch(job.name) and \
               include_stage_regex.fullmatch(job.stage) and \
               not exclude_stage_regex.fullmatch(job.stage) and \
               job.status in COMPLETED_STATUSES:
                stress_status_counter[job.name][job.status] += 1
                execution_times[job.name][job.id] = (job_duration(job), job.status, job.web_url)

    # jobs_waiting is a list of job names that are waiting for a status update.
    # It occurs when a job that we want to run depends on another job that has not yet finished.
    jobs_waiting: list[str] = []
    # FIXME: This function has too many parameters, consider refactoring.
    enable_job_fn = partial(
        enable_job,
        project=project,
        pipeline=pipeline,
        job_name_field_pad=name_field_pad,
        jobs_waiting=jobs_waiting,
    )
    while True:
        deps_failed = []
        to_cancel = []
        jobs_waiting.clear()
        for job in sorted(pipeline.jobs.list(all=True), key=lambda j: j.name):
            if target_jobs_regex.fullmatch(job.name) and \
               include_stage_regex.fullmatch(job.stage) and \
               not exclude_stage_regex.fullmatch(job.stage):
                target_id = job.id
                target_status = job.status

                if stress and target_status in COMPLETED_STATUSES:
                    if (
                        stress < 0
                        or sum(stress_status_counter[job.name].values()) < stress
                    ):
                        stress_status_counter[job.name][target_status] += 1
                        execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url)
                        job = enable_job_fn(job=job, action_type="retry")
                else:
                    execution_times[job.name][job.id] = (job_duration(job), target_status, job.web_url)
                    job = enable_job_fn(job=job, action_type="target")
                print_job_status(job, target_status != target_statuses[job.name], name_field_pad)
                target_statuses[job.name] = target_status
                continue

            # all other non-target jobs
            if job.status != statuses[job.name]:
                print_job_status(job, True, name_field_pad)
                statuses[job.name] = job.status

            # run dependencies and cancel the rest
            if job.name in dependencies:
                job = enable_job_fn(job=job, action_type="dep")
                if job.status == "failed":
                    deps_failed.append(job.name)
            else:
                to_cancel.append(job)

        cancel_jobs(project, to_cancel)

        if stress:
            enough = True
            for job_name, status in sorted(stress_status_counter.items()):
                print(
                    f"* {job_name:{name_field_pad}}succ: {status['success']}; "
                    f"fail: {status['failed']}; "
                    f"total: {sum(status.values())} of {stress}",
                    flush=False,
                )
                if stress < 0 or sum(status.values()) < stress:
                    enough = False

            if not enough:
                pretty_wait(REFRESH_WAIT_JOBS)
                continue

        if jobs_waiting:
            print_once(
                f"{Fore.YELLOW}Waiting for jobs to update status:",
                ", ".join(jobs_waiting),
                Fore.RESET,
            )
            pretty_wait(REFRESH_WAIT_JOBS)
            continue

        if len(target_statuses) == 1 and RUNNING_STATUSES.intersection(
            target_statuses.values()
        ):
            return target_id, None, execution_times

        if (
            {"failed"}.intersection(target_statuses.values())
            and not RUNNING_STATUSES.intersection(target_statuses.values())
        ):
            return None, 1, execution_times

        if (
            {"skipped"}.intersection(target_statuses.values())
            and not RUNNING_STATUSES.intersection(target_statuses.values())
        ):
            print(
                Fore.RED,
                "Target in skipped state, aborting. Failed dependencies:",
                deps_failed,
                Fore.RESET,
            )
            return None, 1, execution_times

        if skip_follow_statuses.issuperset(target_statuses.values()):
            return None, 0, execution_times

        pretty_wait(REFRESH_WAIT_JOBS)


def get_pipeline_job(
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job_id: int,
) -> gitlab.v4.objects.ProjectPipelineJob:
    """Look up a job in the pipeline's job list by its id."""
    pipeline_jobs = pipeline.jobs.list(all=True)
    return [j for j in pipeline_jobs if j.id == job_id][0]


def enable_job(
    project: gitlab.v4.objects.Project,
    pipeline: gitlab.v4.objects.ProjectPipeline,
    job: gitlab.v4.objects.ProjectPipelineJob,
    action_type: Literal["target", "dep", "retry"],
    job_name_field_pad: int = 0,
    jobs_waiting: Optional[list[str]] = None,
) -> gitlab.v4.objects.ProjectPipelineJob:
    # The waiting list is normally shared by the caller via functools.partial;
    # default to a fresh list rather than a mutable default argument.
    if jobs_waiting is None:
        jobs_waiting = []

    # We want to run this job, but it is not ready to run yet, so let's try again in the next
    # iteration.
    if job.status == "created":
        jobs_waiting.append(job.name)
        return job

    if (
        (job.status in COMPLETED_STATUSES and action_type != "retry")
        or job.status in {"skipped"} | RUNNING_STATUSES
    ):
        return job

    pjob = project.jobs.get(job.id, lazy=True)

    if job.status in {"success", "failed", "canceled", "canceling"}:
        new_job = pjob.retry()
        job = get_pipeline_job(pipeline, new_job["id"])
    else:
        pjob.play()
        job = get_pipeline_job(pipeline, pjob.id)

    if action_type == "target":
        jtype = "🞋 target"  # U+1F78B Round target
    elif action_type == "retry":
        jtype = "↻ retrying"  # U+21BB Clockwise open circle arrow
    else:
        jtype = "↪ dependency"  # U+21AA Left Arrow Curving Right

    job_name_field_pad = len(job.name) if job_name_field_pad < 1 else job_name_field_pad
    print(Fore.MAGENTA + f"{jtype} job {job.name:{job_name_field_pad}}manually enabled" + Style.RESET_ALL)

    return job


def cancel_job(
    project: gitlab.v4.objects.Project,
    job: gitlab.v4.objects.ProjectPipelineJob
) -> None:
    """Cancel a GitLab job"""
    if job.status not in RUNNING_STATUSES:
        return
    pjob = project.jobs.get(job.id, lazy=True)
    pjob.cancel()
    print(f"🗙 {job.name}", end=" ")  # U+1F5D9 Cancellation X


def cancel_jobs(
    project: gitlab.v4.objects.Project,
    to_cancel: list
) -> None:
    """Cancel unwanted GitLab jobs"""
    if not to_cancel:
        return

    with ThreadPoolExecutor(max_workers=6) as exe:
        part = partial(cancel_job, project)
        exe.map(part, to_cancel)

    # The cancelled jobs are printed without a newline
    print_once()


def print_log(
    project: gitlab.v4.objects.Project,
    job_id: int
) -> None:
    """Print the job log, following it until the job completes"""
    printed_lines = 0
    while True:
        job = project.jobs.get(job_id)

        # GitLab's REST API doesn't offer pagination for logs, so we have to refetch the whole log
        lines = job.trace().decode().splitlines()
        for line in lines[printed_lines:]:
            print(line)
        printed_lines = len(lines)

        if job.status in COMPLETED_STATUSES:
            print(Fore.GREEN + f"Job finished: {job.web_url}" + Style.RESET_ALL)
            return
        pretty_wait(REFRESH_WAIT_LOG)


def parse_args() -> argparse.Namespace:
    """Parse args"""
    parser = argparse.ArgumentParser(
        description="Tool to trigger a subset of container jobs "
        + "and monitor the progress of a test job",
        epilog="Example: mesa-monitor.py --rev $(git rev-parse HEAD) "
        + '--target ".*traces" ',
    )
    parser.add_argument(
        "--target",
        metavar="target-job",
        help="Target job regex. For multiple targets, pass multiple values, "
             "e.g. `--target foo bar`. Only jobs in the supplied target "
             "stage(s), and their dependencies, will be considered.",
        required=True,
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--include-stage",
        metavar="include-stage",
        help="Job stages to include when searching for target jobs. "
             "For multiple stages, pass multiple values, e.g. "
             "`--include-stage foo bar`.",
        default=[".*"],
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--exclude-stage",
        metavar="exclude-stage",
        help="Job stages to exclude when searching for target jobs. "
             "For multiple stages, pass multiple values, e.g. "
             "`--exclude-stage foo bar`. By default, performance and "
             "post-merge jobs are excluded; pass --exclude-stage '' to "
             "include them for consideration.",
        default=["performance", ".*-postmerge"],
        nargs=argparse.ONE_OR_MORE,
    )
    parser.add_argument(
        "--token",
        metavar="token",
        type=str,
        default=get_token_from_default_dir(),
        help="Use the provided GitLab token or token file, "
             f"otherwise it's read from {TOKEN_DIR / 'gitlab-token'}",
    )
    parser.add_argument(
        "--force-manual", action="store_true",
        help="Deprecated argument; manual jobs are always force-enabled"
    )
    parser.add_argument(
        "--stress",
        default=0,
        type=int,
        help="Stress-test job(s). Specify the number of times to rerun the selected jobs, "
             "or use -1 to rerun indefinitely. Defaults to 0. If jobs have already been executed, "
             "this will ensure the total run count respects the specified number.",
    )
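    # Illustrative example: `--stress 10` keeps retrying the selected jobs until
    # each has completed 10 times in total, counting runs finished before the
    # script started.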
    parser.add_argument(
        "--project",
        default="mesa",
        help="GitLab project in the format <user>/<project> or just <project>",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Exit after printing the target jobs and their dependencies",
    )

    mutex_group1 = parser.add_mutually_exclusive_group()
    mutex_group1.add_argument(
        "--rev", default="HEAD", metavar="revision", help="repository git revision (default: HEAD)"
    )
    mutex_group1.add_argument(
        "--pipeline-url",
        help="URL of the pipeline to use, instead of auto-detecting it.",
    )
    mutex_group1.add_argument(
        "--mr",
        type=int,
        help="ID of a merge request; the latest pipeline in that MR will be used.",
    )

    args = parser.parse_args()

    # argparse doesn't support groups inside add_mutually_exclusive_group(),
    # which means we can't just put `--project` and `--rev` in a group together,
    # so we have to do this by hand instead.
    if args.pipeline_url and args.project != parser.get_default("project"):
        # Weird phrasing, but it's the error add_mutually_exclusive_group() gives.
        parser.error("argument --project: not allowed with argument --pipeline-url")

    return args


def print_detected_jobs(
    target_dep_dag: "Dag",
    dependency_jobs: Iterable[str],
    target_jobs: Iterable[str],
) -> None:
    def print_job_set(color: str, kind: str, job_set: Iterable[str]):
        print(
            color + f"Running {len(job_set)} {kind} jobs: ",
            "\n\t",
            ", ".join(sorted(job_set)),
            Fore.RESET,
            "\n",
        )

    print(Fore.YELLOW + "Detected target job and its dependencies:", "\n")
    print_dag(target_dep_dag)
    print_job_set(Fore.MAGENTA, "dependency", dependency_jobs)
    print_job_set(Fore.BLUE, "target", target_jobs)


def find_dependencies(
    token: str | None,
    target_jobs_regex: re.Pattern,
    include_stage_regex: re.Pattern,
    exclude_stage_regex: re.Pattern,
    project_path: gitlab.v4.objects.Project,
    iid: int,
) -> set[str]:
    """
    Find the dependencies of the target jobs in a GitLab pipeline.

    This function uses the GitLab GraphQL API to fetch the job dependency graph
    of a pipeline, filters the graph to only include the target jobs and their
    dependencies, and returns the names of these jobs.

    Args:
        token (str | None): The GitLab API token. If None, the API is accessed without
                            authentication.
        target_jobs_regex (re.Pattern): A regex pattern to match the names of the target jobs.
        include_stage_regex (re.Pattern): A regex pattern of job stages to consider.
        exclude_stage_regex (re.Pattern): A regex pattern of job stages to ignore.
        project_path (gitlab.v4.objects.Project): The GitLab project.
        iid (int): The internal ID of the pipeline.

    Returns:
        set[str]: A set of the names of the target jobs and their dependencies.

    Raises:
        SystemExit: If no target jobs are found in the pipeline.
    """
    gql_instance = GitlabGQL(token=token)
    dag = create_job_needs_dag(
        gql_instance, {"projectPath": project_path.path_with_namespace, "iid": iid}
    )

    target_dep_dag = filter_dag(dag, target_jobs_regex, include_stage_regex, exclude_stage_regex)
    if not target_dep_dag:
        print(Fore.RED + "The job(s) were not found in the pipeline." + Fore.RESET)
        sys.exit(1)

    dependency_jobs = set(chain.from_iterable(d["needs"] for d in target_dep_dag.values()))
    target_jobs = set(target_dep_dag.keys())
    print_detected_jobs(target_dep_dag, dependency_jobs, target_jobs)
    return target_jobs.union(dependency_jobs)


def print_monitor_summary(
    execution_collection: Dict[str, Dict[int, Tuple[float, str, str]]],
    t_start: float,
) -> None:
    """Print a summary of the test execution"""
    t_end = time.perf_counter()
    spent_minutes = (t_end - t_start) / 60
    print(f"⏲ Duration of script execution: {spent_minutes:0.1f} minutes")  # U+23F2 Timer clock
    if not execution_collection:
        return
    print("⏲ Job execution times:")  # U+23F2 Timer clock
    job_names = sorted(execution_collection.keys())
    name_field_pad = len(max(job_names, key=len)) + 2
    for name in job_names:
        job_executions = execution_collection[name]
        job_times = ', '.join([__job_duration_record(job_execution)
                               for job_execution in sorted(job_executions.items())])
        print(f"* {name:{name_field_pad}}: ({len(job_executions)}) {job_times}")


def __job_duration_record(dict_item: tuple) -> str:
    """
    Format a single (job id, execution record) pair.
    :param dict_item: item of execution_collection[name]: Dict[int, Tuple[float, str, str]]
    """
    job_id = f"{dict_item[0]}"  # dictionary key
    job_duration, job_status, job_url = dict_item[1]  # dictionary value, the tuple
    return (f"{STATUS_COLORS[job_status]}"
            f"{link2print(job_url, job_id)}: {pretty_duration(job_duration):>8}"
            f"{Style.RESET_ALL}")


def link2print(url: str, text: str, text_pad: int = 0) -> str:
    """Wrap text in OSC 8 escapes so it prints as a clickable link to url."""
    text_pad = len(text) if text_pad < 1 else text_pad
    return f"{URL_START}{url}\a{text:{text_pad}}{URL_END}"


def main() -> None:
    try:
        t_start = time.perf_counter()

        args = parse_args()

        token = read_token(args.token)

        gl = gitlab.Gitlab(url=GITLAB_URL,
                           private_token=token,
                           retry_transient_errors=True)

        REV: str = args.rev

        if args.pipeline_url:
            pipe, cur_project = get_gitlab_pipeline_from_url(gl, args.pipeline_url)
            REV = pipe.sha
        else:
            mesa_project = gl.projects.get("mesa/mesa")
            projects = [mesa_project]
            if args.mr:
                REV = mesa_project.mergerequests.get(args.mr).sha
            else:
                REV = check_output(['git', 'rev-parse', REV]).decode('ascii').strip()

                if args.rev == 'HEAD':
                    try:
                        branch_name = check_output([
                            'git', 'symbolic-ref', '-q', 'HEAD',
                        ]).decode('ascii').strip()
                    except CalledProcessError:
                        branch_name = ""

                    # Ignore detached heads
                    if branch_name:
                        tracked_remote = check_output([
                            'git', 'for-each-ref', '--format=%(upstream)',
                            branch_name,
                        ]).decode('ascii').strip()

                        # Ignore local branches that do not track any remote
                        if tracked_remote:
                            remote_rev = check_output([
                                'git', 'rev-parse', tracked_remote,
                            ]).decode('ascii').strip()

                            if REV != remote_rev:
                                print(
                                    f"Local HEAD commit {REV[:10]} differs from "
                                    f"tracked remote HEAD commit {remote_rev[:10]}"
                                )
                                print("Did you forget to `git push`?")

                projects.append(get_gitlab_project(gl, args.project))
            (pipe, cur_project) = wait_for_pipeline(projects, REV)

        print(f"Revision: {REV}")
        print(f"Pipeline: {pipe.web_url}")

        target = '|'.join(args.target)
        target = target.strip()

        print("🞋 target job: " + Fore.BLUE + target + Style.RESET_ALL)  # U+1F78B Round target

        # Implicitly include `parallel:` jobs
        target = f'({target})' + r'( \d+/\d+)?'
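        # e.g. a job declared with `parallel: 8` appears as "jobname 3/8",
        # which the optional " \d+/\d+" suffix matches.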

        target_jobs_regex = re.compile(target)

        include_stage = '|'.join(args.include_stage)
        include_stage = include_stage.strip()

        print("🞋 target from stages: " + Fore.BLUE + include_stage + Style.RESET_ALL)  # U+1F78B Round target

        include_stage_regex = re.compile(include_stage)

        exclude_stage = '|'.join(args.exclude_stage)
        exclude_stage = exclude_stage.strip()

        print("🞋 target excluding stages: " + Fore.BLUE + exclude_stage + Style.RESET_ALL)  # U+1F78B Round target

        exclude_stage_regex = re.compile(exclude_stage)

        deps = find_dependencies(
            token=token,
            target_jobs_regex=target_jobs_regex,
            include_stage_regex=include_stage_regex,
            exclude_stage_regex=exclude_stage_regex,
            iid=pipe.iid,
            project_path=cur_project
        )

        if args.dry_run:
            sys.exit(0)

        target_job_id, ret, exec_t = monitor_pipeline(
            cur_project,
            pipe,
            target_jobs_regex,
            include_stage_regex,
            exclude_stage_regex,
            deps,
            args.stress
        )

        if target_job_id:
            print_log(cur_project, target_job_id)

        print_monitor_summary(exec_t, t_start)

        sys.exit(ret)
    except KeyboardInterrupt:
        sys.exit(1)


if __name__ == "__main__":
    main()