xref: /aosp_15_r20/external/crosvm/tools/impl/vcs.py (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1#!/usr/bin/env python3
2# Copyright 2023 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Provides helpers for accessing gerrit and listing files under version control.
8"""
9
10import functools
11import getpass
12import json
13import shutil
14import sys
15from pathlib import Path
16from tempfile import gettempdir
17from typing import (
18    Any,
19    Dict,
20    List,
21    cast,
22)
23
24from .command import quoted, cmd
25from .util import very_verbose
26
27# File where to store http headers for gcloud authentication
28AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"
29
30"Url of crosvm's gerrit review host"
31GERRIT_URL = "https://chromium-review.googlesource.com"
32
33
34def all_tracked_files():
35    for line in cmd("git ls-files").lines():
36        file = Path(line)
37        if file.is_file():
38            yield file
39
40
41def find_source_files(extension: str, ignore: List[str] = []):
42    for file in all_tracked_files():
43        if file.suffix != f".{extension}":
44            continue
45        if file.is_relative_to("third_party"):
46            continue
47        if str(file) in ignore:
48            continue
49        yield file
50
51
52def find_scripts(path: Path, shebang: str):
53    for file in path.glob("*"):
54        if file.is_file() and file.open(errors="ignore").read(512).startswith(f"#!{shebang}"):
55            yield file
56
57
58def get_cookie_file():
59    path = cmd("git config http.cookiefile").stdout(check=False)
60    return Path(path) if path else None
61
62
63def get_gcloud_access_token():
64    if not shutil.which("gcloud"):
65        return None
66    return cmd("gcloud auth print-access-token").stdout(check=False)
67
68
69@functools.lru_cache(maxsize=None)
70def curl_with_git_auth():
71    """
72    Returns a curl `Command` instance set up to use the same HTTP credentials as git.
73
74    This currently supports two methods:
75    - git cookies (the default)
76    - gcloud
77
78    Most developers will use git cookies, which are passed to curl.
79
80    glloud for authorization can be enabled in git via `git config credential.helper gcloud.sh`.
81    If enabled in git, this command will also return a curl command using a gloud access token.
82    """
83    helper = cmd("git config credential.helper").stdout(check=False)
84
85    if not helper:
86        cookie_file = get_cookie_file()
87        if not cookie_file or not cookie_file.is_file():
88            raise Exception("git http cookiefile is not available.")
89        return cmd("curl --cookie", cookie_file)
90
91    if helper.endswith("gcloud.sh"):
92        token = get_gcloud_access_token()
93        if not token:
94            raise Exception("Cannot get gcloud access token.")
95        # File where to store http headers for gcloud authentication
96        AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}"
97
98        # Write token to a header file so it will not appear in logs or error messages.
99        AUTH_HEADERS_FILE.write_text(f"Authorization: Bearer {token}")
100        return cmd(f"curl -H @{AUTH_HEADERS_FILE}")
101
102    raise Exception(f"Unsupported git credentials.helper: {helper}")
103
104
105def strip_xssi(response: str):
106    # See https://gerrit-review.googlesource.com/Documentation/rest-api.html#output
107    assert response.startswith(")]}'\n")
108    return response[5:]
109
110
111def gerrit_api_get(path: str):
112    response = cmd(f"curl --silent --fail {GERRIT_URL}/{path}").stdout()
113    return json.loads(strip_xssi(response))
114
115
116def gerrit_api_post(path: str, body: Any):
117    response = curl_with_git_auth()(
118        "--silent --fail",
119        "-X POST",
120        "-H",
121        quoted("Content-Type: application/json"),
122        "-d",
123        quoted(json.dumps(body)),
124        f"{GERRIT_URL}/a/{path}",
125    ).stdout()
126    if very_verbose():
127        print("Response:", response)
128    return json.loads(strip_xssi(response))
129
130
131class GerritChange(object):
132    """
133    Class to interact with the gerrit /changes/ API.
134
135    For information on the data format returned by the API, see:
136    https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-info
137    """
138
139    id: str
140    _data: Any
141
142    def __init__(self, data: Any):
143        self._data = data
144        self.id = data["id"]
145
146    @functools.cached_property
147    def _details(self) -> Any:
148        return gerrit_api_get(f"changes/{self.id}/detail")
149
150    @functools.cached_property
151    def _messages(self) -> List[Any]:
152        return gerrit_api_get(f"changes/{self.id}/messages")
153
154    @property
155    def status(self):
156        return cast(str, self._data["status"])
157
158    def get_votes(self, label_name: str) -> List[int]:
159        "Returns the list of votes on `label_name`"
160        label_info = self._details.get("labels", {}).get(label_name)
161        votes = label_info.get("all", [])
162        return [cast(int, v.get("value")) for v in votes]
163
164    def get_messages_by(self, email: str) -> List[str]:
165        "Returns all messages posted by the user with the specified `email`."
166        return [m["message"] for m in self._messages if m["author"].get("email") == email]
167
168    def review(self, message: str, labels: Dict[str, int]):
169        "Post review `message` and set the specified review `labels`"
170        print("Posting on", self, ":", message, labels)
171        gerrit_api_post(
172            f"changes/{self.id}/revisions/current/review",
173            {"message": message, "labels": labels},
174        )
175
176    def abandon(self, message: str):
177        print("Abandoning", self, ":", message)
178        gerrit_api_post(f"changes/{self.id}/abandon", {"message": message})
179
180    @classmethod
181    def query(cls, *queries: str):
182        "Returns a list of gerrit changes matching the provided list of queries."
183        return [cls(c) for c in gerrit_api_get(f"changes/?q={'+'.join(queries)}")]
184
185    def short_url(self):
186        return f"http://crrev.com/c/{self._data['_number']}"
187
188    def __str__(self):
189        return self.short_url()
190
191    def pretty_info(self):
192        return f"{self} - {self._data['subject']}"
193
194
195if __name__ == "__main__":
196    import doctest
197
198    (failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS)
199    sys.exit(1 if failures > 0 else 0)
200