#!/usr/bin/env python3
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Provides helpers for accessing gerrit and listing files under version control.
"""

import functools
import getpass
import json
import shutil
import sys
from pathlib import Path
from tempfile import gettempdir
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    cast,
)

from .command import quoted, cmd
from .util import very_verbose

# File where to store http headers for gcloud authentication
AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"

# Url of crosvm's gerrit review host
GERRIT_URL = "https://chromium-review.googlesource.com"

# Prefix gerrit puts in front of its JSON responses.
# See https://gerrit-review.googlesource.com/Documentation/rest-api.html#output
_XSSI_PREFIX = ")]}'\n"


def all_tracked_files() -> Iterator[Path]:
    "Yields every path printed by `git ls-files` that is a regular file on disk."
    for line in cmd("git ls-files").lines():
        file = Path(line)
        if file.is_file():
            yield file


def find_source_files(extension: str, ignore: Optional[List[str]] = None) -> Iterator[Path]:
    """
    Yields tracked files with the suffix `.{extension}`.

    Files under `third_party` are skipped, as are files whose path (as a string) is listed in
    `ignore`.
    """
    suffix = f".{extension}"
    # Build the lookup once; it is consulted for every tracked file.
    ignored = set(ignore) if ignore else set()
    for file in all_tracked_files():
        if file.suffix != suffix:
            continue
        if file.is_relative_to("third_party"):
            continue
        if str(file) in ignored:
            continue
        yield file


def find_scripts(path: Path, shebang: str) -> Iterator[Path]:
    """
    Yields the files directly inside `path` whose content starts with `#!{shebang}`.

    Only the first 512 characters of each file are read; undecodable bytes are ignored.
    """
    marker = f"#!{shebang}"
    for file in path.glob("*"):
        if not file.is_file():
            continue
        # Close each file right after peeking at its head instead of leaking the handle.
        with file.open(errors="ignore") as handle:
            head = handle.read(512)
        if head.startswith(marker):
            yield file


def get_cookie_file() -> Optional[Path]:
    "Returns the path printed by `git config http.cookiefile`, or None if nothing was printed."
    path = cmd("git config http.cookiefile").stdout(check=False)
    return Path(path) if path else None


def get_gcloud_access_token():
    """
    Returns the output of `gcloud auth print-access-token`.

    Returns None if the `gcloud` executable cannot be found on the PATH.
    """
    if not shutil.which("gcloud"):
        return None
    return cmd("gcloud auth print-access-token").stdout(check=False)


@functools.lru_cache(maxsize=None)
def curl_with_git_auth():
    """
    Returns a curl `Command` instance set up to use the same HTTP credentials as git.

    This currently supports two methods:
    - git cookies (the default)
    - gcloud

    Most developers will use git cookies, which are passed to curl.

    gcloud for authorization can be enabled in git via `git config credential.helper gcloud.sh`.
    If enabled in git, this command will also return a curl command using a gcloud access token.

    The result is cached, so the credentials are only looked up once per process.

    Raises an Exception if the required credentials are not available or the configured
    credential helper is not supported.
    """
    helper = cmd("git config credential.helper").stdout(check=False)

    if not helper:
        cookie_file = get_cookie_file()
        if not cookie_file or not cookie_file.is_file():
            raise Exception("git http cookiefile is not available.")
        return cmd("curl --cookie", cookie_file)

    if helper.endswith("gcloud.sh"):
        token = get_gcloud_access_token()
        if not token:
            raise Exception("Cannot get gcloud access token.")

        # Write token to a header file so it will not appear in logs or error messages.
        # The file lives in the shared temp directory, so make sure only the current user can
        # access it before the token is written.
        AUTH_HEADERS_FILE.touch(mode=0o600, exist_ok=True)
        AUTH_HEADERS_FILE.chmod(0o600)
        AUTH_HEADERS_FILE.write_text(f"Authorization: Bearer {token}")
        return cmd(f"curl -H @{AUTH_HEADERS_FILE}")

    raise Exception(f"Unsupported git credential.helper: {helper}")


def strip_xssi(response: str) -> str:
    """
    Removes the XSSI protection prefix from a gerrit response and returns the remainder.

    Raises an AssertionError if `response` does not start with the prefix.
    """
    # See https://gerrit-review.googlesource.com/Documentation/rest-api.html#output
    if not response.startswith(_XSSI_PREFIX):
        # Raised explicitly (instead of via `assert`) so the check is not stripped by
        # `python -O`, while callers still see the same exception type as before.
        raise AssertionError(
            f"Gerrit response does not start with the XSSI prefix: {response[:20]!r}"
        )
    return response[len(_XSSI_PREFIX) :]


def gerrit_api_get(path: str):
    """
    Fetches `{GERRIT_URL}/{path}` with plain curl and returns the decoded JSON response.

    Unlike `gerrit_api_post`, this does not use the curl command from `curl_with_git_auth`.
    """
    response = cmd(f"curl --silent --fail {GERRIT_URL}/{path}").stdout()
    return json.loads(strip_xssi(response))


def gerrit_api_post(path: str, body: Any):
    """
    Posts `body` as JSON to `{GERRIT_URL}/a/{path}` and returns the decoded JSON response.

    The request is made with the curl command returned by `curl_with_git_auth`. The raw
    response is printed when very verbose output is enabled.
    """
    response = curl_with_git_auth()(
        "--silent --fail",
        "-X POST",
        "-H",
        quoted("Content-Type: application/json"),
        "-d",
        quoted(json.dumps(body)),
        f"{GERRIT_URL}/a/{path}",
    ).stdout()
    if very_verbose():
        print("Response:", response)
    return json.loads(strip_xssi(response))


class GerritChange:
    """
    Class to interact with the gerrit /changes/ API.

    For information on the data format returned by the API, see:
    https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-info
    """

    id: str
    _data: Any

    def __init__(self, data: Any):
        # `data` is one change entry as returned by a /changes/ query; it must contain "id".
        self._data = data
        self.id = data["id"]

    @functools.cached_property
    def _details(self) -> Any:
        "Result of the change's /detail endpoint, fetched on first access and then cached."
        return gerrit_api_get(f"changes/{self.id}/detail")

    @functools.cached_property
    def _messages(self) -> List[Any]:
        "Result of the change's /messages endpoint, fetched on first access and then cached."
        return gerrit_api_get(f"changes/{self.id}/messages")

    @property
    def status(self):
        "The `status` field of the change data."
        return cast(str, self._data["status"])

    def get_votes(self, label_name: str) -> List[int]:
        """
        Returns the list of votes on `label_name`.

        Returns an empty list if the change has no label with that name.
        """
        # A missing label used to yield None here and crash on the `.get` below.
        label_info = self._details.get("labels", {}).get(label_name) or {}
        votes = label_info.get("all", [])
        # NOTE(review): entries without a "value" key yield None here -- confirm callers
        # tolerate that.
        return [cast(int, v.get("value")) for v in votes]

    def get_messages_by(self, email: str) -> List[str]:
        "Returns all messages posted by the user with the specified `email`."
        # Messages may have no "author" entry (gerrit omits it for messages it generates
        # itself), so do not index it directly.
        return [
            m["message"] for m in self._messages if m.get("author", {}).get("email") == email
        ]

    def review(self, message: str, labels: Dict[str, int]):
        "Post review `message` and set the specified review `labels`"
        print("Posting on", self, ":", message, labels)
        gerrit_api_post(
            f"changes/{self.id}/revisions/current/review",
            {"message": message, "labels": labels},
        )

    def abandon(self, message: str):
        "Abandon the change, posting `message` along with it."
        print("Abandoning", self, ":", message)
        gerrit_api_post(f"changes/{self.id}/abandon", {"message": message})

    @classmethod
    def query(cls, *queries: str):
        "Returns a list of gerrit changes matching the provided list of queries."
        # Individual queries are combined with "+" into a single `q` parameter.
        combined = "+".join(queries)
        return [cls(c) for c in gerrit_api_get(f"changes/?q={combined}")]

    def short_url(self):
        "Returns the crrev.com short link built from the change's `_number`."
        return f"http://crrev.com/c/{self._data['_number']}"

    def __str__(self):
        return self.short_url()

    def pretty_info(self):
        "Returns the short url followed by the change's subject."
        return f"{self} - {self._data['subject']}"


if __name__ == "__main__":
    import doctest

    (failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS)
    sys.exit(1 if failures > 0 else 0)