xref: /aosp_15_r20/external/pytorch/tools/alerts/create_alerts.py (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
#!/usr/bin/env python3

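"""
Queries hud.pytorch.org for recent job results on a repository branch and
prints, as JSON, "Recurrently Failing Job" alerts for jobs that have failed
with a similar error on consecutive commits.
"""
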
from __future__ import annotations

import argparse
import json
import os
import re
from collections import defaultdict
from difflib import SequenceMatcher
from typing import Any

import requests
from setuptools import distutils  # type: ignore[import]


ALL_SKIPPED_THRESHOLD = 100
SIMILARITY_THRESHOLD = 0.75
FAILURE_CHAIN_THRESHOLD = 2
MAX_CONCURRENT_ALERTS = 1
FAILED_JOB_PATTERN = (
    r"^- \[(.*)\]\(.*\) failed consecutively starting with commit \[.*\]\(.*\)$"
)

PENDING = "pending"
NEUTRAL = "neutral"
SKIPPED = "skipped"
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"

ISSUES_WITH_LABEL_QUERY = """
query ($owner: String!, $name: String!, $labels: [String!]) {
  repository(owner: $owner, name: $name, followRenames: false) {
    issues(last: 10, labels: $labels, states: [OPEN]) {
      nodes {
        id
        title
        closed
        number
        body
        createdAt
        comments(first: 100) {
          nodes {
            bodyText
            databaseId
          }
        }
      }
    }
  }
}
"""

NUM_ISSUES_QUERY = """
query ($query: String!) {
  search(type: ISSUE, query: $query) {
    issueCount
  }
}
"""

DISABLED_ALERTS = [
    "rerun_disabled_tests",
    "unstable",
]


class JobStatus:
    job_name: str = ""
    jobs: list[Any] = []
    current_status: Any = None
    job_statuses: list[Any] = []
    filtered_statuses: list[Any] = []
    failure_chain: list[Any] = []
    flaky_jobs: list[Any] = []

    def __init__(self, job_name: str, job_statuses: list[Any]) -> None:
        self.job_name = job_name
        self.job_statuses = job_statuses

        self.filtered_statuses = list(
            filter(lambda j: not is_job_skipped(j), job_statuses)
        )
        self.current_status = self.get_current_status()
        self.failure_chain = self.get_most_recent_failure_chain()
        self.flaky_jobs = self.get_flaky_jobs()

    def get_current_status(self) -> Any:
        """
        Returns the most recent status that is not pending, whether it is a
        success or a failure.
        """
        for status in self.filtered_statuses:
            if status["conclusion"] != PENDING:
                return status
        return None

    def get_unique_failures(self, jobs: list[Any]) -> dict[str, list[Any]]:
        """
        Groups the input jobs by similar failureCaptures and returns a dict
        mapping each distinct failure signature to its list of jobs.
        """
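        # Grouping is fuzzy: two failureCaptures strings land in the same bucket
        # when their SequenceMatcher ratio exceeds SIMILARITY_THRESHOLD. For
        # example (illustrative), "RuntimeError: CUDA out of memory on GPU 0"
        # and "RuntimeError: CUDA out of memory on GPU 1" differ by a single
        # character, giving a ratio well above 0.75, so both jobs are grouped.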
        failures = defaultdict(list)
        for job in jobs:
            if job["conclusion"] == "failure":
                found_similar_failure = False
                if "failureCaptures" not in job:
                    # Collect all jobs without failureCaptures in one bucket
                    # rather than overwriting one another
                    failures["unclassified"].append(job)
                    continue

                # This is now a list returned by HUD API, not a string
                failureCaptures = " ".join(job["failureCaptures"])

                for failure in failures:
                    seq = SequenceMatcher(None, failureCaptures, failure)
                    if seq.ratio() > SIMILARITY_THRESHOLD:
                        failures[failure].append(job)
                        found_similar_failure = True
                        break
                if not found_similar_failure:
                    failures[failureCaptures] = [job]

        return failures

    # A job is considered flaky if it is the only job with that failureCapture
    # and it is not the most recent job
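    # (e.g. a job that failed with an error signature seen on no other commit,
    # on a commit other than the latest one, ends up in a group of size one and
    # is treated as flaky rather than recurring)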
    def get_flaky_jobs(self) -> list[Any]:
        unique_failures = self.get_unique_failures(self.filtered_statuses)
        flaky_jobs = []
        for failure in unique_failures:
            failure_list = unique_failures[failure]
            if (
                len(failure_list) == 1
                and failure_list[0]["sha"] != self.current_status["sha"]
            ):
                flaky_jobs.append(failure_list[0])
        return flaky_jobs

    # The most recent failure chain is an array of jobs that have the same-ish failures.
    # A success in the middle of the chain will terminate the chain.
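    # (illustrative, assuming statuses are ordered newest to oldest as the HUD
    # returns them: [failure, failure, success, failure] yields a chain of the
    # two newest failures)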
    def get_most_recent_failure_chain(self) -> list[Any]:
        failures = []
        found_most_recent_failure = False

        for job in self.filtered_statuses:
            if is_job_failed(job):
                failures.append(job)
                found_most_recent_failure = True
            if found_most_recent_failure and not is_job_failed(job):
                break

        return failures

    def should_alert(self) -> bool:
        # Group the jobs in the failure chain by their failures. The alert is
        # raised based on the length of the failure chain, so the simple tweak
        # here is to use the length of the longest unique chain
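        # (e.g. with FAILURE_CHAIN_THRESHOLD = 2, two commits in a row failing
        # with a similar error, while the job's latest conclusion is not a
        # success, triggers an alert unless the job name contains a
        # DISABLED_ALERTS entry)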
        unique_failures = self.get_unique_failures(self.failure_chain)

        return (
            self.current_status is not None
            and self.current_status["conclusion"] != SUCCESS
            and any(
                len(failure_chain) >= FAILURE_CHAIN_THRESHOLD
                for failure_chain in unique_failures.values()
            )
            and all(
                disabled_alert not in self.job_name
                for disabled_alert in DISABLED_ALERTS
            )
        )

    def __repr__(self) -> str:
        return f"jobName: {self.job_name}"


def fetch_hud_data(repo: str, branch: str) -> Any:
    response = requests.get(f"https://hud.pytorch.org/api/hud/{repo}/{branch}/0")
    response.raise_for_status()
    hud_data = json.loads(response.text)
    return (hud_data["jobNames"], hud_data["shaGrid"])

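# The HUD response is assumed to contain two parallel structures: "jobNames",
# a list of job names, and "shaGrid", a list of commits whose "jobs" lists are
# aligned index-for-index with "jobNames" (see map_job_data below).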

# Creates a Dict of Job Name -> [JobData]. Essentially a Column in HUD
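# (illustrative: with jobNames == ["job A", "job B"], each commit contributes
# sha["jobs"][0] to the "job A" column and sha["jobs"][1] to the "job B" column)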
def map_job_data(jobNames: Any, shaGrid: Any) -> dict[str, Any]:
    jobData = defaultdict(list)
    for sha in shaGrid:
        for ind, job in enumerate(sha["jobs"]):
            jobData[jobNames[ind]].append(job)
    return jobData


def is_job_failed(job: Any) -> bool:
    conclusion = job["conclusion"] if "conclusion" in job else None
    return conclusion is not None and conclusion != SUCCESS and conclusion != PENDING


def is_job_skipped(job: Any) -> bool:
    conclusion = job["conclusion"] if "conclusion" in job else None
    return conclusion in (NEUTRAL, SKIPPED) or conclusion is None


def get_failed_jobs(job_data: list[Any]) -> list[Any]:
    return [job for job in job_data if job["conclusion"] == "failure"]


def classify_jobs(
    all_job_names: list[str], sha_grid: Any, filtered_jobs_names: set[str]
) -> tuple[list[JobStatus], list[Any]]:
    """
    Creates JobStatus objects, which contain the logic for deciding whether to
    alert and for detecting flaky jobs.
    Classifies jobs into jobs to alert on and flaky jobs.
    :param all_job_names: list of all job names as returned by the HUD
    :param sha_grid: list of all job data as returned by the HUD (parallel index to all_job_names)
    :param filtered_jobs_names: set of job names to actually consider
    :return:
    """
    job_data = map_job_data(all_job_names, sha_grid)
    job_statuses: list[JobStatus] = []
    for job in job_data:
        job_statuses.append(JobStatus(job, job_data[job]))

    jobs_to_alert_on = []
    flaky_jobs = []

    for job_status in job_statuses:
        if job_status.job_name not in filtered_jobs_names:
            continue
        if job_status.should_alert():
            jobs_to_alert_on.append(job_status)
        flaky_jobs.extend(job_status.flaky_jobs)

    return jobs_to_alert_on, flaky_jobs


# Keep only the job names that match the regex; an empty regex keeps everything
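# (note: re.match only requires the pattern to match at the start of the job
# name, so a prefix regex is enough)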
def filter_job_names(job_names: list[str], job_name_regex: str) -> list[str]:
    if job_name_regex:
        return [
            job_name for job_name in job_names if re.match(job_name_regex, job_name)
        ]
    return job_names


def get_recurrently_failing_jobs_alerts(
    repo: str, branch: str, job_name_regex: str
) -> list[dict[str, Any]]:
    job_names, sha_grid = fetch_hud_data(repo=repo, branch=branch)

    filtered_job_names = set(filter_job_names(job_names, job_name_regex))
    if job_name_regex:
        print()
        print(f"Filtered to {len(filtered_job_names)} jobs:")
        if len(filtered_job_names) == 0:
            print("No jobs matched the regex")
        elif len(filtered_job_names) == len(job_names):
            print("All jobs matched the regex")
        else:
            print("\n".join(filtered_job_names))

    (recurrently_failing_jobs, flaky_jobs) = classify_jobs(
        job_names, sha_grid, filtered_job_names
    )

    alerts = []
    for job in recurrently_failing_jobs:
        entry = {
            "AlertType": "Recurrently Failing Job",
            "AlertObject": job.job_name,
            "OncallTeams": [],
            "OncallIndividuals": [],
            "Flags": [],
            "sha": job.failure_chain[-1]["sha"],
            "branch": branch,
        }
        alerts.append(entry)
    return alerts


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--repo",
        help="Repository to do checks for",
        type=str,
        default=os.getenv("REPO_TO_CHECK", "pytorch/pytorch"),
    )
    parser.add_argument(
        "--branch",
        help="Branch to do checks for",
        type=str,
        default=os.getenv("BRANCH_TO_CHECK", "main"),
    )
    parser.add_argument(
        "--job-name-regex",
        help="Consider only job names matching given regex (if omitted, all jobs are matched)",
        type=str,
        default=os.getenv("JOB_NAME_REGEX", ""),
    )
    parser.add_argument(
        "--with-flaky-test-alert",
        help="Run this script with the flaky test alerting",
        type=distutils.util.strtobool,
        default=os.getenv("WITH_FLAKY_TEST_ALERT", "YES"),
    )
    parser.add_argument(
        "--dry-run",
        help="Whether or not to actually post issues",
        type=distutils.util.strtobool,
        default=os.getenv("DRY_RUN", "YES"),
    )
    return parser.parse_args()


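# Example invocation (hypothetical values; defaults come from the env vars read
# in parse_args above):
#   python3 tools/alerts/create_alerts.py --repo pytorch/pytorch --branch main \
#       --job-name-regex "linux-"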
if __name__ == "__main__":
    args = parse_args()
    data = json.dumps(
        get_recurrently_failing_jobs_alerts(args.repo, args.branch, args.job_name_regex)
    )

    print(data)