1from __future__ import annotations 2 3import argparse 4import json 5import logging 6import os 7import re 8import subprocess 9import sys 10import time 11from enum import Enum 12from typing import Any, NamedTuple 13 14 15IS_WINDOWS: bool = os.name == "nt" 16 17 18def eprint(*args: Any, **kwargs: Any) -> None: 19 print(*args, file=sys.stderr, flush=True, **kwargs) 20 21 22class LintSeverity(str, Enum): 23 ERROR = "error" 24 WARNING = "warning" 25 ADVICE = "advice" 26 DISABLED = "disabled" 27 28 29class LintMessage(NamedTuple): 30 path: str | None 31 line: int | None 32 char: int | None 33 code: str 34 severity: LintSeverity 35 name: str 36 original: str | None 37 replacement: str | None 38 description: str | None 39 40 41def as_posix(name: str) -> str: 42 return name.replace("\\", "/") if IS_WINDOWS else name 43 44 45# fmt: off 46# https://www.flake8rules.com/ 47DOCUMENTED_IN_FLAKE8RULES: set[str] = { 48 "E101", "E111", "E112", "E113", "E114", "E115", "E116", "E117", 49 "E121", "E122", "E123", "E124", "E125", "E126", "E127", "E128", "E129", 50 "E131", "E133", 51 "E201", "E202", "E203", 52 "E211", 53 "E221", "E222", "E223", "E224", "E225", "E226", "E227", "E228", 54 "E231", 55 "E241", "E242", 56 "E251", 57 "E261", "E262", "E265", "E266", 58 "E271", "E272", "E273", "E274", "E275", 59 "E301", "E302", "E303", "E304", "E305", "E306", 60 "E401", "E402", 61 "E501", "E502", 62 "E701", "E702", "E703", "E704", 63 "E711", "E712", "E713", "E714", 64 "E721", "E722", 65 "E731", 66 "E741", "E742", "E743", 67 "E901", "E902", "E999", 68 "W191", 69 "W291", "W292", "W293", 70 "W391", 71 "W503", "W504", 72 "W601", "W602", "W603", "W604", "W605", 73 "F401", "F402", "F403", "F404", "F405", 74 "F811", "F812", 75 "F821", "F822", "F823", 76 "F831", 77 "F841", 78 "F901", 79 "C901", 80} 81 82# https://pypi.org/project/flake8-comprehensions/#rules 83DOCUMENTED_IN_FLAKE8COMPREHENSIONS: set[str] = { 84 "C400", "C401", "C402", "C403", "C404", "C405", "C406", "C407", "C408", "C409", 85 "C410", 86 "C411", "C412", "C413", "C414", "C415", "C416", 87} 88 89# https://github.com/PyCQA/flake8-bugbear#list-of-warnings 90DOCUMENTED_IN_BUGBEAR: set[str] = { 91 "B001", "B002", "B003", "B004", "B005", "B006", "B007", "B008", "B009", "B010", 92 "B011", "B012", "B013", "B014", "B015", 93 "B301", "B302", "B303", "B304", "B305", "B306", 94 "B901", "B902", "B903", "B950", 95} 96# fmt: on 97 98 99# stdin:2: W802 undefined name 'foo' 100# stdin:3:6: T484 Name 'foo' is not defined 101# stdin:3:-100: W605 invalid escape sequence '\/' 102# stdin:3:1: E302 expected 2 blank lines, found 1 103RESULTS_RE: re.Pattern[str] = re.compile( 104 r"""(?mx) 105 ^ 106 (?P<file>.*?): 107 (?P<line>\d+): 108 (?:(?P<column>-?\d+):)? 109 \s(?P<code>\S+?):? 110 \s(?P<message>.*) 111 $ 112 """ 113) 114 115 116def _test_results_re() -> None: 117 """ 118 >>> def t(s): return RESULTS_RE.search(s).groupdict() 119 120 >>> t(r"file.py:80:1: E302 expected 2 blank lines, found 1") 121 ... # doctest: +NORMALIZE_WHITESPACE 122 {'file': 'file.py', 'line': '80', 'column': '1', 'code': 'E302', 123 'message': 'expected 2 blank lines, found 1'} 124 125 >>> t(r"file.py:7:1: P201: Resource `stdout` is acquired but not always released.") 126 ... # doctest: +NORMALIZE_WHITESPACE 127 {'file': 'file.py', 'line': '7', 'column': '1', 'code': 'P201', 128 'message': 'Resource `stdout` is acquired but not always released.'} 129 130 >>> t(r"file.py:8:-10: W605 invalid escape sequence '/'") 131 ... # doctest: +NORMALIZE_WHITESPACE 132 {'file': 'file.py', 'line': '8', 'column': '-10', 'code': 'W605', 133 'message': "invalid escape sequence '/'"} 134 """ 135 136 137def _run_command( 138 args: list[str], 139 *, 140 extra_env: dict[str, str] | None, 141) -> subprocess.CompletedProcess[str]: 142 logging.debug( 143 "$ %s", 144 " ".join( 145 ([f"{k}={v}" for (k, v) in extra_env.items()] if extra_env else []) + args 146 ), 147 ) 148 start_time = time.monotonic() 149 try: 150 return subprocess.run( 151 args, 152 capture_output=True, 153 check=True, 154 encoding="utf-8", 155 ) 156 finally: 157 end_time = time.monotonic() 158 logging.debug("took %dms", (end_time - start_time) * 1000) 159 160 161def run_command( 162 args: list[str], 163 *, 164 extra_env: dict[str, str] | None, 165 retries: int, 166) -> subprocess.CompletedProcess[str]: 167 remaining_retries = retries 168 while True: 169 try: 170 return _run_command(args, extra_env=extra_env) 171 except subprocess.CalledProcessError as err: 172 if remaining_retries == 0 or not re.match( 173 r"^ERROR:1:1: X000 linting with .+ timed out after \d+ seconds", 174 err.stdout, 175 ): 176 raise err 177 remaining_retries -= 1 178 logging.warning( 179 "(%s/%s) Retrying because command failed with: %r", 180 retries - remaining_retries, 181 retries, 182 err, 183 ) 184 time.sleep(1) 185 186 187def get_issue_severity(code: str) -> LintSeverity: 188 # "B901": `return x` inside a generator 189 # "B902": Invalid first argument to a method 190 # "B903": __slots__ efficiency 191 # "B950": Line too long 192 # "C4": Flake8 Comprehensions 193 # "C9": Cyclomatic complexity 194 # "E2": PEP8 horizontal whitespace "errors" 195 # "E3": PEP8 blank line "errors" 196 # "E5": PEP8 line length "errors" 197 # "F401": Name imported but unused 198 # "F403": Star imports used 199 # "F405": Name possibly from star imports 200 # "T400": type checking Notes 201 # "T49": internal type checker errors or unmatched messages 202 if any( 203 code.startswith(x) 204 for x in [ 205 "B9", 206 "C4", 207 "C9", 208 "E2", 209 "E3", 210 "E5", 211 "F401", 212 "F403", 213 "F405", 214 "T400", 215 "T49", 216 ] 217 ): 218 return LintSeverity.ADVICE 219 220 # "F821": Undefined name 221 # "E999": syntax error 222 if any(code.startswith(x) for x in ["F821", "E999"]): 223 return LintSeverity.ERROR 224 225 # "F": PyFlakes Error 226 # "B": flake8-bugbear Error 227 # "E": PEP8 "Error" 228 # "W": PEP8 Warning 229 # possibly other plugins... 230 return LintSeverity.WARNING 231 232 233def get_issue_documentation_url(code: str) -> str: 234 if code in DOCUMENTED_IN_FLAKE8RULES: 235 return f"https://www.flake8rules.com/rules/{code}.html" 236 237 if code in DOCUMENTED_IN_FLAKE8COMPREHENSIONS: 238 return "https://pypi.org/project/flake8-comprehensions/#rules" 239 240 if code in DOCUMENTED_IN_BUGBEAR: 241 return "https://github.com/PyCQA/flake8-bugbear#list-of-warnings" 242 243 return "" 244 245 246def check_files( 247 filenames: list[str], 248 flake8_plugins_path: str | None, 249 severities: dict[str, LintSeverity], 250 retries: int, 251) -> list[LintMessage]: 252 try: 253 proc = run_command( 254 [sys.executable, "-mflake8", "--exit-zero"] + filenames, 255 extra_env={"FLAKE8_PLUGINS_PATH": flake8_plugins_path} 256 if flake8_plugins_path 257 else None, 258 retries=retries, 259 ) 260 except (OSError, subprocess.CalledProcessError) as err: 261 return [ 262 LintMessage( 263 path=None, 264 line=None, 265 char=None, 266 code="FLAKE8", 267 severity=LintSeverity.ERROR, 268 name="command-failed", 269 original=None, 270 replacement=None, 271 description=( 272 f"Failed due to {err.__class__.__name__}:\n{err}" 273 if not isinstance(err, subprocess.CalledProcessError) 274 else ( 275 "COMMAND (exit code {returncode})\n" 276 "{command}\n\n" 277 "STDERR\n{stderr}\n\n" 278 "STDOUT\n{stdout}" 279 ).format( 280 returncode=err.returncode, 281 command=" ".join(as_posix(x) for x in err.cmd), 282 stderr=err.stderr.strip() or "(empty)", 283 stdout=err.stdout.strip() or "(empty)", 284 ) 285 ), 286 ) 287 ] 288 289 return [ 290 LintMessage( 291 path=match["file"], 292 name=match["code"], 293 description=f"{match['message']}\nSee {get_issue_documentation_url(match['code'])}", 294 line=int(match["line"]), 295 char=int(match["column"]) 296 if match["column"] is not None and not match["column"].startswith("-") 297 else None, 298 code="FLAKE8", 299 severity=severities.get(match["code"]) or get_issue_severity(match["code"]), 300 original=None, 301 replacement=None, 302 ) 303 for match in RESULTS_RE.finditer(proc.stdout) 304 ] 305 306 307def main() -> None: 308 parser = argparse.ArgumentParser( 309 description="Flake8 wrapper linter.", 310 fromfile_prefix_chars="@", 311 ) 312 parser.add_argument( 313 "--flake8-plugins-path", 314 help="FLAKE8_PLUGINS_PATH env value", 315 ) 316 parser.add_argument( 317 "--severity", 318 action="append", 319 help="map code to severity (e.g. `B950:advice`)", 320 ) 321 parser.add_argument( 322 "--retries", 323 default=3, 324 type=int, 325 help="times to retry timed out flake8", 326 ) 327 parser.add_argument( 328 "--verbose", 329 action="store_true", 330 help="verbose logging", 331 ) 332 parser.add_argument( 333 "filenames", 334 nargs="+", 335 help="paths to lint", 336 ) 337 args = parser.parse_args() 338 339 logging.basicConfig( 340 format="<%(threadName)s:%(levelname)s> %(message)s", 341 level=logging.NOTSET 342 if args.verbose 343 else logging.DEBUG 344 if len(args.filenames) < 1000 345 else logging.INFO, 346 stream=sys.stderr, 347 ) 348 349 flake8_plugins_path = ( 350 None 351 if args.flake8_plugins_path is None 352 else os.path.realpath(args.flake8_plugins_path) 353 ) 354 355 severities: dict[str, LintSeverity] = {} 356 if args.severity: 357 for severity in args.severity: 358 parts = severity.split(":", 1) 359 assert len(parts) == 2, f"invalid severity `{severity}`" 360 severities[parts[0]] = LintSeverity(parts[1]) 361 362 lint_messages = check_files( 363 args.filenames, flake8_plugins_path, severities, args.retries 364 ) 365 for lint_message in lint_messages: 366 print(json.dumps(lint_message._asdict()), flush=True) 367 368 369if __name__ == "__main__": 370 main() 371