1from __future__ import annotations 2 3import json 4import os 5import time 6from typing import Any, TYPE_CHECKING 7 8from ..util.setting import ( 9 CompilerType, 10 JSON_FOLDER_BASE_DIR, 11 TestList, 12 TestPlatform, 13 TestStatusType, 14) 15from ..util.utils import ( 16 detect_compiler_type, 17 print_error, 18 print_time, 19 related_to_test_list, 20) 21from .parser.gcov_coverage_parser import GcovCoverageParser 22from .parser.llvm_coverage_parser import LlvmCoverageParser 23from .print_report import ( 24 file_oriented_report, 25 html_oriented_report, 26 line_oriented_report, 27) 28 29 30if TYPE_CHECKING: 31 from .parser.coverage_record import CoverageRecord 32 33 34# coverage_records: Dict[str, LineInfo] = {} 35covered_lines: dict[str, set[int]] = {} 36uncovered_lines: dict[str, set[int]] = {} 37tests_type: TestStatusType = {"success": set(), "partial": set(), "fail": set()} 38 39 40def transform_file_name( 41 file_path: str, interested_folders: list[str], platform: TestPlatform 42) -> str: 43 remove_patterns: set[str] = {".DEFAULT.cpp", ".AVX.cpp", ".AVX2.cpp"} 44 for pattern in remove_patterns: 45 file_path = file_path.replace(pattern, "") 46 # if user has specified interested folder 47 if interested_folders: 48 for folder in interested_folders: 49 if folder in file_path: 50 return file_path[file_path.find(folder) :] 51 # remove pytorch base folder path 52 if platform == TestPlatform.OSS: 53 from package.oss.utils import get_pytorch_folder # type: ignore[import] 54 55 pytorch_foler = get_pytorch_folder() 56 assert file_path.startswith(pytorch_foler) 57 file_path = file_path[len(pytorch_foler) + 1 :] 58 return file_path 59 60 61def is_intrested_file( 62 file_path: str, interested_folders: list[str], platform: TestPlatform 63) -> bool: 64 ignored_patterns = ["cuda", "aten/gen_aten", "aten/aten_", "build/"] 65 if any(pattern in file_path for pattern in ignored_patterns): 66 return False 67 68 # ignore files that are not belong to pytorch 69 if platform == TestPlatform.OSS: 70 from package.oss.utils import get_pytorch_folder 71 72 if not file_path.startswith(get_pytorch_folder()): 73 return False 74 # if user has specified interested folder 75 if interested_folders: 76 for folder in interested_folders: 77 intersted_folder_path = folder if folder.endswith("/") else f"{folder}/" 78 if intersted_folder_path in file_path: 79 return True 80 return False 81 else: 82 return True 83 84 85def get_json_obj(json_file: str) -> tuple[Any, int]: 86 """ 87 Sometimes at the start of file llvm/gcov will complains "fail to find coverage data", 88 then we need to skip these lines 89 -- success read: 0 - this json file have the full json coverage information 90 -- partial success: 1 - this json file starts with some error prompt, but still have the coverage information 91 -- fail to read: 2 - this json file doesn't have any coverage information 92 """ 93 read_status = -1 94 with open(json_file) as f: 95 lines = f.readlines() 96 for line in lines: 97 try: 98 json_obj = json.loads(line) 99 except json.JSONDecodeError: 100 read_status = 1 101 continue 102 else: 103 if read_status == -1: 104 # not meet jsonDecoderError before, return success 105 read_status = 0 106 return (json_obj, read_status) 107 return None, 2 108 109 110def parse_json(json_file: str, platform: TestPlatform) -> list[CoverageRecord]: 111 print("start parse:", json_file) 112 json_obj, read_status = get_json_obj(json_file) 113 if read_status == 0: 114 tests_type["success"].add(json_file) 115 elif read_status == 1: 116 tests_type["partial"].add(json_file) 117 else: 118 tests_type["fail"].add(json_file) 119 raise RuntimeError( 120 "Fail to do code coverage! Fail to load json file: ", json_file 121 ) 122 123 cov_type = detect_compiler_type(platform) 124 125 coverage_records: list[CoverageRecord] = [] 126 if cov_type == CompilerType.CLANG: 127 coverage_records = LlvmCoverageParser(json_obj).parse("fbcode") 128 # print(coverage_records) 129 elif cov_type == CompilerType.GCC: 130 coverage_records = GcovCoverageParser(json_obj).parse() 131 132 return coverage_records 133 134 135def parse_jsons( 136 test_list: TestList, interested_folders: list[str], platform: TestPlatform 137) -> None: 138 g = os.walk(JSON_FOLDER_BASE_DIR) 139 140 for path, _, file_list in g: 141 for file_name in file_list: 142 if file_name.endswith(".json"): 143 # if compiler is clang, we only analyze related json / when compiler is gcc, we analyze all jsons 144 cov_type = detect_compiler_type(platform) 145 if cov_type == CompilerType.CLANG and not related_to_test_list( 146 file_name, test_list 147 ): 148 continue 149 json_file = os.path.join(path, file_name) 150 try: 151 coverage_records = parse_json(json_file, platform) 152 except RuntimeError: 153 print_error("Fail to load json file: ", json_file) 154 continue 155 # collect information from each target's export file and merge them together: 156 update_coverage(coverage_records, interested_folders, platform) 157 158 159def update_coverage( 160 coverage_records: list[CoverageRecord], 161 interested_folders: list[str], 162 platform: TestPlatform, 163) -> None: 164 for item in coverage_records: 165 # extract information for the record 166 record = item.to_dict() 167 file_path = record["filepath"] 168 if not is_intrested_file(file_path, interested_folders, platform): 169 continue 170 covered_range = record["covered_lines"] 171 uncovered_range = record["uncovered_lines"] 172 # transform file name: remote/13223/caffe2/aten -> caffe2/aten 173 file_path = transform_file_name(file_path, interested_folders, platform) 174 175 # if file not exists, add it into dictionary 176 if file_path not in covered_lines: 177 covered_lines[file_path] = set() 178 if file_path not in uncovered_lines: 179 uncovered_lines[file_path] = set() 180 # update this file's covered and uncovered lines 181 if covered_range is not None: 182 covered_lines[file_path].update(covered_range) 183 if uncovered_range is not None: 184 uncovered_lines[file_path].update(uncovered_range) 185 186 187def update_set() -> None: 188 for file_name in covered_lines: 189 # difference_update 190 uncovered_lines[file_name].difference_update(covered_lines[file_name]) 191 192 193def summarize_jsons( 194 test_list: TestList, 195 interested_folders: list[str], 196 coverage_only: list[str], 197 platform: TestPlatform, 198) -> None: 199 start_time = time.time() 200 if detect_compiler_type(platform) == CompilerType.GCC: 201 html_oriented_report() 202 else: 203 parse_jsons(test_list, interested_folders, platform) 204 update_set() 205 line_oriented_report( 206 test_list, 207 tests_type, 208 interested_folders, 209 coverage_only, 210 covered_lines, 211 uncovered_lines, 212 ) 213 file_oriented_report( 214 test_list, 215 tests_type, 216 interested_folders, 217 coverage_only, 218 covered_lines, 219 uncovered_lines, 220 ) 221 print_time("summary jsons take time: ", start_time) 222