xref: /aosp_15_r20/external/crosvm/tools/impl/command.py (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1#!/usr/bin/env python3
2# Copyright 2023 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Provides helpers for writing shell-like scripts in Python.
8
9It provides tools to execute commands with similar flexibility as shell scripts.
10"""
11
12import contextlib
13import json
14import os
15import re
16import shlex
17import subprocess
18import sys
19from copy import deepcopy
20from math import ceil
21from multiprocessing.pool import ThreadPool
22from pathlib import Path
23from subprocess import DEVNULL, PIPE, STDOUT  # type: ignore
24from typing import (
25    Any,
26    Callable,
27    Dict,
28    Iterable,
29    List,
30    NamedTuple,
31    Optional,
32    TypeVar,
33    Union,
34)
35from .util import verbose, very_verbose, color_enabled
36
# Anything accepted where a filesystem path is expected: a pathlib.Path or a plain string.
PathLike = Union[Path, str]


# Regex that matches ANSI escape sequences: ESC followed either by a single character in the
# ranges `@-Z` / `\-_`, or by a `[`-introduced sequence (parameter bytes `0-?`, intermediate
# bytes ` -/`, one final byte `@-~`).
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
42
43
class CommandResult(NamedTuple):
    """Results of a finished command execution, as built by Command's internal run helper."""

    # Text written to stdout. None at runtime when the stream was not captured
    # (e.g. Command.run_foreground passes stdout=None so output goes to the user).
    stdout: str
    # Text written to stderr. None at runtime when the stream was not captured.
    stderr: str
    # Exit status of the process; 0 is treated as success throughout this module.
    returncode: int
50
51
class Command(object):
    """
    Simplified subprocess handling for shell-like scripts.

    ## Example Usage

    To run a program on behalf of the user:

    >> cmd("cargo build").fg()

    This will run the program with stdio passed to the user. Developer tools usually run a set of
    actions on behalf of the user. These should be executed with fg().

    To make calls in the background to gather information use success/stdout/lines:

    >> cmd("git branch").lines()
    >> cmd("git rev-parse foo").success()

    These will capture all program output. Try to avoid using these to run mutating commands,
    as they will remain hidden to the user even when using --verbose.

    ## Arguments

    Arguments are provided as a list similar to subprocess.run():

    >>> Command('cargo', 'build', '--workspace')
    Command('cargo', 'build', '--workspace')

    In contrast to subprocess.run, all strings are split by whitespaces similar to bash:

    >>> Command('cargo build --workspace', '--features foo')
    Command('cargo', 'build', '--workspace', '--features', 'foo')

    In contrast to bash, globs are *not* evaluated, but can easily be provided using Path:

    >>> Command('ls -l', *Path(CROSVM_ROOT).glob('*.toml'))
    Command('ls', '-l', ...)

    None or False are ignored to make it easy to include conditional arguments:

    >>> all = False
    >>> Command('cargo build', '--workspace' if all else None)
    Command('cargo', 'build')

    ## Nesting

    Commands can be nested, similar to $() subshells in bash. The sub-commands will be executed
    right away and their output will undergo the usual splitting:

    >>> Command('printf "(%s)"', Command('echo foo bar')).stdout()
    '(foo)(bar)'

    Arguments can be explicitly quoted to prevent splitting, it applies to both sub-commands
    as well as strings:

    >>> Command('printf "(%s)"', quoted(Command('echo foo bar'))).stdout()
    '(foo bar)'

    Commands can also be piped into one another:

    >>> wc = Command('wc')
    >>> Command('echo "abcd"').pipe(wc('-c')).stdout()
    '5'

    ## Verbosity

    The --verbose flag is intended for users and will show all command lines executed in the
    foreground with fg(), it'll also include output of programs run with fg(quiet=True). Commands
    executed in the background are not shown.

    For script developers, the --very-verbose flag will print full details and output of all
    executed command lines, including those run hidden from the user.
    """

    def __init__(
        self,
        *args: Any,
        stdin_cmd: Optional["Command"] = None,
        env_vars: Optional[Dict[str, str]] = None,
        cwd: Optional[Path] = None,
    ):
        """
        Builds a command from mixed-type arguments.

        Arguments:
            args: Strings (split like a shell would), Paths, quoted strings, nested Commands,
                or None/False (ignored).
            stdin_cmd: Command whose stdout is fed into this command's stdin.
            env_vars: Environment variables added on top of os.environ when executing.
            cwd: Working directory to execute in. None uses the current directory.
        """
        self.args = Command.__parse_cmd(args)
        self.stdin_cmd = stdin_cmd
        # Always store a private dict: a shared `{}` default (or the caller's own dict) would
        # let a mutation on one Command leak into unrelated instances.
        self.env_vars: Dict[str, str] = dict(env_vars) if env_vars else {}
        self.cwd = cwd

    ### Builder API to construct commands

    def with_args(self, *args: Any) -> "Command":
        """Returns a new Command with added arguments.

        >>> cargo = Command('cargo')
        >>> cargo.with_args('clippy')
        Command('cargo', 'clippy')
        """
        cmd = deepcopy(self)
        cmd.args = [*self.args, *Command.__parse_cmd(args)]
        return cmd

    def with_cwd(self, cwd: Optional[Path]) -> "Command":
        """Changes the working directory the command is executed in.

        >>> cargo = Command('pwd')
        >>> cargo.with_cwd('/tmp').stdout()
        '/tmp'
        """
        cmd = deepcopy(self)
        cmd.cwd = cwd
        return cmd

    def __call__(self, *args: Any) -> "Command":
        """Shorthand for Command.with_args"""
        return self.with_args(*args)

    def with_env(self, key: str, value: Optional[str]) -> "Command":
        """
        Returns a command with an added env variable.

        The variable is removed if value is None.
        """
        return self.with_envs({key: value})

    def with_envs(self, envs: Union[Dict[str, Optional[str]], Dict[str, str]]) -> "Command":
        """
        Returns a command with added env variables.

        Variables whose value is None are removed from the command's own env_vars.
        """
        cmd = deepcopy(self)
        for key, value in envs.items():
            if value is None:
                cmd.env_vars.pop(key, None)
            else:
                cmd.env_vars[key] = value
        return cmd

    def with_path_env(self, new_path: str) -> "Command":
        """
        Returns a command with a path appended to the PATH variable.

        The current value is taken from this command's env_vars, falling back to os.environ.
        Entries are joined with os.pathsep, and empty components are skipped so that an unset
        PATH does not produce a leading empty entry.
        """
        path_var = self.env_vars.get("PATH", os.environ.get("PATH", ""))
        components = [entry for entry in (path_var, new_path) if entry]
        return self.with_env("PATH", os.pathsep.join(components))

    def with_color_arg(
        self,
        always: Optional[str] = None,
        never: Optional[str] = None,
    ) -> "Command":
        """
        Returns a command with an argument added to pass through enabled/disabled colors.

        Arguments:
            always: Argument appended when colors are enabled.
            never: Argument appended when colors are disabled.
        """
        color_arg = always if color_enabled() else never
        if color_arg:
            return self.with_args(color_arg)
        return self

    def with_color_env(self, var_name: str) -> "Command":
        """Returns a command with an env var added to pass through enabled/disabled colors."""
        return self.with_env(var_name, "1" if color_enabled() else "0")

    def with_color_flag(self, flag: str = "--color") -> "Command":
        """Returns a command with an added --color=always/never flag."""
        return self.with_color_arg(always=f"{flag}=always", never=f"{flag}=never")

    def foreach(self, arguments: Iterable[Any], batch_size: int = 1):
        """
        Yields a new command for each entry in `arguments`.

        The argument is appended to each command and is intended to be used in
        conjunction with `parallel()` to execute a command on a list of arguments in
        parallel.

        >>> parallel(*cmd('echo').foreach((1, 2, 3))).stdout()
        ['1', '2', '3']

        Arguments can also be batched by setting batch_size > 1, which will append multiple
        arguments to each command.

        >>> parallel(*cmd('echo').foreach((1, 2, 3), batch_size=2)).stdout()
        ['1 2', '3']

        """
        for batch in batched(arguments, batch_size):
            yield self(*batch)

    def pipe(self, *args: Any) -> "Command":
        """
        Pipes the output of this command into another process.

        The target can either be another Command or the argument list to build a new command.
        In both cases the resulting command uses this command's env_vars.
        """
        if len(args) == 1 and isinstance(args[0], Command):
            # Only the target's argument list is reused; env vars come from the upstream side.
            cmd = Command(stdin_cmd=self, env_vars=self.env_vars)
            cmd.args = list(args[0].args)
            return cmd
        return Command(*args, stdin_cmd=self, env_vars=self.env_vars)

    ### Executing programs in the foreground

    def run_foreground(
        self,
        quiet: bool = False,
        check: bool = True,
        dry_run: bool = False,
        style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
    ):
        """
        Runs a program in the foreground with output streamed to the user.

        >>> Command('true').fg()
        0

        Non-zero exit codes will trigger an Exception

        >>> Command('false').fg()
        Traceback (most recent call last):
        ...
        subprocess.CalledProcessError...

        But can be disabled:

        >>> Command('false').fg(check=False)
        1

        Output can be hidden by setting quiet=True:

        >>> Command("echo foo").fg(quiet=True)
        0

        This will hide the programs stdout and stderr unless the program fails.

        More sophisticated means of outputting stdout/err are available via `Styles`:

        >>> Command("echo foo").fg(style=Styles.live_truncated())
        …
        foo
        0

        Will output the results of the command but truncate output after a few lines. See `Styles`
        for more options.

        Arguments:
            quiet: Do not show stdout/stderr unless the program failed. Overrides `style`.
            check: Raise an exception if the program returned an error code.
            dry_run: Only print the command line instead of executing it.
            style: A function to present the output of the program. See `Styles`

        Returns: The return code of the program (0 for a dry run).

        Raises:
            subprocess.CalledProcessError: If `check` is set and the program failed.
        """
        if dry_run:
            print(f"Not running: {self}")
            return 0

        if quiet:

            def quiet_style(process: "subprocess.Popen[str]"):
                "Won't print anything unless the command failed."
                assert process.stdout
                stdout = process.stdout.read()
                if process.wait() != 0:
                    print(stdout, end="")

            style = quiet_style

        is_verbose = verbose()
        if is_verbose:
            print(f"$ {self}")

        # In verbose mode any style is bypassed so the user sees the raw program output.
        if style is None or is_verbose:
            return self.__run(stdout=None, stderr=None, check=check).returncode

        # stderr is merged into stdout so the style sees a single interleaved stream.
        process = self.popen(stdout=PIPE, stderr=STDOUT)
        style(process)
        returncode = process.wait()
        if check and returncode != 0:
            raise subprocess.CalledProcessError(returncode, process.args)
        return returncode

    def fg(
        self,
        quiet: bool = False,
        check: bool = True,
        dry_run: bool = False,
        style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
    ):
        """
        Shorthand for Command.run_foreground()
        """
        return self.run_foreground(quiet=quiet, check=check, dry_run=dry_run, style=style)

    def write_to(self, filename: Path):
        """
        Writes stdout to the provided file, replacing its previous content.

        The file is opened (and truncated) before the program runs.
        """
        if verbose():
            print(f"$ {self} > {filename}")
        with open(filename, "w") as file:
            file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)

    def append_to(self, filename: Path):
        """
        Appends stdout to the provided file.
        """
        if verbose():
            print(f"$ {self} >> {filename}")
        with open(filename, "a") as file:
            file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)

    ### API for executing commands hidden from the user

    def success(self):
        """
        Returns True if the program succeeded (i.e. returned 0).

        The program will not be visible to the user unless --very-verbose is specified.
        """
        if very_verbose():
            print(f"$ {self}")
        return self.__run(stdout=PIPE, stderr=PIPE, check=False).returncode == 0

    def stdout(self, check: bool = True, stderr: int = PIPE):
        """
        Runs a program and returns stdout with surrounding whitespace stripped.

        The program will not be visible to the user unless --very-verbose is specified.

        Arguments:
            check: Raise an exception if the program returned an error code.
            stderr: Where to send the program's stderr (a subprocess constant such as PIPE).
        """
        if very_verbose():
            print(f"$ {self}")
        return self.__run(stdout=PIPE, stderr=stderr, check=check).stdout.strip()

    def json(self, check: bool = True) -> Any:
        """
        Runs a program and returns stdout parsed as json, or None if there was no output.

        The program will not be visible to the user unless --very-verbose is specified.
        """
        stdout = self.stdout(check=check)
        if not stdout:
            return None
        return json.loads(stdout)

    def lines(self, check: bool = True, stderr: int = PIPE):
        """
        Runs a program and returns stdout line by line.

        The program will not be visible to the user unless --very-verbose is specified.
        """
        return self.stdout(check=check, stderr=stderr).splitlines()

    ### Utilities

    def __str__(self):
        """Returns the shell-quoted command line, including any piped-in commands."""
        stdin = ""
        if self.stdin_cmd:
            stdin = str(self.stdin_cmd) + " | "
        return stdin + shlex.join(self.args)

    def __repr__(self):
        stdin = ""
        if self.stdin_cmd:
            stdin = ", stdin_cmd=" + repr(self.stdin_cmd)
        return f"Command({', '.join(repr(a) for a in self.args)}{stdin})"

    ### Private implementation details

    def __run(
        self,
        stdout: Optional[int],
        stderr: Optional[int],
        check: bool = True,
    ) -> "CommandResult":
        """
        Run this command in subprocess.run()

        Arguments:
            stdout: subprocess stream constant for stdout, or None to pass through to the user.
            stderr: subprocess stream constant for stderr, or None to pass through to the user.
            check: Raise an exception if the program returned an error code.

        Raises:
            subprocess.CalledProcessError: If `check` is set and the program failed.
        """
        if very_verbose():
            # Report the directory the program will actually run in.
            effective_cwd = Path(self.cwd) if self.cwd else Path()
            print(f"cwd: {effective_cwd.resolve()}")
            for k, v in self.env_vars.items():
                print(f"env: {k}={v}")
        # check=False on purpose: subprocess.run would raise before the very-verbose output
        # below is printed, hiding the details of exactly the commands that failed.
        result = subprocess.run(
            self.args,
            cwd=self.cwd,
            stdout=stdout,
            stderr=stderr,
            stdin=self.__stdin_stream(),
            env={**os.environ, **self.env_vars},
            check=False,
            text=True,
        )
        if very_verbose():
            if result.stdout:
                for line in result.stdout.splitlines():
                    print("stdout:", line)
            if result.stderr:
                for line in result.stderr.splitlines():
                    print("stderr:", line)
            print("returncode:", result.returncode)
        if check and result.returncode != 0:
            # Same exception contents subprocess.run(check=True) would have produced.
            raise subprocess.CalledProcessError(
                result.returncode, result.args, result.stdout, result.stderr
            )
        return CommandResult(result.stdout, result.stderr, result.returncode)

    def __stdin_stream(self):
        """Starts the piped-in command (if any) and returns its stdout stream, else None."""
        if self.stdin_cmd:
            # The upstream stderr is never read by anyone. Discard it instead of piping it,
            # since an unread pipe blocks the upstream process once its buffer fills up.
            return self.stdin_cmd.popen(stdout=PIPE, stderr=DEVNULL).stdout
        return None

    def popen(self, **kwargs: Any) -> "subprocess.Popen[str]":
        """
        Runs a program and returns the Popen object of the running process.

        Additional keyword arguments are forwarded to subprocess.Popen.
        """
        return subprocess.Popen(
            self.args,
            cwd=self.cwd,
            stdin=self.__stdin_stream(),
            env={**os.environ, **self.env_vars},
            text=True,
            **kwargs,
        )

    @staticmethod
    def __parse_cmd(args: Iterable[Any]) -> List[str]:
        """Parses command line arguments for Command."""
        return [parsed for arg in args for parsed in Command.__parse_cmd_args(arg)]

    @staticmethod
    def __parse_cmd_args(arg: Any) -> List[str]:
        """
        Parses a mixed type command line argument into a list of strings.

        Paths and quoted strings become exactly one argument. Nested Commands are executed and
        their stdout is split like a string. None and False produce no arguments. Everything
        else is converted with str() and split with shlex.
        """

        def escape_backslash_if_necessary(text: str) -> str:
            # shlex treats backslashes as escapes, which would eat Windows path separators.
            if os.name == "nt":
                return text.replace("\\", "\\\\")
            return text

        if isinstance(arg, Path):
            # NOTE(review): this value is escaped but never passed through shlex.split, so on
            # Windows the doubled backslashes reach the program as-is - confirm this is intended.
            return [escape_backslash_if_necessary(str(arg))]
        if isinstance(arg, QuotedString):
            return [arg.value]
        if isinstance(arg, Command):
            return shlex.split(escape_backslash_if_necessary(arg.stdout()))
        if arg is None or arg is False:
            return []
        return shlex.split(escape_backslash_if_necessary(str(arg)))
498
class ParallelCommands(object):
    """
    Runs a group of commands concurrently on a thread pool.

    >>> parallel(cmd('true'), cmd('false')).fg(check=False)
    [0, 1]

    >>> parallel(cmd('echo a'), cmd('echo b')).stdout()
    ['a', 'b']
    """

    def __init__(self, *commands: Command):
        self.commands = commands

    def fg(self, quiet: bool = False, check: bool = True):
        """Runs every command with Command.fg() and returns the return codes in order."""

        def run_one(command: Command):
            return command.fg(quiet=quiet, check=check)

        # A single worker in very-verbose mode, one per CPU otherwise.
        workers = 1 if very_verbose() else os.cpu_count()
        with ThreadPool(workers) as pool:
            return pool.map(run_one, self.commands)

    def stdout(self):
        """Runs every command with Command.stdout() and returns the outputs in order."""

        def capture(command: Command):
            return command.stdout()

        workers = 1 if very_verbose() else os.cpu_count()
        with ThreadPool(workers) as pool:
            return pool.map(capture, self.commands)

    def success(self):
        """Returns True if every command exited with 0."""
        exit_codes = self.fg(check=False, quiet=True)
        return not any(code != 0 for code in exit_codes)
524
525
class Remote(object):
    """
    Wrapper around the cmd() API that allows execution of commands via SSH.
    """

    def __init__(self, host: str, opts: Dict[str, str]):
        self.host = host
        # Every option is passed to both ssh and scp as `-oKey=Value`.
        option_flags = []
        for name, value in opts.items():
            option_flags.append(f"-o{name}={value}")
        self.ssh_cmd = cmd("ssh", host, "-T", *option_flags)
        self.scp_cmd = cmd("scp", *option_flags)

    def ssh(self, cmd: Command, remote_cwd: Optional[Path] = None):
        """
        Returns a Command that executes `cmd` on the remote host via ssh.

        If `remote_cwd` is given, the remote shell changes into it first.
        """
        # huponexit ensures the process is killed if the connection is lost; shlex.quote keeps
        # the whole command line as a single argument to `bash -c`.
        remote_line = f"bash -O huponexit -c {shlex.quote(str(cmd))}"
        # NOTE(review): remote_cwd is interpolated unquoted - a path containing spaces or shell
        # metacharacters would be split by the remote shell; confirm callers never pass one.
        if remote_cwd is not None:
            remote_line = f"cd {remote_cwd} && {remote_line}"
        # Quoted so the whole line reaches ssh as one argument for remote execution.
        return self.ssh_cmd.with_args(quoted(remote_line))

    def scp(self, sources: List[Path], target: str, quiet: bool = False):
        """Copies `sources` to `target` on the remote host and returns scp's return code."""
        destination = f"{self.host}:{target}"
        copy_cmd = self.scp_cmd.with_args(*sources, destination)
        return copy_cmd.fg(quiet=quiet)
548
549
@contextlib.contextmanager
def cwd_context(path: PathLike):
    """Temporarily switches the working directory for the duration of a `with` block.

    >>> with cwd('/tmp'):
    ...     os.getcwd()
    '/tmp'

    """
    # Remember where we started so the finally-clause can always go back.
    previous_dir = os.getcwd()
    try:
        chdir(path)
        yield
    finally:
        chdir(previous_dir)
565
566
def chdir(path: PathLike):
    """Changes the process working directory, echoing the change in very-verbose mode."""
    log_change = very_verbose()
    if log_change:
        print("cd", path)
    os.chdir(path)
571
572
class QuotedString(object):
    """
    Marks a value as a single argument that must not be split on whitespace.

    A Command passed as value is executed right away and its stdout becomes the value.
    """

    def __init__(self, value: Any):
        # Commands are resolved eagerly; anything else is stored as its string form.
        self.value = value.stdout() if isinstance(value, Command) else str(value)

    def __str__(self):
        return '"' + f"{self.value}" + '"'
588
589
T = TypeVar("T")


def batched(source: Iterable[T], max_batch_size: int) -> Iterable[List[T]]:
    """
    Returns an iterator over batches of at most max_batch_size elements from source.

    Elements are spread evenly across the smallest possible number of batches, so batches may
    be smaller than max_batch_size. An empty source yields no batches.

    >>> list(batched([1, 2, 3, 4, 5], 2))
    [[1, 2], [3, 4], [5]]
    >>> list(batched([1, 2, 3, 4], 3))
    [[1, 2], [3, 4]]
    >>> list(batched([], 2))
    []

    Raises:
        ValueError: If max_batch_size is smaller than 1.
    """
    if max_batch_size < 1:
        raise ValueError(f"max_batch_size must be at least 1, got {max_batch_size}")
    source_list = list(source)
    if not source_list:
        # Without this guard batch_count would be 0 and the division below would fail.
        return
    # Calculate batch size that spreads elements evenly across all batches
    batch_count = ceil(len(source_list) / max_batch_size)
    batch_size = ceil(len(source_list) / batch_count)
    for index in range(0, len(source_list), batch_size):
        # Slicing clamps at the end of the list, so the last batch may be shorter.
        yield source_list[index : index + batch_size]
606
607
# Shorthands: short lowercase aliases for the classes and helpers defined above, as used
# throughout the docstring examples in this module (e.g. `cmd('echo')`, `with cwd('/tmp')`).
quoted = QuotedString
cmd = Command
cwd = cwd_context
parallel = ParallelCommands
613
614
if __name__ == "__main__":
    import doctest

    # Run the docstring examples of this module; ELLIPSIS lets `...` match arbitrary output.
    outcome = doctest.testmod(optionflags=doctest.ELLIPSIS)
    sys.exit(1 if outcome.failed > 0 else 0)
620