xref: /aosp_15_r20/external/pytorch/tools/linter/adapters/flake8_linter.py (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1from __future__ import annotations
2
3import argparse
4import json
5import logging
6import os
7import re
8import subprocess
9import sys
10import time
11from enum import Enum
12from typing import Any, NamedTuple
13
14
# True when the interpreter reports a Windows host (``os.name == "nt"``);
# as_posix() consults this to decide whether backslashes need rewriting.
IS_WINDOWS: bool = os.name == "nt"
16
17
def eprint(*args: Any, **kwargs: Any) -> None:
    """Write ``args`` to standard error via ``print`` and flush immediately.

    Any keyword arguments are forwarded to ``print`` unchanged; ``file`` and
    ``flush`` are fixed by this helper and must not be supplied by the caller.
    """
    print(
        *args,
        **kwargs,
        file=sys.stderr,
        flush=True,
    )
20
21
class LintSeverity(str, Enum):
    """Severity level attached to each emitted LintMessage.

    The ``str`` mixin makes every member a plain string equal to its value,
    so members can be passed straight to ``json.dumps`` and can be looked up
    from command-line text with ``LintSeverity("advice")``.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
27
28
class LintMessage(NamedTuple):
    """One lint result; main() prints each as a JSON object via ``_asdict()``."""

    # File the message refers to; None when the message is not tied to a file
    # (e.g. the "command-failed" message built in check_files).
    path: str | None
    # Line number reported by flake8, or None for non-file messages.
    line: int | None
    # Column reported by flake8; None when absent or reported as negative.
    char: int | None
    # Identifier of the linter; this file always uses "FLAKE8".
    code: str
    # Severity of the message.
    severity: LintSeverity
    # Rule name: the flake8 code (e.g. "E302") or "command-failed".
    name: str
    # Always None in this file -- presumably reserved for suggested fixes
    # (original text / replacement text); confirm against the consumer.
    original: str | None
    replacement: str | None
    # Human-readable explanation of the problem.
    description: str | None
39
40
def as_posix(name: str) -> str:
    """Return ``name`` with backslashes turned into forward slashes on Windows.

    On every other platform the string is returned untouched.
    """
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
43
44
# The three sets below list the flake8 codes for which
# get_issue_documentation_url() can return a documentation link. Formatting is
# disabled so related codes stay grouped on one line.
# fmt: off
# Codes with a dedicated page at https://www.flake8rules.com/rules/<CODE>.html
# https://www.flake8rules.com/
DOCUMENTED_IN_FLAKE8RULES: set[str] = {
    "E101", "E111", "E112", "E113", "E114", "E115", "E116", "E117",
    "E121", "E122", "E123", "E124", "E125", "E126", "E127", "E128", "E129",
    "E131", "E133",
    "E201", "E202", "E203",
    "E211",
    "E221", "E222", "E223", "E224", "E225", "E226", "E227", "E228",
    "E231",
    "E241", "E242",
    "E251",
    "E261", "E262", "E265", "E266",
    "E271", "E272", "E273", "E274", "E275",
    "E301", "E302", "E303", "E304", "E305", "E306",
    "E401", "E402",
    "E501", "E502",
    "E701", "E702", "E703", "E704",
    "E711", "E712", "E713", "E714",
    "E721", "E722",
    "E731",
    "E741", "E742", "E743",
    "E901", "E902", "E999",
    "W191",
    "W291", "W292", "W293",
    "W391",
    "W503", "W504",
    "W601", "W602", "W603", "W604", "W605",
    "F401", "F402", "F403", "F404", "F405",
    "F811", "F812",
    "F821", "F822", "F823",
    "F831",
    "F841",
    "F901",
    "C901",
}

# Codes covered by the flake8-comprehensions rule list (single shared URL).
# https://pypi.org/project/flake8-comprehensions/#rules
DOCUMENTED_IN_FLAKE8COMPREHENSIONS: set[str] = {
    "C400", "C401", "C402", "C403", "C404", "C405", "C406", "C407", "C408", "C409",
    "C410",
    "C411", "C412", "C413", "C414", "C415", "C416",
}

# Codes covered by the flake8-bugbear warning list (single shared URL).
# https://github.com/PyCQA/flake8-bugbear#list-of-warnings
DOCUMENTED_IN_BUGBEAR: set[str] = {
    "B001", "B002", "B003", "B004", "B005", "B006", "B007", "B008", "B009", "B010",
    "B011", "B012", "B013", "B014", "B015",
    "B301", "B302", "B303", "B304", "B305", "B306",
    "B901", "B902", "B903", "B950",
}
# fmt: on
97
98
# Pattern for one line of flake8 output. It is compiled in multiline + verbose
# mode, so ``^``/``$`` anchor on each line and whitespace inside the pattern is
# ignored. Named groups:
#   file    - path, matched lazily up to the first ":<digits>:"
#   line    - line number (digits)
#   column  - optional column; may carry a leading "-" (see third example)
#   code    - rule code; one optional trailing ":" is left out of the group
#   message - remainder of the line
# Example lines it is meant to match:
# stdin:2: W802 undefined name 'foo'
# stdin:3:6: T484 Name 'foo' is not defined
# stdin:3:-100: W605 invalid escape sequence '\/'
# stdin:3:1: E302 expected 2 blank lines, found 1
RESULTS_RE: re.Pattern[str] = re.compile(
    r"""(?mx)
    ^
    (?P<file>.*?):
    (?P<line>\d+):
    (?:(?P<column>-?\d+):)?
    \s(?P<code>\S+?):?
    \s(?P<message>.*)
    $
    """
)
114
115
def _test_results_re() -> None:
    """
    Doctest holder: checks the groups RESULTS_RE extracts from sample lines.

    The function has no body besides these examples; they cover a plain
    code, a code followed by a colon, and a negative column.

    >>> def t(s): return RESULTS_RE.search(s).groupdict()

    >>> t(r"file.py:80:1: E302 expected 2 blank lines, found 1")
    ... # doctest: +NORMALIZE_WHITESPACE
    {'file': 'file.py', 'line': '80', 'column': '1', 'code': 'E302',
     'message': 'expected 2 blank lines, found 1'}

    >>> t(r"file.py:7:1: P201: Resource `stdout` is acquired but not always released.")
    ... # doctest: +NORMALIZE_WHITESPACE
    {'file': 'file.py', 'line': '7', 'column': '1', 'code': 'P201',
     'message': 'Resource `stdout` is acquired but not always released.'}

    >>> t(r"file.py:8:-10: W605 invalid escape sequence '/'")
    ... # doctest: +NORMALIZE_WHITESPACE
    {'file': 'file.py', 'line': '8', 'column': '-10', 'code': 'W605',
     'message': "invalid escape sequence '/'"}
    """
135
136
def _run_command(
    args: list[str],
    *,
    extra_env: dict[str, str] | None,
) -> subprocess.CompletedProcess[str]:
    """Run ``args`` once as a subprocess and return the completed process.

    Args:
        args: Command and its arguments; executed directly, without a shell.
        extra_env: Environment variables to set for the child on top of the
            inherited environment, or ``None`` to inherit it unchanged.

    Returns:
        The ``CompletedProcess`` with stdout and stderr captured and decoded
        as UTF-8.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
        OSError: If the command cannot be started.
    """
    logging.debug(
        "$ %s",
        " ".join([f"{k}={v}" for k, v in (extra_env or {}).items()] + args),
    )
    # Overlay the extras on a copy of the current environment so the child
    # actually receives them; without ``env=`` they would only show up in the
    # debug log line above. ``None`` makes subprocess inherit os.environ as-is.
    env = {**os.environ, **extra_env} if extra_env else None
    start_time = time.monotonic()
    try:
        return subprocess.run(
            args,
            capture_output=True,
            check=True,
            encoding="utf-8",
            env=env,
        )
    finally:
        # Log the elapsed time whether the command succeeded or raised.
        end_time = time.monotonic()
        logging.debug("took %dms", (end_time - start_time) * 1000)
159
160
def run_command(
    args: list[str],
    *,
    extra_env: dict[str, str] | None,
    retries: int,
) -> subprocess.CompletedProcess[str]:
    """Run a command, retrying when its stdout reports a linting timeout.

    Args:
        args: Command and its arguments.
        extra_env: Extra environment variables, forwarded to ``_run_command``.
        retries: Maximum number of retries after the first attempt. Zero or a
            negative value disables retrying.

    Returns:
        The ``CompletedProcess`` of the first successful attempt.

    Raises:
        subprocess.CalledProcessError: If the command fails for a reason other
            than the recognized timeout message, or the retries are used up.
    """
    remaining_retries = retries
    while True:
        try:
            return _run_command(args, extra_env=extra_env)
        except subprocess.CalledProcessError as err:
            # Only the "timed out" diagnostic is considered transient.
            timed_out = re.match(
                r"^ERROR:1:1: X000 linting with .+ timed out after \d+ seconds",
                err.stdout or "",
            )
            # ``<= 0`` (rather than ``== 0``) keeps a negative ``retries``
            # from turning a persistent timeout into an endless loop.
            if remaining_retries <= 0 or not timed_out:
                raise
            remaining_retries -= 1
            logging.warning(
                "(%s/%s) Retrying because command failed with: %r",
                retries - remaining_retries,
                retries,
                err,
            )
            # Brief pause before the next attempt.
            time.sleep(1)
185
186
def get_issue_severity(code: str) -> LintSeverity:
    """Return the default severity for a flake8 ``code`` based on its prefix.

    Advice prefixes are checked first, then the two codes treated as errors;
    every other code is reported as a warning.
    """
    advice_prefixes = (
        "B9",  # B901 `return x` in generator, B902 bad first argument,
        #        B903 __slots__ efficiency, B950 line too long
        "C4",  # flake8-comprehensions
        "C9",  # cyclomatic complexity
        "E2",  # PEP8 horizontal whitespace "errors"
        "E3",  # PEP8 blank line "errors"
        "E5",  # PEP8 line length "errors"
        "F401",  # name imported but unused
        "F403",  # star imports used
        "F405",  # name possibly from star imports
        "T400",  # type checking notes
        "T49",  # internal type checker errors or unmatched messages
    )
    if code.startswith(advice_prefixes):
        return LintSeverity.ADVICE

    error_prefixes = (
        "F821",  # undefined name
        "E999",  # syntax error
    )
    if code.startswith(error_prefixes):
        return LintSeverity.ERROR

    # Everything else -- remaining "F" (PyFlakes), "B" (flake8-bugbear),
    # "E"/"W" (PEP8) codes and codes from other plugins -- is a warning.
    return LintSeverity.WARNING
231
232
def get_issue_documentation_url(code: str) -> str:
    """Return a documentation URL for ``code``, or ``""`` if none is known.

    The documented-code sets are consulted in order: flake8rules.com (one page
    per code), then flake8-comprehensions, then flake8-bugbear (one shared
    page each).
    """
    doc_sources = (
        (
            DOCUMENTED_IN_FLAKE8RULES,
            f"https://www.flake8rules.com/rules/{code}.html",
        ),
        (
            DOCUMENTED_IN_FLAKE8COMPREHENSIONS,
            "https://pypi.org/project/flake8-comprehensions/#rules",
        ),
        (
            DOCUMENTED_IN_BUGBEAR,
            "https://github.com/PyCQA/flake8-bugbear#list-of-warnings",
        ),
    )
    for documented_codes, url in doc_sources:
        if code in documented_codes:
            return url
    return ""
244
245
def check_files(
    filenames: list[str],
    flake8_plugins_path: str | None,
    severities: dict[str, LintSeverity],
    retries: int,
) -> list[LintMessage]:
    """Run flake8 over ``filenames`` and convert its output to lint messages.

    Args:
        filenames: Paths handed to ``python -m flake8 --exit-zero``.
        flake8_plugins_path: If truthy, passed to ``run_command`` as the
            ``FLAKE8_PLUGINS_PATH`` extra environment variable.
        severities: Per-code severity overrides; codes not listed fall back
            to ``get_issue_severity``.
        retries: Retry budget forwarded to ``run_command``.

    Returns:
        One ``LintMessage`` per output line matched by ``RESULTS_RE``. If the
        command cannot be run or exits non-zero, a single ``command-failed``
        message describing the failure is returned instead.
    """
    try:
        proc = run_command(
            [sys.executable, "-mflake8", "--exit-zero"] + filenames,
            extra_env={"FLAKE8_PLUGINS_PATH": flake8_plugins_path}
            if flake8_plugins_path
            else None,
            retries=retries,
        )
    except (OSError, subprocess.CalledProcessError) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # Include everything needed to reproduce the failure by hand.
            failure = (
                "COMMAND (exit code {returncode})\n"
                "{command}\n\n"
                "STDERR\n{stderr}\n\n"
                "STDOUT\n{stdout}"
            ).format(
                returncode=err.returncode,
                command=" ".join(as_posix(x) for x in err.cmd),
                stderr=(err.stderr or "").strip() or "(empty)",
                stdout=(err.stdout or "").strip() or "(empty)",
            )
        else:
            failure = f"Failed due to {err.__class__.__name__}:\n{err}"
        return [
            LintMessage(
                path=None,
                line=None,
                char=None,
                code="FLAKE8",
                severity=LintSeverity.ERROR,
                name="command-failed",
                original=None,
                replacement=None,
                description=failure,
            )
        ]

    messages: list[LintMessage] = []
    for match in RESULTS_RE.finditer(proc.stdout):
        rule = match["code"]
        column = match["column"]

        description = match["message"]
        url = get_issue_documentation_url(rule)
        if url:
            # Only point at documentation when a link exists; otherwise the
            # description would end with a dangling "See ".
            description = f"{description}\nSee {url}"

        messages.append(
            LintMessage(
                path=match["file"],
                name=rule,
                description=description,
                line=int(match["line"]),
                # A missing or negative column is reported as "no column".
                char=int(column)
                if column is not None and not column.startswith("-")
                else None,
                code="FLAKE8",
                severity=severities.get(rule) or get_issue_severity(rule),
                original=None,
                replacement=None,
            )
        )
    return messages
305
306
def main() -> None:
    """Command-line entry point: lint the given files and print JSON results.

    Parses the arguments, configures logging to stderr, runs ``check_files``
    and prints each resulting ``LintMessage`` to stdout as one JSON object
    per line. A malformed ``--severity`` value ends the program through
    ``parser.error`` (usage message, exit status 2).
    """
    parser = argparse.ArgumentParser(
        description="Flake8 wrapper linter.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument(
        "--flake8-plugins-path",
        help="FLAKE8_PLUGINS_PATH env value",
    )
    parser.add_argument(
        "--severity",
        action="append",
        help="map code to severity (e.g. `B950:advice`)",
    )
    parser.add_argument(
        "--retries",
        default=3,
        type=int,
        help="times to retry timed out flake8",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="verbose logging",
    )
    parser.add_argument(
        "filenames",
        nargs="+",
        help="paths to lint",
    )
    args = parser.parse_args()

    # NOTSET when --verbose is given; otherwise DEBUG for runs with fewer
    # than 1000 files and INFO for larger ones.
    if args.verbose:
        log_level = logging.NOTSET
    elif len(args.filenames) < 1000:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=log_level,
        stream=sys.stderr,
    )

    flake8_plugins_path = (
        None
        if args.flake8_plugins_path is None
        else os.path.realpath(args.flake8_plugins_path)
    )

    severities: dict[str, LintSeverity] = {}
    for severity in args.severity or []:
        code, separator, level = severity.partition(":")
        # Validate explicitly instead of with ``assert``: assertions are
        # stripped under ``python -O`` and would let bad input through.
        if not separator:
            parser.error(f"invalid severity `{severity}`")
        try:
            severities[code] = LintSeverity(level)
        except ValueError:
            parser.error(f"invalid severity `{severity}`")

    lint_messages = check_files(
        args.filenames, flake8_plugins_path, severities, args.retries
    )
    for lint_message in lint_messages:
        print(json.dumps(lint_message._asdict()), flush=True)
367
368
# Run the linter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
371