xref: /aosp_15_r20/external/crosvm/tools/impl/util.py (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1#!/usr/bin/env python3
2# Copyright 2023 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Provides general utility functions.
8"""
9
10import argparse
11import contextlib
12import datetime
13import functools
14import os
15import re
16import subprocess
17import sys
18import urllib
19import urllib.request
20import urllib.error
21from pathlib import Path
22from subprocess import DEVNULL, PIPE, STDOUT  # type: ignore
23from typing import (
24    Dict,
25    List,
26    NamedTuple,
27    Optional,
28    Tuple,
29    Union,
30)
31
# Type alias for values that are either a `pathlib.Path` or a plain string.
PathLike = Union[Path, str]

# Regex that matches ANSI escape sequences: an ESC byte (0x1B) followed either by a
# single character in the ranges `@`-`Z` / `\`-`_`, or by `[`, any number of
# characters in `0`-`?`, any number of characters in ` `-`/` and one final
# character in `@`-`~`.
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
36
37
def find_crosvm_root() -> Path:
    """
    Walk up from CWD until we find the crosvm root dir.

    A directory counts as the crosvm root if it contains the file
    `tools/impl/common.py`. The CWD itself is checked first, then each of its
    ancestors in turn.

    Returns:
        The resolved path of the first matching directory.

    Raises:
        Exception: If neither the CWD nor any of its ancestors matches.
    """
    path = Path("").resolve()
    while True:
        if (path / "tools/impl/common.py").is_file():
            return path
        # The parent of the filesystem root is the root itself and Path objects are
        # always truthy, so the top has to be detected by comparing with the parent.
        parent = path.parent
        if parent == path:
            raise Exception("Cannot find crosvm root dir.")
        path = parent
48
49
"Root directory of crosvm derived from CWD."
CROSVM_ROOT = find_crosvm_root()

"Cargo.toml file of crosvm"
CROSVM_TOML = CROSVM_ROOT / "Cargo.toml"

"""
Root directory of crosvm devtools.

May be different from `CROSVM_ROOT/tools`, which allows you to run the crosvm dev
tools from this directory on another crosvm repo.

Use this if you want to call crosvm dev tools, which will use the scripts relative
to this file.
"""
TOOLS_ROOT = Path(__file__).parent.parent.resolve()

"Cache directory that is preserved between builds in CI."
CACHE_DIR = Path(os.environ.get("CROSVM_CACHE_DIR", os.environ.get("TMPDIR", "/tmp")))

# Ensure that we really found the crosvm root directory.
# The check is spelled out instead of using an `assert` statement so that it still
# runs when Python is started with -O; the exception type stays AssertionError.
if 'name = "crosvm"' not in CROSVM_TOML.read_text():
    raise AssertionError(f'{CROSVM_TOML} does not contain \'name = "crosvm"\'')

# List of times recorded by `record_time` which will be printed if --timing-info is provided.
global_time_records: List[Tuple[str, datetime.timedelta]] = []
75
76
def crosvm_target_dir():
    """
    Returns the directory used for crosvm build output.

    Precedence:
      1. `$CROSVM_TARGET_DIR`, used as-is.
      2. `$CARGO_TARGET_DIR`, with a `crosvm` subdirectory appended.
      3. `target/crosvm` below `CROSVM_ROOT`.

    Unset and empty variables are both treated as "not provided".
    """
    explicit_dir = os.environ.get("CROSVM_TARGET_DIR")
    cargo_dir = os.environ.get("CARGO_TARGET_DIR")

    if explicit_dir:
        return Path(explicit_dir)
    if cargo_dir:
        return Path(cargo_dir).joinpath("crosvm")
    return CROSVM_ROOT.joinpath("target/crosvm")
86
87
@functools.lru_cache(maxsize=None)
def parse_common_args():
    """
    Parse the args that are common to all scripts.

    This is done independently of the run_main/run_commands parsing so that
    verbose/etc can be queried before the arguments of the actual command are
    parsed. Arguments this parser does not know are ignored, and the result is
    cached so the command line is only parsed once.
    """
    common_parser = argparse.ArgumentParser(add_help=False)
    add_common_args(common_parser)
    known_args, _unrecognized = common_parser.parse_known_args()
    return known_args
99
100
def add_common_args(parser: argparse.ArgumentParser):
    "Registers the options shared by all commands on `parser`."
    parser.add_argument(
        "--color",
        choices=("always", "never", "auto"),
        default="auto",
        help="Force enable or disable colors. Defaults to automatic detection.",
    )

    # The remaining options are plain on/off switches that default to off.
    switches = (
        (("--verbose", "-v"), "Print more details about the commands this script is running."),
        (("--very-verbose", "-vv"), "Print more debug output"),
        (("--timing-info",), "Print info on how long which parts of the command take"),
    )
    for option_strings, help_text in switches:
        parser.add_argument(
            *option_strings,
            action="store_true",
            default=False,
            help=help_text,
        )
129
130
def verbose():
    "True if either --very-verbose or --verbose was passed on the command line."
    # --very-verbose implies --verbose, so it is checked first.
    very = very_verbose()
    if very:
        return very
    return parse_common_args().verbose
133
134
def very_verbose():
    "True if --very-verbose was passed on the command line."
    common_args = parse_common_args()
    return common_args.very_verbose
137
138
def color_enabled():
    """
    Decides whether output should be colored.

    `--color=always` and `--color=never` force the answer; any other value
    falls back to checking whether stdout is a TTY.
    """
    forced = {"never": False, "always": True}
    mode = parse_common_args().color
    if mode in forced:
        return forced[mode]
    return sys.stdout.isatty()
146
147
def find_scripts(path: Path, shebang: str):
    """
    Yields the files directly inside `path` that start with `#!<shebang>`.

    Only direct children of `path` are considered (no recursion) and only the
    first 512 characters of each file are read. Undecodable bytes are ignored so
    binary files do not raise.

    Args:
        path: Directory to search.
        shebang: Interpreter string expected right after `#!`, e.g. `/bin/bash`.
    """
    expected_prefix = f"#!{shebang}"
    for file in path.glob("*"):
        if not file.is_file():
            continue
        # Close the handle right away instead of leaking one per inspected file.
        with file.open(errors="ignore") as handle:
            head = handle.read(512)
        if head.startswith(expected_prefix):
            yield file
152
153
def confirm(message: str, default: bool = False):
    """
    Asks a yes/no question on stdout and reads the answer from stdin.

    Args:
        message: Question to print. A `[y/N]` or `[Y/n]` hint is appended, with the
            capital letter marking the default.
        default: Value returned when the answer is neither yes nor no (including
            an empty line).

    Returns:
        True for `y`/`Y`, False for `n`/`N`, otherwise `default`.
    """
    hint = "[Y/n]" if default else "[y/N]"
    print(message, hint, end=" ", flush=True)
    response = sys.stdin.readline().strip()
    if response in ("y", "Y"):
        return True
    if response in ("n", "N"):
        return False
    return default
162
163
def is_cros_repo():
    """
    Returns true if the crosvm repo is a symlink or worktree to a CrOS repo checkout.

    A `.git` that is a real directory (not a symlink) rules this out immediately.
    Otherwise the answer is whether `.repo` exists at `cros_repo_root()`.
    """
    git_entry = CROSVM_ROOT / ".git"
    if git_entry.is_symlink() or not git_entry.is_dir():
        repo_marker = cros_repo_root() / ".repo"
        return repo_marker.exists()
    return False
170
171
def cros_repo_root():
    "Root directory of the CrOS repo checkout: three levels above `CROSVM_ROOT`."
    three_levels_up = CROSVM_ROOT / "../../.."
    return three_levels_up.resolve()
175
176
def is_kiwi_repo():
    "Returns true if `CROSVM_ROOT` contains a `.kiwi_repo` entry."
    return (CROSVM_ROOT / ".kiwi_repo").exists()
181
182
def kiwi_repo_root():
    "Root directory of the kiwi repo checkout: two levels above `CROSVM_ROOT`."
    two_levels_up = CROSVM_ROOT / "../.."
    return two_levels_up.resolve()
186
def is_aosp_repo():
    """
    Returns true if the crosvm repo is an AOSP repo checkout.

    This is detected by the presence of an `Android.bp` file in `CROSVM_ROOT`.
    """
    android_bp = CROSVM_ROOT / "Android.bp"
    return android_bp.exists()


def aosp_repo_root():
    "Root directory of AOSP repo checkout: two levels above `CROSVM_ROOT`."
    return (CROSVM_ROOT / "../..").resolve()
205
206
def sudo_is_passwordless():
    """
    Returns true if `sudo` can be used without entering a password.

    sudo is run with --askpass while SUDO_ASKPASS points at `false`, so the
    command can only succeed if sudo never has to ask for a password.
    """
    exit_status, _output = subprocess.getstatusoutput("SUDO_ASKPASS=false sudo --askpass true")
    return exit_status == 0
212
213
# Maps the short names accepted by `Triple.from_shorthand` to full build triples.
# `Triple.feature_flag` inverts this table to derive `all-<shorthand>`, so every
# value should appear only once.
SHORTHANDS = {
    "mingw64": "x86_64-pc-windows-gnu",
    "msvc64": "x86_64-pc-windows-msvc",
    "armhf": "armv7-unknown-linux-gnueabihf",
    "aarch64": "aarch64-unknown-linux-gnu",
    "riscv64": "riscv64gc-unknown-linux-gnu",
    "x86_64": "x86_64-unknown-linux-gnu",
    "android": "aarch64-linux-android",
}
223
224
class Triple(NamedTuple):
    """
    A build triple in the format used by cargo.

    Cargo spells triples as <arch><sub>-<vendor>-<sys>-<abi>. To keep things simple,
    <arch><sub> is stored as one `arch` field. `sys` and `abi` are None when the
    triple has fewer than three or four dash-separated parts respectively.
    """

    arch: str
    vendor: str
    sys: Optional[str]
    abi: Optional[str]

    @classmethod
    def from_shorthand(cls, shorthand: str):
        """
        Builds a triple from a command line shorthand.

        Anything that contains a dash is taken to be a full triple already;
        everything else has to be a key of `SHORTHANDS`.
        """
        if "-" in shorthand:
            return cls.from_str(shorthand)
        if shorthand not in SHORTHANDS:
            raise Exception(f"Not a valid build triple shorthand: {shorthand}")
        return cls.from_str(SHORTHANDS[shorthand])

    @classmethod
    def from_str(cls, triple: str):
        "Parses a dash-separated triple. At least arch and vendor must be present."
        fields = triple.split("-")
        if len(fields) < 2:
            raise Exception(f"Unsupported triple {triple}")
        arch, vendor, *optional = fields
        # Parts beyond the fourth are dropped.
        return cls(
            arch=arch,
            vendor=vendor,
            sys=optional[0] if optional else None,
            abi=optional[1] if len(optional) > 1 else None,
        )

    @classmethod
    def from_linux_arch(cls, arch: str):
        "Rough conversion of the output of `arch` into a matching linux build triple."
        # armhf is the only arch name that does not map directly to a triple.
        triple = (
            "armv7-unknown-linux-gnueabihf" if arch == "armhf" else f"{arch}-unknown-linux-gnu"
        )
        return cls.from_str(triple)

    @classmethod
    def host_default(cls):
        "Returns the default build triple of the host, as reported by `rustc -vV`."
        rustc_info = subprocess.check_output(["rustc", "-vV"], text=True)
        host_line = re.search(r"host: (\S+)", rustc_info)
        if host_line is None:
            raise Exception(f"Cannot parse rustc info: {rustc_info}")
        return cls.from_str(host_line.group(1))

    @property
    def feature_flag(self):
        "Name of the feature set for this triple: `all-<shorthand>`."
        shorthand_by_triple = dict((full, short) for short, full in SHORTHANDS.items())
        shorthand = shorthand_by_triple.get(str(self))
        if not shorthand:
            raise Exception(f"No feature set for triple {self}")
        return f"all-{shorthand}"

    @property
    def target_dir(self):
        "Build output directory for this triple, below `crosvm_target_dir()`."
        return crosvm_target_dir().joinpath(str(self))

    def get_cargo_env(self):
        """Environment variables to make cargo use the test target."""
        env: Dict[str, str] = {
            "CARGO_BUILD_TARGET": str(self),
            "CARGO_TARGET_DIR": str(self.target_dir),
            "CROSVM_TARGET_DIR": str(crosvm_target_dir()),
        }
        # Android builds are not fully supported and can only be used to run clippy.
        # Underlying libraries (e.g. minijail) will be built for linux instead
        # TODO(denniskempin): This could be better done with [env] in Cargo.toml if it supported
        # per-target configuration. See https://github.com/rust-lang/cargo/issues/10273
        if str(self).endswith("-linux-android"):
            env.update(
                {
                    "MINIJAIL_DO_NOT_BUILD": "true",
                    "MINIJAIL_BINDGEN_TARGET": f"{self.arch}-unknown-linux-gnu",
                }
            )
        return env

    def __str__(self):
        # arch and vendor are always emitted; sys and abi only when they are set.
        trailing = [part for part in (self.sys, self.abi) if part]
        return "-".join([self.arch, self.vendor, *trailing])
313
314
def download_file(url: str, filename: Path, attempts: int = 3):
    """
    Downloads `url` to `filename`, retrying on failure.

    Up to `attempts` tries are made. Each failure except the last is printed and
    followed by another try; the exception of the last failure is raised.
    `attempts` must be positive.
    """
    assert attempts > 0
    for retries_left in range(attempts - 1, -1, -1):
        try:
            urllib.request.urlretrieve(url, filename)
        except Exception as error:
            if retries_left == 0:
                raise error
            print("Download failed:", error)
        else:
            return
327
328
def strip_ansi_escape_sequences(line: str) -> str:
    "Returns `line` with every match of `ANSI_ESCAPE` removed."
    return re.sub(ANSI_ESCAPE, "", line)
331
332
def ensure_packages_exist(*packages: str):
    """
    Exits with status 1 if any of the listed python packages cannot be imported.

    Before exiting, a hint is printed on how to install the missing packages,
    naming them with a `python3-` prefix.
    """
    not_importable: List[str] = []
    for name in packages:
        try:
            __import__(name)
        except ImportError:
            not_importable.append(name)

    if not not_importable:
        return

    package_list = " ".join(f"python3-{name}" for name in not_importable)
    print("Missing python dependencies. Please re-run ./tools/install-deps")
    print(f"Or `sudo apt install {package_list}`")
    sys.exit(1)
351
352
@contextlib.contextmanager
def record_time(title: str):
    """
    Context manager that measures the wall-time spent inside its body.

    When the context is left, normally or through an exception, a
    `(title, duration)` entry is appended to `global_time_records`.
    """
    started_at = datetime.datetime.now()
    try:
        yield
    finally:
        elapsed = datetime.datetime.now() - started_at
        global_time_records.append((title, elapsed))
365
366
def print_timing_info():
    "Prints all entries of `global_time_records`, one line per recorded title."
    # Header framed by one empty line above and below.
    print("", "Timing info:", "", sep="\n")
    for record in global_time_records:
        title, delta = record
        seconds = delta.total_seconds()
        print(f"  {title:20} {seconds:.2f}s")
373