xref: /aosp_15_r20/external/pytorch/tools/code_coverage/package/tool/summarize_jsons.py (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1from __future__ import annotations
2
3import json
4import os
5import time
6from typing import Any, TYPE_CHECKING
7
8from ..util.setting import (
9    CompilerType,
10    JSON_FOLDER_BASE_DIR,
11    TestList,
12    TestPlatform,
13    TestStatusType,
14)
15from ..util.utils import (
16    detect_compiler_type,
17    print_error,
18    print_time,
19    related_to_test_list,
20)
21from .parser.gcov_coverage_parser import GcovCoverageParser
22from .parser.llvm_coverage_parser import LlvmCoverageParser
23from .print_report import (
24    file_oriented_report,
25    html_oriented_report,
26    line_oriented_report,
27)
28
29
30if TYPE_CHECKING:
31    from .parser.coverage_record import CoverageRecord
32
33
# coverage_records: Dict[str, LineInfo] = {}
# Module-level accumulators shared by parse_jsons/update_coverage/update_set:
# file path -> line numbers seen covered by at least one test.
covered_lines: dict[str, set[int]] = {}
# file path -> line numbers reported uncovered; pruned against covered_lines
# by update_set() once all jsons are merged.
uncovered_lines: dict[str, set[int]] = {}
# Buckets json files by how cleanly get_json_obj() could read them.
tests_type: TestStatusType = {"success": set(), "partial": set(), "fail": set()}
38
39
def transform_file_name(
    file_path: str, interested_folders: list[str], platform: TestPlatform
) -> str:
    """Normalize an absolute coverage file path into a repo-relative one.

    Strips compiler-generated vectorization suffixes, then either truncates
    the path at the first matching interested folder, or (for OSS runs)
    removes the pytorch checkout prefix.
    """
    # Drop codegen suffixes so e.g. foo.cpp.AVX2.cpp maps back to foo.cpp.
    remove_patterns: set[str] = {".DEFAULT.cpp", ".AVX.cpp", ".AVX2.cpp"}
    for pattern in remove_patterns:
        file_path = file_path.replace(pattern, "")
    # if user has specified interested folder, keep the path from the first
    # matching folder onwards
    if interested_folders:
        for folder in interested_folders:
            if folder in file_path:
                return file_path[file_path.find(folder) :]
    # remove pytorch base folder path (OSS builds only)
    if platform == TestPlatform.OSS:
        from package.oss.utils import get_pytorch_folder  # type: ignore[import]

        pytorch_folder = get_pytorch_folder()
        assert file_path.startswith(pytorch_folder)
        # +1 also drops the path separator after the base folder
        file_path = file_path[len(pytorch_folder) + 1 :]
    return file_path
59
60
def is_intrested_file(
    file_path: str, interested_folders: list[str], platform: TestPlatform
) -> bool:
    """Decide whether coverage for this file should be collected.

    Rejects cuda/generated/build files, files outside the pytorch checkout
    (OSS only), and - when interested folders are given - anything outside
    those folders.
    """
    # generated or out-of-scope sources are never interesting
    for pattern in ("cuda", "aten/gen_aten", "aten/aten_", "build/"):
        if pattern in file_path:
            return False

    # ignore files that do not belong to pytorch (OSS checkout only)
    if platform == TestPlatform.OSS:
        from package.oss.utils import get_pytorch_folder

        if not file_path.startswith(get_pytorch_folder()):
            return False

    # without an explicit folder filter, everything that survived is wanted
    if not interested_folders:
        return True
    # normalize each folder to end with "/" before substring matching
    return any(
        (folder if folder.endswith("/") else f"{folder}/") in file_path
        for folder in interested_folders
    )
83
84
def get_json_obj(json_file: str) -> tuple[Any, int]:
    """
    Sometimes at the start of the file llvm/gcov will complain "fail to find
    coverage data", so we need to skip those lines.
      -- success read: 0      -  this json file has the full json coverage information
      -- partial success: 1   -  this json file starts with some error prompt, but still has the coverage information
      -- fail to read: 2      -  this json file doesn't have any coverage information

    Returns (parsed_json, status) for the first parseable line, or (None, 2)
    if no line parses.
    """
    read_status = -1
    with open(json_file) as f:
        # stream line by line instead of readlines(): coverage exports can be
        # large, and we only ever need lines up to the first valid json
        for line in f:
            try:
                json_obj = json.loads(line)
            except json.JSONDecodeError:
                # error-prompt line before the actual payload
                read_status = 1
                continue
            if read_status == -1:
                # no JSONDecodeError seen before this line: clean success
                read_status = 0
            return (json_obj, read_status)
    return None, 2
108
109
def parse_json(json_file: str, platform: TestPlatform) -> list[CoverageRecord]:
    """Parse one exported coverage json into CoverageRecords.

    Also files the json under the module-level tests_type buckets
    (success / partial / fail) based on how cleanly it could be read.

    Raises:
        RuntimeError: if the json file contains no coverage information.
    """
    print("start parse:", json_file)
    json_obj, read_status = get_json_obj(json_file)
    if read_status == 0:
        tests_type["success"].add(json_file)
    elif read_status == 1:
        tests_type["partial"].add(json_file)
    else:
        tests_type["fail"].add(json_file)
        # f-string instead of a print-style second argument, which would have
        # produced a tuple as the exception payload
        raise RuntimeError(
            f"Fail to do code coverage! Fail to load json file: {json_file}"
        )

    cov_type = detect_compiler_type(platform)

    coverage_records: list[CoverageRecord] = []
    if cov_type == CompilerType.CLANG:
        coverage_records = LlvmCoverageParser(json_obj).parse("fbcode")
    elif cov_type == CompilerType.GCC:
        coverage_records = GcovCoverageParser(json_obj).parse()

    return coverage_records
133
134
def parse_jsons(
    test_list: TestList, interested_folders: list[str], platform: TestPlatform
) -> None:
    """Walk JSON_FOLDER_BASE_DIR, parse every relevant coverage json, and
    merge the results into the module-level coverage dicts."""
    # compiler type is the same for every file -- hoist it out of the walk
    # instead of re-detecting it per json
    cov_type = detect_compiler_type(platform)

    for path, _, file_list in os.walk(JSON_FOLDER_BASE_DIR):
        for file_name in file_list:
            if not file_name.endswith(".json"):
                continue
            # if compiler is clang, we only analyze related json / when compiler is gcc, we analyze all jsons
            if cov_type == CompilerType.CLANG and not related_to_test_list(
                file_name, test_list
            ):
                continue
            json_file = os.path.join(path, file_name)
            try:
                coverage_records = parse_json(json_file, platform)
            except RuntimeError:
                # best-effort: report the broken json and keep going
                print_error("Fail to load json file: ", json_file)
                continue
            # collect information from each target's export file and merge them together:
            update_coverage(coverage_records, interested_folders, platform)
157
158
def update_coverage(
    coverage_records: list[CoverageRecord],
    interested_folders: list[str],
    platform: TestPlatform,
) -> None:
    """Merge a batch of CoverageRecords into the module-level line dicts."""
    for item in coverage_records:
        # extract information for the record
        record = item.to_dict()
        file_path = record["filepath"]
        if not is_intrested_file(file_path, interested_folders, platform):
            continue
        covered_range = record["covered_lines"]
        uncovered_range = record["uncovered_lines"]
        # transform file name: remote/13223/caffe2/aten -> caffe2/aten
        file_path = transform_file_name(file_path, interested_folders, platform)

        # setdefault creates the per-file sets on first sight, replacing the
        # explicit "if not in dict" checks
        file_covered = covered_lines.setdefault(file_path, set())
        file_uncovered = uncovered_lines.setdefault(file_path, set())
        # update this file's covered and uncovered lines
        if covered_range is not None:
            file_covered.update(covered_range)
        if uncovered_range is not None:
            file_uncovered.update(uncovered_range)
185
186
def update_set() -> None:
    """Remove every line covered by any test from the uncovered bookkeeping,
    leaving uncovered_lines with only the lines no test ever hit."""
    for file_name, covered in covered_lines.items():
        # in-place difference keeps the same set objects alive
        uncovered_lines[file_name].difference_update(covered)
191
192
def summarize_jsons(
    test_list: TestList,
    interested_folders: list[str],
    coverage_only: list[str],
    platform: TestPlatform,
) -> None:
    """Entry point: aggregate all exported coverage jsons and emit reports.

    GCC coverage is rendered straight to an html report; clang coverage is
    parsed, merged, and printed as line- and file-oriented reports.
    """
    start_time = time.time()
    if detect_compiler_type(platform) == CompilerType.GCC:
        html_oriented_report()
    else:
        parse_jsons(test_list, interested_folders, platform)
        update_set()
        # both reports consume the exact same aggregated state
        report_args = (
            test_list,
            tests_type,
            interested_folders,
            coverage_only,
            covered_lines,
            uncovered_lines,
        )
        line_oriented_report(*report_args)
        file_oriented_report(*report_args)
    print_time("summary jsons take time: ", start_time)
222