1# Copyright 2024 The ChromiumOS Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Tools for interacting with CrOS CLs, and the CQ in particular.""" 6 7import dataclasses 8import json 9import logging 10import re 11import subprocess 12from typing import Any, Dict, Iterable, List, Optional 13 14 15BuildID = int 16 17 18def _run_bb_decoding_output(command: List[str], multiline: bool = False) -> Any: 19 """Runs `bb` with the `json` flag, and decodes the command's output. 20 21 Args: 22 command: Command to run 23 multiline: If True, this function will parse each line of bb's output 24 as a separate JSON object, and a return a list of all parsed 25 objects. 26 """ 27 # `bb` always parses argv[1] as a command, so put `-json` after the first 28 # arg to `bb`. 29 run_command = ["bb", command[0], "-json"] + command[1:] 30 stdout = subprocess.run( 31 run_command, 32 check=True, 33 stdin=subprocess.DEVNULL, 34 stdout=subprocess.PIPE, 35 encoding="utf-8", 36 ).stdout 37 38 def parse_or_log(text: str) -> Any: 39 try: 40 return json.loads(text) 41 except json.JSONDecodeError: 42 logging.error( 43 "Error parsing JSON from command %r; bubbling up. Tried to " 44 "parse: %r", 45 run_command, 46 text, 47 ) 48 raise 49 50 if multiline: 51 return [ 52 parse_or_log(line) 53 for line in stdout.splitlines() 54 if line and not line.isspace() 55 ] 56 return parse_or_log(stdout) 57 58 59@dataclasses.dataclass(frozen=True, eq=True) 60class ChangeListURL: 61 """A consistent representation of a CL URL. 62 63 The __str__s always converts to a crrev.com URL. 64 """ 65 66 cl_id: int 67 patch_set: Optional[int] = None 68 internal: bool = False 69 70 @classmethod 71 def parse(cls, url: str) -> "ChangeListURL": 72 url_re = re.compile( 73 # Match an optional https:// header. 74 r"(?:https?://)?" 75 # Match either chromium-review or crrev, leaving the CL number and 76 # patch set as the next parts. These can be parsed in unison. 77 r"(chromium-review\.googlesource\.com.*/\+/" 78 r"|crrev\.com/[ci]/" 79 r"|chrome-internal-review\.googlesource\.com.*/\+/)" 80 # Match the CL number... 81 r"(\d+)" 82 # and (optionally) the patch-set, as well as consuming any of the 83 # path after the patch-set. 84 r"(?:/(\d+)?(?:/.*)?)?" 85 # Validate any sort of GET params for completeness. 86 r"(?:$|[?&].*)" 87 ) 88 89 m = url_re.fullmatch(url) 90 if not m: 91 raise ValueError( 92 f"URL {url!r} was not recognized. Supported URL formats are " 93 "crrev.com/c/${cl_number}/${patch_set_number}, and " 94 "chromium-review.googlesource.com/c/project/path/+/" 95 "${cl_number}/${patch_set_number}. The patch-set number is " 96 "optional, and there may be a preceding http:// or https://. " 97 "Internal CL links are also supported." 98 ) 99 host, cl_id, maybe_patch_set = m.groups() 100 internal = host.startswith("chrome-internal-review") or host.startswith( 101 "crrev.com/i/" 102 ) 103 if maybe_patch_set is not None: 104 maybe_patch_set = int(maybe_patch_set) 105 return cls(int(cl_id), maybe_patch_set, internal) 106 107 @classmethod 108 def parse_with_patch_set(cls, url: str) -> "ChangeListURL": 109 """parse(), but raises a ValueError if no patchset is specified.""" 110 result = cls.parse(url) 111 if result.patch_set is None: 112 raise ValueError("A patchset number must be specified.") 113 return result 114 115 def crrev_url_without_http(self): 116 namespace = "i" if self.internal else "c" 117 result = f"crrev.com/{namespace}/{self.cl_id}" 118 if self.patch_set is not None: 119 result += f"/{self.patch_set}" 120 return result 121 122 def __str__(self): 123 return f"https://{self.crrev_url_without_http()}" 124 125 126def builder_url(build_id: BuildID) -> str: 127 """Returns a builder URL given a build ID.""" 128 return f"https://ci.chromium.org/b/{build_id}" 129 130 131def fetch_cq_orchestrator_ids( 132 cl: ChangeListURL, 133) -> List[BuildID]: 134 """Returns the BuildID of completed cq-orchestrator runs on a CL. 135 136 Newer runs are sorted later in the list. 137 """ 138 results: List[Dict[str, Any]] = _run_bb_decoding_output( 139 [ 140 "ls", 141 "-cl", 142 str(cl), 143 "chromeos/cq/cq-orchestrator", 144 ], 145 multiline=True, 146 ) 147 148 # We can theoretically filter on a status flag, but it seems to only accept 149 # at most one value. Filter here instead; parsing one or two extra JSON 150 # objects is cheap. 151 finished_results = [ 152 x for x in results if x["status"] not in ("scheduled", "started") 153 ] 154 155 # Sort by createTime. Fall back to build ID if a tie needs to be broken. 156 # While `createTime` is a string, it's formatted so it can be sorted 157 # correctly without parsing. 158 finished_results.sort(key=lambda x: (x["createTime"], x["id"])) 159 return [int(x["id"]) for x in finished_results] 160 161 162@dataclasses.dataclass(frozen=True) 163class CQOrchestratorOutput: 164 """A class representing the output of a cq-orchestrator builder.""" 165 166 # The status of the CQ builder. 167 status: str 168 # A dict of builders that this CQ builder spawned. 169 child_builders: Dict[str, BuildID] 170 171 @classmethod 172 def fetch(cls, bot_id: BuildID) -> "CQOrchestratorOutput": 173 decoded: Dict[str, Any] = _run_bb_decoding_output( 174 ["get", "-steps", str(bot_id)] 175 ) 176 results = {} 177 178 # cq-orchestrator spawns builders in a series of steps. Each step has a 179 # markdownified link to the builder in the summaryMarkdown for each 180 # step. This loop parses those out. 181 build_url_re = re.compile( 182 re.escape("https://cr-buildbucket.appspot.com/build/") + r"(\d+)" 183 ) 184 # Example step name containing a build URL: 185 # "run builds|schedule new builds|${builder_name}". `builder_name` 186 # contains no spaces, though follow-up steps with the same prefix might 187 # include spaces. 188 step_name_re = re.compile( 189 re.escape("run builds|schedule new builds|") + "([^ ]+)" 190 ) 191 for step in decoded["steps"]: 192 step_name = step["name"] 193 m = step_name_re.fullmatch(step_name) 194 if not m: 195 continue 196 197 builder = m.group(1) 198 summary = step["summaryMarkdown"] 199 ids = build_url_re.findall(summary) 200 if len(ids) != 1: 201 raise ValueError( 202 f"Parsing summary of builder {builder} failed: wanted one " 203 f"match for {build_url_re}; got {ids}. Full summary: " 204 f"{summary!r}" 205 ) 206 if builder in results: 207 raise ValueError(f"Builder {builder} spawned multiple times?") 208 results[builder] = int(ids[0]) 209 return cls(child_builders=results, status=decoded["status"]) 210 211 212@dataclasses.dataclass(frozen=True) 213class CQBoardBuilderOutput: 214 """A class representing the output of a *-cq builder (e.g., brya-cq).""" 215 216 # The status of the CQ builder. 217 status: str 218 # Link to artifacts produced by this builder. Not available if the builder 219 # isn't yet finished, and not available if the builder failed in a weird 220 # way (e.g., INFRA_ERROR) 221 artifacts_link: Optional[str] 222 223 @classmethod 224 def fetch_many( 225 cls, bot_ids: Iterable[BuildID] 226 ) -> List["CQBoardBuilderOutput"]: 227 """Fetches CQBoardBuilderOutput for the given bots.""" 228 bb_output = _run_bb_decoding_output( 229 ["get", "-p"] + [str(x) for x in bot_ids], multiline=True 230 ) 231 results = [] 232 for result in bb_output: 233 status = result["status"] 234 output = result.get("output") 235 if output is None: 236 artifacts_link = None 237 else: 238 artifacts_link = output["properties"].get("artifact_link") 239 results.append(cls(status=status, artifacts_link=artifacts_link)) 240 return results 241 242 243def parse_release_from_builder_artifacts_link(artifacts_link: str) -> str: 244 """Parses the release version from a builder artifacts link. 245 246 >>> parse_release_from_builder_artifacts_link( 247 "gs://chromeos-image-archive/amd64-generic-asan-cq/" 248 "R122-15711.0.0-59730-8761718482083052481") 249 "R122-15711.0.0" 250 """ 251 results = re.findall(r"/(R\d+-\d+\.\d+\.\d+)-", artifacts_link) 252 if len(results) != 1: 253 raise ValueError( 254 f"Expected one release version in {artifacts_link}; got: {results}" 255 ) 256 return results[0] 257