xref: /aosp_15_r20/external/toolchain-utils/llvm_tools/cros_cls.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2024 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Tools for interacting with CrOS CLs, and the CQ in particular."""
6
7import dataclasses
8import json
9import logging
10import re
11import subprocess
12from typing import Any, Dict, Iterable, List, Optional
13
14
# Alias for a build ID. Values of this type are stringified and passed to `bb`
# as arguments, and are embedded in builder URLs (see `builder_url`).
BuildID = int
16
17
def _run_bb_decoding_output(command: List[str], multiline: bool = False) -> Any:
    """Invokes `bb` in JSON mode and returns its decoded stdout.

    Args:
        command: The `bb` subcommand followed by its arguments. Must be
            non-empty; `-json` is inserted right after the first element.
        multiline: If True, every non-blank line of output is decoded as its
            own JSON document, and a list of the decoded documents is
            returned. If False, all of stdout is decoded as one document.

    Returns:
        The decoded JSON value, or a list of decoded values if `multiline`.

    Raises:
        subprocess.CalledProcessError: if `bb` exits with a nonzero status.
        json.JSONDecodeError: if any output fails to decode. The offending
            text is logged before the exception is re-raised.
    """
    # `bb` always treats argv[1] as the subcommand, so `-json` has to come
    # after it rather than before.
    bb_argv = ["bb", command[0], "-json", *command[1:]]
    completed = subprocess.run(
        bb_argv,
        check=True,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        encoding="utf-8",
    )
    raw_output = completed.stdout

    def decode(text: str) -> Any:
        """Decodes `text` as JSON, logging the input if decoding fails."""
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            logging.error(
                "Error parsing JSON from command %r; bubbling up. Tried to "
                "parse: %r",
                bb_argv,
                text,
            )
            raise

    if not multiline:
        return decode(raw_output)

    decoded_lines = []
    for line in raw_output.splitlines():
        # Skip empty and whitespace-only lines; they carry no JSON document.
        if not line or line.isspace():
            continue
        decoded_lines.append(decode(line))
    return decoded_lines
57
58
@dataclasses.dataclass(frozen=True, eq=True)
class ChangeListURL:
    """A normalized representation of a CL URL.

    Converting an instance to a string always yields a crrev.com URL.
    """

    # Numeric ID of the CL.
    cl_id: int
    # Patch-set number, or None if the URL did not specify one.
    patch_set: Optional[int] = None
    # True if the CL lives on the internal host.
    internal: bool = False

    @classmethod
    def parse(cls, url: str) -> "ChangeListURL":
        """Parses a crrev.com or Gerrit review URL into a ChangeListURL.

        Args:
            url: The URL to parse. The http:// or https:// prefix is
                optional, as is the patch-set number.

        Returns:
            The parsed ChangeListURL.

        Raises:
            ValueError: if `url` is not in a recognized format.
        """
        pattern = "".join(
            (
                # Optional scheme.
                r"(?:https?://)?",
                # One of the supported hosts, up to the point where the CL
                # number starts. The remainder is shared by all hosts.
                r"(chromium-review\.googlesource\.com.*/\+/",
                r"|crrev\.com/[ci]/",
                r"|chrome-internal-review\.googlesource\.com.*/\+/)",
                # The CL number.
                r"(\d+)",
                # Optional patch-set, plus any path that follows it.
                r"(?:/(\d+)?(?:/.*)?)?",
                # Optional trailing GET parameters.
                r"(?:$|[?&].*)",
            )
        )

        matched = re.fullmatch(pattern, url)
        if matched is None:
            raise ValueError(
                f"URL {url!r} was not recognized. Supported URL formats are "
                "crrev.com/c/${cl_number}/${patch_set_number}, and "
                "chromium-review.googlesource.com/c/project/path/+/"
                "${cl_number}/${patch_set_number}. The patch-set number is "
                "optional, and there may be a preceding http:// or https://. "
                "Internal CL links are also supported."
            )

        host = matched.group(1)
        patch_set_text = matched.group(3)
        patch_set = None if patch_set_text is None else int(patch_set_text)
        is_internal = host.startswith(
            ("chrome-internal-review", "crrev.com/i/")
        )
        return cls(int(matched.group(2)), patch_set, is_internal)

    @classmethod
    def parse_with_patch_set(cls, url: str) -> "ChangeListURL":
        """Like parse(), but raises a ValueError if `url` has no patch-set."""
        parsed = cls.parse(url)
        if parsed.patch_set is not None:
            return parsed
        raise ValueError("A patchset number must be specified.")

    def crrev_url_without_http(self):
        """Returns this CL's crrev.com URL, without the leading scheme."""
        pieces = [
            "crrev.com",
            "i" if self.internal else "c",
            str(self.cl_id),
        ]
        if self.patch_set is not None:
            pieces.append(str(self.patch_set))
        return "/".join(pieces)

    def __str__(self):
        """Returns this CL's full https:// crrev.com URL."""
        return "https://" + self.crrev_url_without_http()
124
125
def builder_url(build_id: BuildID) -> str:
    """Builds the ci.chromium.org URL for the build with the given ID."""
    return "https://ci.chromium.org/b/{}".format(build_id)
129
130
def fetch_cq_orchestrator_ids(
    cl: ChangeListURL,
) -> List[BuildID]:
    """Returns the BuildIDs of completed cq-orchestrator runs on a CL.

    Args:
        cl: The CL to query. Its string form (a crrev.com URL) is passed to
            `bb ls -cl`.

    Returns:
        BuildIDs of every cq-orchestrator run on `cl` that is neither
        scheduled nor started. Newer runs are sorted later in the list.
    """
    builds: List[Dict[str, Any]] = _run_bb_decoding_output(
        [
            "ls",
            "-cl",
            str(cl),
            "chromeos/cq/cq-orchestrator",
        ],
        multiline=True,
    )

    # We can theoretically filter on a status flag, but it seems to only accept
    # at most one value. Filter here instead; parsing one or two extra JSON
    # objects is cheap.
    #
    # Compare case-insensitively: JSON-encoded protobuf enums are emitted as
    # their upper-case names (e.g., "SCHEDULED"), so a case-sensitive
    # comparison against lower-case names would never filter anything.
    unfinished_statuses = ("scheduled", "started")
    finished_builds = [
        build
        for build in builds
        if build["status"].lower() not in unfinished_statuses
    ]

    # Sort by createTime. Fall back to build ID if a tie needs to be broken.
    # While `createTime` is a string, it's formatted so it can be sorted
    # correctly without parsing. The ID is converted to an int so the
    # tie-break is numeric rather than lexicographic.
    finished_builds.sort(
        key=lambda build: (build["createTime"], int(build["id"]))
    )
    return [int(build["id"]) for build in finished_builds]
160
161
@dataclasses.dataclass(frozen=True)
class CQOrchestratorOutput:
    """A class representing the output of a cq-orchestrator builder."""

    # The status of the CQ builder.
    status: str
    # A dict of builders that this CQ builder spawned, mapping each builder's
    # name to the ID of the build that was spawned for it.
    child_builders: Dict[str, BuildID]

    @classmethod
    def fetch(cls, bot_id: BuildID) -> "CQOrchestratorOutput":
        """Fetches the status and child builders of a cq-orchestrator build.

        Args:
            bot_id: ID of the cq-orchestrator build to query.

        Returns:
            A CQOrchestratorOutput for the build. `child_builders` is empty if
            the build reported no steps that schedule builders.

        Raises:
            ValueError: if a scheduling step's summary does not contain
                exactly one build URL, or if the same builder is scheduled by
                more than one step.
        """
        decoded: Dict[str, Any] = _run_bb_decoding_output(
            ["get", "-steps", str(bot_id)]
        )
        results = {}

        # cq-orchestrator spawns builders in a series of steps. Each step has a
        # markdownified link to the builder in the summaryMarkdown for each
        # step. This loop parses those out.
        build_url_re = re.compile(
            re.escape("https://cr-buildbucket.appspot.com/build/") + r"(\d+)"
        )
        # Example step name containing a build URL:
        # "run builds|schedule new builds|${builder_name}". `builder_name`
        # contains no spaces, though follow-up steps with the same prefix might
        # include spaces.
        step_name_re = re.compile(
            re.escape("run builds|schedule new builds|") + "([^ ]+)"
        )
        # A build that has not run any steps may omit the "steps" key
        # entirely; treat that the same as an empty list of steps.
        for step in decoded.get("steps", ()):
            m = step_name_re.fullmatch(step["name"])
            if not m:
                continue

            builder = m.group(1)
            # A missing summary is handled like an empty one, so it is reported
            # through the descriptive ValueError below, not a bare KeyError.
            summary = step.get("summaryMarkdown", "")
            ids = build_url_re.findall(summary)
            if len(ids) != 1:
                raise ValueError(
                    f"Parsing summary of builder {builder} failed: wanted one "
                    f"match for {build_url_re}; got {ids}. Full summary: "
                    f"{summary!r}"
                )
            if builder in results:
                raise ValueError(f"Builder {builder} spawned multiple times?")
            results[builder] = int(ids[0])
        return cls(child_builders=results, status=decoded["status"])
210
211
@dataclasses.dataclass(frozen=True)
class CQBoardBuilderOutput:
    """A class representing the output of a *-cq builder (e.g., brya-cq)."""

    # The status of the CQ builder.
    status: str
    # Link to artifacts produced by this builder. Not available if the builder
    # isn't yet finished, and not available if the builder failed in a weird
    # way (e.g., INFRA_ERROR)
    artifacts_link: Optional[str]

    @classmethod
    def fetch_many(
        cls, bot_ids: Iterable[BuildID]
    ) -> List["CQBoardBuilderOutput"]:
        """Fetches CQBoardBuilderOutput for the given bots.

        Args:
            bot_ids: IDs of the builds to query. May be any iterable; it is
                consumed exactly once.

        Returns:
            One CQBoardBuilderOutput per JSON object printed by `bb`, in the
            order `bb` printed them. Empty if `bot_ids` is empty.
        """
        id_args = [str(x) for x in bot_ids]
        if not id_args:
            # Nothing to query; don't invoke `bb` without any build IDs.
            return []

        bb_output = _run_bb_decoding_output(
            ["get", "-p"] + id_args, multiline=True
        )
        results = []
        for result in bb_output:
            # Both "output" and its "properties" may be absent or null (e.g.,
            # for a build that hasn't finished); either case means there is no
            # artifacts link to report.
            output = result.get("output") or {}
            properties = output.get("properties") or {}
            results.append(
                cls(
                    status=result["status"],
                    artifacts_link=properties.get("artifact_link"),
                )
            )
        return results
241
242
def parse_release_from_builder_artifacts_link(artifacts_link: str) -> str:
    """Extracts the release version from a builder artifacts link.

    The release version is the `R<milestone>-<a>.<b>.<c>` part of a path
    component, which must be preceded by `/` and followed by `-`.

    >>> parse_release_from_builder_artifacts_link(
    ...     "gs://chromeos-image-archive/amd64-generic-asan-cq/"
    ...     "R122-15711.0.0-59730-8761718482083052481")
    'R122-15711.0.0'

    Args:
        artifacts_link: The artifacts link to parse.

    Returns:
        The release version string.

    Raises:
        ValueError: if `artifacts_link` does not contain exactly one release
            version.
    """
    release_re = re.compile(r"/(R\d+-\d+\.\d+\.\d+)-")
    versions = release_re.findall(artifacts_link)
    if len(versions) == 1:
        (release,) = versions
        return release
    raise ValueError(
        f"Expected one release version in {artifacts_link}; got: {versions}"
    )
257