1from __future__ import annotations 2 3import argparse 4import concurrent.futures 5import json 6import logging 7import os 8import sys 9from enum import Enum 10from typing import Any, NamedTuple 11 12 13IS_WINDOWS: bool = os.name == "nt" 14 15 16def eprint(*args: Any, **kwargs: Any) -> None: 17 print(*args, file=sys.stderr, flush=True, **kwargs) 18 19 20class LintSeverity(str, Enum): 21 ERROR = "error" 22 WARNING = "warning" 23 ADVICE = "advice" 24 DISABLED = "disabled" 25 26 27class LintMessage(NamedTuple): 28 path: str | None 29 line: int | None 30 char: int | None 31 code: str 32 severity: LintSeverity 33 name: str 34 original: str | None 35 replacement: str | None 36 description: str | None 37 38 39def check_file(filename: str) -> list[LintMessage]: 40 with open(filename, "rb") as f: 41 original = f.read().decode("utf-8") 42 replacement = "" 43 with open(filename) as f: 44 lines = f.readlines() 45 for line in lines: 46 if len(line.strip()) > 0: 47 replacement += line 48 replacement += "\n" * 3 49 replacement = replacement[:-3] 50 51 if replacement == original: 52 return [] 53 54 return [ 55 LintMessage( 56 path=filename, 57 line=None, 58 char=None, 59 code="MERGE_CONFLICTLESS_CSV", 60 severity=LintSeverity.WARNING, 61 name="format", 62 original=original, 63 replacement=replacement, 64 description="Run `lintrunner -a` to apply this patch.", 65 ) 66 ] 67 68 69def main() -> None: 70 parser = argparse.ArgumentParser( 71 description="Format csv files to have 3 lines of space between each line to prevent merge conflicts.", 72 fromfile_prefix_chars="@", 73 ) 74 parser.add_argument( 75 "--verbose", 76 action="store_true", 77 help="verbose logging", 78 ) 79 parser.add_argument( 80 "filenames", 81 nargs="+", 82 help="paths to lint", 83 ) 84 args = parser.parse_args() 85 86 logging.basicConfig( 87 format="<%(processName)s:%(levelname)s> %(message)s", 88 level=logging.NOTSET 89 if args.verbose 90 else logging.DEBUG 91 if len(args.filenames) < 1000 92 else logging.INFO, 93 stream=sys.stderr, 94 ) 95 96 with concurrent.futures.ProcessPoolExecutor( 97 max_workers=os.cpu_count(), 98 ) as executor: 99 futures = {executor.submit(check_file, x): x for x in args.filenames} 100 for future in concurrent.futures.as_completed(futures): 101 try: 102 for lint_message in future.result(): 103 print(json.dumps(lint_message._asdict()), flush=True) 104 except Exception: 105 logging.critical('Failed at "%s".', futures[future]) 106 raise 107 108 109if __name__ == "__main__": 110 main() 111