#!/usr/bin/env python3
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Provides helpers for writing shell-like scripts in Python.

It provides tools to execute commands with similar flexibility as shell scripts.
"""

import contextlib
import json
import os
import re
import shlex
import subprocess
import sys
from copy import deepcopy
from math import ceil
from multiprocessing.pool import ThreadPool
from pathlib import Path
from subprocess import DEVNULL, PIPE, STDOUT  # type: ignore
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    NamedTuple,
    Optional,
    TypeVar,
    Union,
)
from .util import verbose, very_verbose, color_enabled

PathLike = Union[Path, str]


# Regex that matches ANSI escape sequences
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


class CommandResult(NamedTuple):
    """Results of a command execution as returned by Command.run()"""

    stdout: str
    stderr: str
    returncode: int


class Command(object):
    """
    Simplified subprocess handling for shell-like scripts.

    ## Example Usage

    To run a program on behalf of the user:

    >> cmd("cargo build").fg()

    This will run the program with stdio passed to the user. Developer tools usually run a set of
    actions on behalf of the user. These should be executed with fg().

    To make calls in the background to gather information use success/stdout/lines:

    >> cmd("git branch").lines()
    >> cmd("git rev-parse foo").success()

    These will capture all program output. Try to avoid using these to run mutating commands,
    as they will remain hidden to the user even when using --verbose.

    ## Arguments

    Arguments are provided as a list similar to subprocess.run():

    >>> Command('cargo', 'build', '--workspace')
    Command('cargo', 'build', '--workspace')

    In contrast to subprocess.run, all strings are split by whitespaces similar to bash:

    >>> Command('cargo build --workspace', '--features foo')
    Command('cargo', 'build', '--workspace', '--features', 'foo')

    In contrast to bash, globs are *not* evaluated, but can easily be provided using Path:

    >>> Command('ls -l', *Path(CROSVM_ROOT).glob('*.toml'))
    Command('ls', '-l', ...)

    None or False are ignored to make it easy to include conditional arguments:

    >>> all = False
    >>> Command('cargo build', '--workspace' if all else None)
    Command('cargo', 'build')

    ## Nesting

    Commands can be nested, similar to $() subshells in bash. The sub-commands will be executed
    right away and their output will undergo the usual splitting:

    >>> Command('printf "(%s)"', Command('echo foo bar')).stdout()
    '(foo)(bar)'

    Arguments can be explicitly quoted to prevent splitting, it applies to both sub-commands
    as well as strings:

    >>> Command('printf "(%s)"', quoted(Command('echo foo bar'))).stdout()
    '(foo bar)'

    Commands can also be piped into one another:

    >>> wc = Command('wc')
    >>> Command('echo "abcd"').pipe(wc('-c')).stdout()
    '5'

    ## Verbosity

    The --verbose flag is intended for users and will show all command lines executed in the
    foreground with fg(), it'll also include output of programs run with fg(quiet=True). Commands
    executed in the background are not shown.

    For script developers, the --very-verbose flag will print full details and output of all
    executed command lines, including those run hidden from the user.
    """

    def __init__(
        self,
        *args: Any,
        stdin_cmd: Optional["Command"] = None,
        env_vars: Optional[Dict[str, str]] = None,
        cwd: Optional[PathLike] = None,
    ):
        """
        Builds a command from a mixed list of arguments.

        Arguments:
            args: Strings, Paths, QuotedStrings or Commands; see the class docstring for how
                each of them is split into the final argument list.
            stdin_cmd: Command whose stdout is fed into this command's stdin.
            env_vars: Environment variables added on top of os.environ when executing.
            cwd: Working directory to execute the command in.
        """
        self.args = Command.__parse_cmd(args)
        self.stdin_cmd = stdin_cmd
        # Always store a private copy: a shared default dict or a dict owned by the caller
        # (e.g. the parent command in pipe()) must never be aliased between commands.
        self.env_vars: Dict[str, str] = dict(env_vars) if env_vars else {}
        self.cwd = cwd

    ### Builder API to construct commands

    def with_args(self, *args: Any):
        """Returns a new Command with added arguments.

        >>> cargo = Command('cargo')
        >>> cargo.with_args('clippy')
        Command('cargo', 'clippy')
        """
        cmd = deepcopy(self)
        cmd.args = [*self.args, *Command.__parse_cmd(args)]
        return cmd

    def with_cwd(self, cwd: Optional[PathLike]):
        """Changes the working directory the command is executed in.

        >>> cargo = Command('pwd')
        >>> cargo.with_cwd('/tmp').stdout()
        '/tmp'
        """
        cmd = deepcopy(self)
        cmd.cwd = cwd
        return cmd

    def __call__(self, *args: Any):
        """Shorthand for Command.with_args"""
        return self.with_args(*args)

    def with_env(self, key: str, value: Optional[str]):
        """
        Returns a command with an added env variable.

        The variable is removed if value is None.
        """
        return self.with_envs({key: value})

    def with_envs(self, envs: Union[Dict[str, Optional[str]], Dict[str, str]]):
        """
        Returns a command with added env variables.

        A variable is removed if its value is None.
        """
        cmd = deepcopy(self)
        for key, value in envs.items():
            if value is not None:
                cmd.env_vars[key] = value
            else:
                cmd.env_vars.pop(key, None)
        return cmd

    def with_path_env(self, new_path: str):
        """Returns a command with a path added to the PATH variable."""
        path_var = self.env_vars.get("PATH", os.environ.get("PATH", ""))
        if not path_var:
            # Avoid a leading separator: an empty PATH entry means "current directory".
            return self.with_env("PATH", new_path)
        # os.pathsep is ':' on POSIX and ';' on Windows.
        return self.with_env("PATH", f"{path_var}{os.pathsep}{new_path}")

    def with_color_arg(
        self,
        always: Optional[str] = None,
        never: Optional[str] = None,
    ):
        """Returns a command with an argument added to pass through enabled/disabled colors."""
        new_cmd = self
        if color_enabled():
            if always:
                new_cmd = new_cmd(always)
        else:
            if never:
                new_cmd = new_cmd(never)
        return new_cmd

    def with_color_env(self, var_name: str):
        """Returns a command with an env var added to pass through enabled/disabled colors."""
        return self.with_env(var_name, "1" if color_enabled() else "0")

    def with_color_flag(self, flag: str = "--color"):
        """Returns a command with an added --color=always/never/auto flag."""
        return self.with_color_arg(always=f"{flag}=always", never=f"{flag}=never")

    def foreach(self, arguments: Iterable[Any], batch_size: int = 1):
        """
        Yields a new command for each entry in `arguments`.

        The argument is appended to each command and is intended to be used in
        conjunction with `parallel()` to execute a command on a list of arguments in
        parallel.

        >>> parallel(*cmd('echo').foreach((1, 2, 3))).stdout()
        ['1', '2', '3']

        Arguments can also be batched by setting batch_size > 1, which will append multiple
        arguments to each command.

        >>> parallel(*cmd('echo').foreach((1, 2, 3), batch_size=2)).stdout()
        ['1 2', '3']

        """
        for batch in batched(arguments, batch_size):
            yield self(*batch)

    def pipe(self, *args: Any):
        """
        Pipes the output of this command into another process.

        The target can either be another Command or the argument list to build a new command.
        The resulting command uses this command's environment variables.
        """
        if len(args) == 1 and isinstance(args[0], Command):
            cmd = Command(stdin_cmd=self)
            cmd.args = args[0].args
            cmd.env_vars = self.env_vars.copy()
            return cmd
        else:
            return Command(*args, stdin_cmd=self, env_vars=self.env_vars)

    ### Executing programs in the foreground

    def run_foreground(
        self,
        quiet: bool = False,
        check: bool = True,
        dry_run: bool = False,
        style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
    ):
        """
        Runs a program in the foreground with output streamed to the user.

        >>> Command('true').fg()
        0

        Non-zero exit codes will trigger an Exception

        >>> Command('false').fg()
        Traceback (most recent call last):
        ...
        subprocess.CalledProcessError...

        But can be disabled:

        >>> Command('false').fg(check=False)
        1

        Output can be hidden by setting quiet=True:

        >>> Command("echo foo").fg(quiet=True)
        0

        This will hide the programs stdout and stderr unless the program fails.

        More sophisticated means of outputting stdout/err are available via `Styles`:

        >>> Command("echo foo").fg(style=Styles.live_truncated())
        …
        foo
        0

        Will output the results of the command but truncate output after a few lines. See `Styles`
        for more options.

        Arguments:
            quiet: Do not show stdout/stderr unless the program failed.
            check: Raise an exception if the program returned an error code.
            dry_run: Only print the command line instead of executing it.
            style: A function to present the output of the program. See `Styles`

        Returns: The return code of the program.
        """
        if dry_run:
            print(f"Not running: {self}")
            return 0

        if quiet:

            def quiet_style(process: "subprocess.Popen[str]"):
                "Won't print anything unless the command failed."
                assert process.stdout
                stdout = process.stdout.read()
                if process.wait() != 0:
                    print(stdout, end="")

            # quiet=True takes precedence over any style passed by the caller.
            style = quiet_style

        if verbose():
            print(f"$ {self}")

        # In verbose mode all styling (including quiet) is bypassed and stdio is passed through.
        if style is None or verbose():
            return self.__run(stdout=None, stderr=None, check=check).returncode
        else:
            # stderr is merged into stdout so the style function sees a single stream.
            process = self.popen(stdout=PIPE, stderr=STDOUT)
            style(process)
            returncode = process.wait()
            if returncode != 0 and check:
                raise subprocess.CalledProcessError(returncode, process.args)
            return returncode

    def fg(
        self,
        quiet: bool = False,
        check: bool = True,
        dry_run: bool = False,
        style: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
    ):
        """
        Shorthand for Command.run_foreground()
        """
        return self.run_foreground(quiet, check, dry_run, style)

    def write_to(self, filename: Path):
        """
        Writes stdout to the provided file.

        Like a shell redirect, the file is opened (and truncated) before the command runs.
        """
        if verbose():
            print(f"$ {self} > {filename}")
        with open(filename, "w") as file:
            file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)

    def append_to(self, filename: Path):
        """
        Appends stdout to the provided file.
        """
        if verbose():
            print(f"$ {self} >> {filename}")
        with open(filename, "a") as file:
            file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout)

    ### API for executing commands hidden from the user

    def success(self):
        """
        Returns True if the program succeeded (i.e. returned 0).

        The program will not be visible to the user unless --very-verbose is specified.
        """
        if very_verbose():
            print(f"$ {self}")
        return self.__run(stdout=PIPE, stderr=PIPE, check=False).returncode == 0

    def stdout(self, check: bool = True, stderr: int = PIPE):
        """
        Runs a program and returns stdout.

        The program will not be visible to the user unless --very-verbose is specified.
        """
        if very_verbose():
            print(f"$ {self}")
        return self.__run(stdout=PIPE, stderr=stderr, check=check).stdout.strip()

    def json(self, check: bool = True) -> Any:
        """
        Runs a program and returns stdout parsed as json.

        Returns None if the program printed nothing.

        The program will not be visible to the user unless --very-verbose is specified.
        """
        stdout = self.stdout(check=check)
        if stdout:
            return json.loads(stdout)
        else:
            return None

    def lines(self, check: bool = True, stderr: int = PIPE):
        """
        Runs a program and returns stdout line by line.

        The program will not be visible to the user unless --very-verbose is specified.
        """
        return self.stdout(check=check, stderr=stderr).splitlines()

    ### Utilities

    def __str__(self):
        stdin = ""
        if self.stdin_cmd:
            stdin = str(self.stdin_cmd) + " | "
        return stdin + shlex.join(self.args)

    def __repr__(self):
        stdin = ""
        if self.stdin_cmd:
            stdin = ", stdin_cmd=" + repr(self.stdin_cmd)
        return f"Command({', '.join(repr(a) for a in self.args)}{stdin})"

    ### Private implementation details

    def __run(
        self,
        stdout: Optional[int],
        stderr: Optional[int],
        check: bool = True,
    ) -> CommandResult:
        """
        Run this command in subprocess.run()

        Raises subprocess.CalledProcessError if `check` is set and the program returned a
        non-zero exit code.
        """
        if very_verbose():
            # Report the directory the command will actually run in, not just our own cwd.
            run_dir = Path(self.cwd) if self.cwd is not None else Path()
            print(f"cwd: {run_dir.resolve()}")
            for k, v in self.env_vars.items():
                print(f"env: {k}={v}")
        # check=False: the return code is checked manually below so that the --very-verbose
        # output of a failing command is still printed before the exception is raised.
        result = subprocess.run(
            self.args,
            cwd=self.cwd,
            stdout=stdout,
            stderr=stderr,
            stdin=self.__stdin_stream(),
            env={**os.environ, **self.env_vars},
            check=False,
            text=True,
        )
        if very_verbose():
            if result.stdout:
                for line in result.stdout.splitlines():
                    print("stdout:", line)
            if result.stderr:
                for line in result.stderr.splitlines():
                    print("stderr:", line)
            print("returncode:", result.returncode)
        if check and result.returncode != 0:
            # Same exception contents subprocess.run(check=True) would have produced.
            raise subprocess.CalledProcessError(
                result.returncode, result.args, result.stdout, result.stderr
            )
        return CommandResult(result.stdout, result.stderr, result.returncode)

    def __stdin_stream(self):
        """Starts the piped-in command (if any) and returns its stdout stream."""
        if self.stdin_cmd:
            # NOTE(review): stderr of the upstream command is piped but never read here, and
            # the Popen object is not waited on. Consider inheriting or discarding stderr.
            return self.stdin_cmd.popen(stdout=PIPE, stderr=PIPE).stdout
        return None

    def popen(self, **kwargs: Any) -> "subprocess.Popen[str]":
        """
        Runs a program and returns the Popen object of the running process.
        """
        return subprocess.Popen(
            self.args,
            cwd=self.cwd,
            stdin=self.__stdin_stream(),
            env={**os.environ, **self.env_vars},
            text=True,
            **kwargs,
        )

    @staticmethod
    def __parse_cmd(args: Iterable[Any]) -> List[str]:
        """Parses command line arguments for Command."""
        res = [parsed for arg in args for parsed in Command.__parse_cmd_args(arg)]
        return res

    @staticmethod
    def __parse_cmd_args(arg: Any) -> List[str]:
        """Parses a mixed type command line argument into a list of strings."""

        def escape_backslash_if_necessary(text: str) -> str:
            # shlex treats backslashes as escape characters, which would mangle Windows paths.
            if os.name == "nt":
                return text.replace("\\", "\\\\")
            else:
                return text

        if isinstance(arg, Path):
            return [escape_backslash_if_necessary(str(arg))]
        elif isinstance(arg, QuotedString):
            return [arg.value]
        elif isinstance(arg, Command):
            # Nested commands are executed right away; their output is split like a string.
            return [*shlex.split(escape_backslash_if_necessary(arg.stdout()))]
        elif arg is None or arg is False:
            return []
        else:
            return [*shlex.split(escape_backslash_if_necessary(str(arg)))]


class ParallelCommands(object):
    """
    Allows commands to be run in parallel.

    >>> parallel(cmd('true'), cmd('false')).fg(check=False)
    [0, 1]

    >>> parallel(cmd('echo a'), cmd('echo b')).stdout()
    ['a', 'b']
    """

    def __init__(self, *commands: Command):
        self.commands = commands

    def fg(self, quiet: bool = False, check: bool = True):
        """Runs all commands in the foreground and returns the list of their return codes."""
        # Run sequentially with --very-verbose so the debug output does not interleave.
        with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool:
            return pool.map(lambda command: command.fg(quiet=quiet, check=check), self.commands)

    def stdout(self):
        """Runs all commands hidden from the user and returns the list of their stdout."""
        with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool:
            return pool.map(lambda command: command.stdout(), self.commands)

    def success(self):
        """Returns True if all commands succeeded (i.e. returned 0)."""
        results = self.fg(check=False, quiet=True)
        return all(result == 0 for result in results)


class Remote(object):
    """
    Wrapper around the cmd() API that allows execution of commands via SSH.
    """

    def __init__(self, host: str, opts: Dict[str, str]):
        self.host = host
        ssh_opts = [f"-o{k}={v}" for k, v in opts.items()]
        self.ssh_cmd = cmd("ssh", host, "-T", *ssh_opts)
        self.scp_cmd = cmd("scp", *ssh_opts)

    def ssh(self, cmd: Command, remote_cwd: Optional[Path] = None):
        """Returns a Command that executes `cmd` on the remote host, optionally in remote_cwd."""
        # Use huponexit to ensure the process is killed if the connection is lost.
        # Use shlex to properly quote the command.
        wrapped_cmd = f"bash -O huponexit -c {shlex.quote(str(cmd))}"
        if remote_cwd is not None:
            # Quote the directory as well, it is interpreted by the remote shell.
            wrapped_cmd = f"cd {shlex.quote(str(remote_cwd))} && {wrapped_cmd}"
        # The whole command to pass it to SSH for remote execution.
        return self.ssh_cmd.with_args(quoted(wrapped_cmd))

    def scp(self, sources: List[Path], target: str, quiet: bool = False):
        """Copies the source files to `target` on the remote host. Returns the return code."""
        return self.scp_cmd.with_args(*sources, f"{self.host}:{target}").fg(quiet=quiet)


@contextlib.contextmanager
def cwd_context(path: PathLike):
    """Context for temporarily changing the cwd.

    >>> with cwd('/tmp'):
    ...     os.getcwd()
    '/tmp'

    """
    previous = os.getcwd()
    try:
        chdir(path)
        yield
    finally:
        chdir(previous)


def chdir(path: PathLike):
    """Changes the cwd of this process, printing the change if --very-verbose is specified."""
    if very_verbose():
        print("cd", path)
    os.chdir(path)


class QuotedString(object):
    """
    Prevents the provided string from being split.

    Commands will be executed and their stdout is quoted.
    """

    def __init__(self, value: Any):
        if isinstance(value, Command):
            self.value = value.stdout()
        else:
            self.value = str(value)

    def __str__(self):
        return f'"{self.value}"'


T = TypeVar("T")


def batched(source: Iterable[T], max_batch_size: int) -> Iterable[List[T]]:
    """
    Returns an iterator over batches of elements from source.

    The elements are spread evenly across the smallest number of batches that keeps every
    batch at or below max_batch_size. An empty source yields no batches.

    >>> list(batched([1, 2, 3, 4, 5], 2))
    [[1, 2], [3, 4], [5]]

    >>> list(batched([], 2))
    []

    Raises ValueError if max_batch_size is smaller than 1.
    """
    if max_batch_size < 1:
        raise ValueError(f"max_batch_size must be at least 1, got {max_batch_size}")
    source_list = list(source)
    if not source_list:
        # Nothing to batch; also avoids dividing by a batch count of zero below.
        return
    # Calculate batch size that spreads elements evenly across all batches
    batch_count = ceil(len(source_list) / max_batch_size)
    batch_size = ceil(len(source_list) / batch_count)
    for index in range(0, len(source_list), batch_size):
        yield source_list[index : index + batch_size]


# Shorthands
quoted = QuotedString
cmd = Command
cwd = cwd_context
parallel = ParallelCommands


if __name__ == "__main__":
    import doctest

    (failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS)
    sys.exit(1 if failures > 0 else 0)