#!/usr/bin/env python3
"""Queries the HUD API for recent job results on a branch and prints a JSON list of
alerts for jobs that have been failing consecutively with similar failures."""

from __future__ import annotations

import argparse
import json
import os
import re
from collections import defaultdict
from difflib import SequenceMatcher
from typing import Any

import requests
from setuptools import distutils  # type: ignore[import]


ALL_SKIPPED_THRESHOLD = 100
SIMILARITY_THRESHOLD = 0.75
FAILURE_CHAIN_THRESHOLD = 2
MAX_CONCURRENT_ALERTS = 1
FAILED_JOB_PATTERN = (
    r"^- \[(.*)\]\(.*\) failed consecutively starting with commit \[.*\]\(.*\)$"
)

PENDING = "pending"
NEUTRAL = "neutral"
SKIPPED = "skipped"
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"

ISSUES_WITH_LABEL_QUERY = """
query ($owner: String!, $name: String!, $labels: [String!]) {
  repository(owner: $owner, name: $name, followRenames: false) {
    issues(last: 10, labels: $labels, states: [OPEN]) {
      nodes {
        id
        title
        closed
        number
        body
        createdAt
        comments(first: 100) {
          nodes {
            bodyText
            databaseId
          }
        }
      }
    }
  }
}
"""

NUM_ISSUES_QUERY = """
query ($query: String!) {
  search(type: ISSUE, query: $query) {
    issueCount
  }
}
"""

DISABLED_ALERTS = [
    "rerun_disabled_tests",
    "unstable",
]


class JobStatus:
    job_name: str = ""
    jobs: list[Any] = []
    current_status: Any = None
    job_statuses: list[Any] = []
    filtered_statuses: list[Any] = []
    failure_chain: list[Any] = []
    flaky_jobs: list[Any] = []

    def __init__(self, job_name: str, job_statuses: list[Any]) -> None:
        self.job_name = job_name
        self.job_statuses = job_statuses

        self.filtered_statuses = list(
            filter(lambda j: not is_job_skipped(j), job_statuses)
        )
        self.current_status = self.get_current_status()
        self.failure_chain = self.get_most_recent_failure_chain()
        self.flaky_jobs = self.get_flaky_jobs()

    def get_current_status(self) -> Any:
        """
        Returns the latest status that is not pending, be it success or failure.
        """
        for status in self.filtered_statuses:
            if status["conclusion"] != PENDING:
                return status
        return None

    def get_unique_failures(self, jobs: list[Any]) -> dict[str, list[Any]]:
        """
        Groups the failed jobs from the input list by similar failureCaptures.
        """
        failures = defaultdict(list)
        for job in jobs:
            if job["conclusion"] == "failure":
                found_similar_failure = False
                if "failureCaptures" not in job:
                    # Jobs without failure captures are grouped together as unclassified
                    failures["unclassified"].append(job)
                    continue

                # This is now a list returned by HUD API, not a string
                failureCaptures = " ".join(job["failureCaptures"])

                # Group with an existing failure if the capture strings are similar enough
                for failure in failures:
                    seq = SequenceMatcher(None, failureCaptures, failure)
                    if seq.ratio() > SIMILARITY_THRESHOLD:
                        failures[failure].append(job)
                        found_similar_failure = True
                        break
                if not found_similar_failure:
                    failures[failureCaptures] = [job]

        return failures

    # A job is flaky if it is the only job with that failureCapture and it is not the
    # most recent job
    def get_flaky_jobs(self) -> list[Any]:
        unique_failures = self.get_unique_failures(self.filtered_statuses)
        flaky_jobs = []
        for failure in unique_failures:
            failure_list = unique_failures[failure]
            if (
                len(failure_list) == 1
                and failure_list[0]["sha"] != self.current_status["sha"]
            ):
                flaky_jobs.append(failure_list[0])
        return flaky_jobs

    # The most recent failure chain is a list of jobs that have the same-ish failures.
    # A success in the middle of the chain will terminate the chain.
    def get_most_recent_failure_chain(self) -> list[Any]:
        failures = []
        found_most_recent_failure = False

        for job in self.filtered_statuses:
            if is_job_failed(job):
                failures.append(job)
                found_most_recent_failure = True
            if found_most_recent_failure and not is_job_failed(job):
                break

        return failures

    def should_alert(self) -> bool:
        # Group jobs by their failures. The length of the failure chain is used
        # to raise the alert, so we can do a simple tweak here to use the length
        # of the longest unique chain
        unique_failures = self.get_unique_failures(self.failure_chain)

        return (
            self.current_status is not None
            and self.current_status["conclusion"] != SUCCESS
            and any(
                len(failure_chain) >= FAILURE_CHAIN_THRESHOLD
                for failure_chain in unique_failures.values()
            )
            and all(
                disabled_alert not in self.job_name
                for disabled_alert in DISABLED_ALERTS
            )
        )

    def __repr__(self) -> str:
        return f"jobName: {self.job_name}"


def fetch_hud_data(repo: str, branch: str) -> Any:
    response = requests.get(f"https://hud.pytorch.org/api/hud/{repo}/{branch}/0")
    response.raise_for_status()
    hud_data = json.loads(response.text)
    return (hud_data["jobNames"], hud_data["shaGrid"])


# Creates a dict of job name -> [JobData]. Essentially a column in HUD
def map_job_data(jobNames: Any, shaGrid: Any) -> dict[str, Any]:
    jobData = defaultdict(list)
    for sha in shaGrid:
        for ind, job in enumerate(sha["jobs"]):
            jobData[jobNames[ind]].append(job)
    return jobData


def is_job_failed(job: Any) -> bool:
    conclusion = job["conclusion"] if "conclusion" in job else None
    return conclusion is not None and conclusion != SUCCESS and conclusion != PENDING


def is_job_skipped(job: Any) -> bool:
    conclusion = job["conclusion"] if "conclusion" in job else None
    return conclusion in (NEUTRAL, SKIPPED) or conclusion is None


def get_failed_jobs(job_data: list[Any]) -> list[Any]:
    return [job for job in job_data if job["conclusion"] == "failure"]


def classify_jobs(
    all_job_names: list[str], sha_grid: Any, filtered_jobs_names: set[str]
) -> tuple[list[JobStatus], list[Any]]:
    """
    Creates JobStatus objects, which hold the logic for deciding whether to alert and
    which jobs are flaky. Classifies jobs into jobs to alert on and flaky jobs.
    :param all_job_names: list of all job names as returned by the HUD
    :param sha_grid: list of all job data as returned by the HUD (parallel index to all_job_names)
    :param filtered_jobs_names: set of job names to actually consider
    :return: a tuple of (jobs to alert on, flaky jobs)
    """
    job_data = map_job_data(all_job_names, sha_grid)
    job_statuses: list[JobStatus] = []
    for job in job_data:
        job_statuses.append(JobStatus(job, job_data[job]))

    jobs_to_alert_on = []
    flaky_jobs = []

    for job_status in job_statuses:
        if job_status.job_name not in filtered_jobs_names:
            continue
        if job_status.should_alert():
            jobs_to_alert_on.append(job_status)
        flaky_jobs.extend(job_status.flaky_jobs)

    return jobs_to_alert_on, flaky_jobs


# Keep only the job names that match the regex (all names if no regex is given)
def filter_job_names(job_names: list[str], job_name_regex: str) -> list[str]:
    if job_name_regex:
        return [
            job_name for job_name in job_names if re.match(job_name_regex, job_name)
        ]
    return job_names


def get_recurrently_failing_jobs_alerts(
    repo: str, branch: str, job_name_regex: str
) -> list[dict[str, Any]]:
    job_names, sha_grid = fetch_hud_data(repo=repo, branch=branch)

    filtered_job_names = set(filter_job_names(job_names, job_name_regex))
    if job_name_regex:
        print()
        print(f"Filtered to {len(filtered_job_names)} jobs:")
        if len(filtered_job_names) == 0:
            print("No jobs matched the regex")
        elif len(filtered_job_names) == len(job_names):
            print("All jobs matched the regex")
        else:
            print("\n".join(filtered_job_names))

    (recurrently_failing_jobs, flaky_jobs) = classify_jobs(
        job_names, sha_grid, filtered_job_names
    )

    alerts = []
    for job in recurrently_failing_jobs:
        entry = {
            "AlertType": "Recurrently Failing Job",
            "AlertObject": job.job_name,
            "OncallTeams": [],
            "OncallIndividuals": [],
            "Flags": [],
            "sha": job.failure_chain[-1]["sha"],
            "branch": branch,
        }
        alerts.append(entry)
    return alerts


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--repo",
        help="Repository to do checks for",
        type=str,
        default=os.getenv("REPO_TO_CHECK", "pytorch/pytorch"),
    )
    parser.add_argument(
        "--branch",
        help="Branch to do checks for",
        type=str,
        default=os.getenv("BRANCH_TO_CHECK", "main"),
    )
    parser.add_argument(
        "--job-name-regex",
        help="Consider only job names matching given regex (if omitted, all jobs are matched)",
        type=str,
        default=os.getenv("JOB_NAME_REGEX", ""),
    )
    parser.add_argument(
        "--with-flaky-test-alert",
        help="Run this script with the flaky test alerting",
        type=distutils.util.strtobool,
        default=os.getenv("WITH_FLAKY_TEST_ALERT", "YES"),
    )
    parser.add_argument(
        "--dry-run",
        help="Whether or not to actually post issues",
        type=distutils.util.strtobool,
        default=os.getenv("DRY_RUN", "YES"),
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    data = json.dumps(
        get_recurrently_failing_jobs_alerts(args.repo, args.branch, args.job_name_regex)
    )

    print(data)