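"""Upload dynamo perf stats from S3 test-report artifacts to Rockset and DynamoDB."""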
from __future__ import annotations

import argparse
import csv
import hashlib
import json
import os
import re
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

from tools.stats.upload_stats_lib import (
    download_s3_artifacts,
    unzip,
    upload_to_dynamodb,
    upload_to_rockset,
)


ARTIFACTS = [
    "test-reports",
]
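# Matches artifact names such as "test-reports-test-default-1-1-linux.2xlarge_1234567890.zip"
# (illustrative example); the name, runner, and job id are extracted as named groups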
ARTIFACT_REGEX = re.compile(
    r"test-reports-test-(?P<name>[\w\-]+)-\d+-\d+-(?P<runner>[\w\.-]+)_(?P<job>\d+)\.zip"
)


def upload_dynamo_perf_stats_to_rockset(
    repo: str,
    workflow_run_id: int,
    workflow_run_attempt: int,
    head_branch: str,
    match_filename: str,
) -> list[dict[str, Any]]:
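    """
    Download the test-report artifacts for the given workflow run and attempt,
    extract the CSV files whose names match match_filename, and return their
    rows as a list of documents ready to upload.
    """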
    match_filename_regex = re.compile(match_filename)
    perf_stats = []
    with TemporaryDirectory() as temp_dir:
        print("Using temporary directory:", temp_dir)
        os.chdir(temp_dir)

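        # Pull down every matching artifact for this workflow run and attempt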
        for artifact in ARTIFACTS:
            artifact_paths = download_s3_artifacts(
                artifact, workflow_run_id, workflow_run_attempt
            )

            # Unzip to get the perf stats CSV files
            for path in artifact_paths:
                m = ARTIFACT_REGEX.match(str(path))
                if not m:
                    print(f"Test report {path} has an invalid name. Skipping")
                    continue

                test_name = m.group("name")
                runner = m.group("runner")
                job_id = m.group("job")

                # Extract all files
                unzip(path)

                for csv_file in Path(".").glob("**/*.csv"):
                    filename = os.path.splitext(os.path.basename(csv_file))[0]
                    if not match_filename_regex.match(filename):
                        continue
                    print(f"Processing {filename} from {path}")

                    with open(csv_file) as csvfile:
                        reader = csv.DictReader(csvfile, delimiter=",")

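                        # Tag every row with the workflow and job metadata so each
                        # record can be traced back to the CI job that produced it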
                        for row in reader:
                            row.update(
                                {
                                    "workflow_id": workflow_run_id,  # type: ignore[dict-item]
                                    "run_attempt": workflow_run_attempt,  # type: ignore[dict-item]
                                    "test_name": test_name,
                                    "runner": runner,
                                    "job_id": job_id,
                                    "filename": filename,
                                    "head_branch": head_branch,
                                }
                            )
                            perf_stats.append(row)

                    # Done processing the file, remove it
                    os.remove(csv_file)

    return perf_stats


def generate_partition_key(repo: str, doc: dict[str, Any]) -> str:
    """
    Generate a unique partition key for the document on DynamoDB
    """
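    # The key layout is {repo}/{workflow_id}/{job_id}/{test_name}/{filename}/{md5 of the doc}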
    workflow_id = doc["workflow_id"]
    job_id = doc["job_id"]
    test_name = doc["test_name"]
    filename = doc["filename"]

    hash_content = hashlib.md5(json.dumps(doc).encode("utf-8")).hexdigest()
    return f"{repo}/{workflow_id}/{job_id}/{test_name}/{filename}/{hash_content}"


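# Example invocation (hypothetical values, run from the repo root so that
# tools.stats.upload_stats_lib is importable):
#   python3 -m tools.stats.upload_dynamo_perf_stats \
#       --workflow-run-id 1234567890 \
#       --workflow-run-attempt 1 \
#       --repo pytorch/pytorch \
#       --head-branch main \
#       --rockset-collection torch_dynamo_perf_stats \
#       --dynamodb-table torchci-dynamo-perf-stats \
#       --match-filename "^inductor_"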
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload dynamo perf stats from S3 to Rockset and DynamoDB"
    )
    parser.add_argument(
        "--workflow-run-id",
        type=int,
        required=True,
        help="id of the workflow to get perf stats from",
    )
    parser.add_argument(
        "--workflow-run-attempt",
        type=int,
        required=True,
        help="which retry of the workflow this is",
    )
    parser.add_argument(
        "--repo",
        type=str,
        required=True,
        help="which GitHub repo this workflow run belongs to",
    )
    parser.add_argument(
        "--head-branch",
        type=str,
        required=True,
        help="head branch of the workflow",
    )
    parser.add_argument(
        "--rockset-collection",
        type=str,
        required=True,
        help="the name of the Rockset collection to store the stats",
    )
    parser.add_argument(
        "--rockset-workspace",
        type=str,
        default="commons",
        help="the name of the Rockset workspace to store the stats",
    )
    parser.add_argument(
        "--dynamodb-table",
        type=str,
        required=True,
        help="the name of the DynamoDB table to store the stats",
    )
    parser.add_argument(
        "--match-filename",
        type=str,
        default="",
        help="the regex to filter the list of CSV files containing the records to upload",
    )
    args = parser.parse_args()
    perf_stats = upload_dynamo_perf_stats_to_rockset(
        args.repo,
        args.workflow_run_id,
        args.workflow_run_attempt,
        args.head_branch,
        args.match_filename,
    )
    # TODO (huydhn): Write to both Rockset and DynamoDB. A one-off script to copy
    # data from Rockset to DynamoDB is the next step before uploading to Rockset
    # can be removed
    upload_to_rockset(
        collection=args.rockset_collection,
        docs=perf_stats,
        workspace=args.rockset_workspace,
    )
    upload_to_dynamodb(
        dynamodb_table=args.dynamodb_table,
        repo=args.repo,
        docs=perf_stats,
        generate_partition_key=generate_partition_key,
    )
177