"""
Download test-report artifacts for a workflow run from S3, extract the dynamo
perf stats CSV files inside them, and upload the records to Rockset and
DynamoDB.
"""

from __future__ import annotations

import argparse
import csv
import hashlib
import json
import os
import re
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

from tools.stats.upload_stats_lib import (
    download_s3_artifacts,
    unzip,
    upload_to_dynamodb,
    upload_to_rockset,
)


ARTIFACTS = [
    "test-reports",
]
# Matches e.g. test-reports-test-<name>-1-2-<runner>_<job id>.zip and captures
# the test name, the runner label, and the job id
ARTIFACT_REGEX = re.compile(
    r"test-reports-test-(?P<name>[\w\-]+)-\d+-\d+-(?P<runner>[\w\.-]+)_(?P<job>\d+)\.zip"
)


def upload_dynamo_perf_stats_to_rockset(
    repo: str,
    workflow_run_id: int,
    workflow_run_attempt: int,
    head_branch: str,
    match_filename: str,
) -> list[dict[str, Any]]:
    match_filename_regex = re.compile(match_filename)
    perf_stats: list[dict[str, Any]] = []
    with TemporaryDirectory() as temp_dir:
        print("Using temporary directory:", temp_dir)
        os.chdir(temp_dir)

        for artifact in ARTIFACTS:
            artifact_paths = download_s3_artifacts(
                artifact, workflow_run_id, workflow_run_attempt
            )

            # Unzip to get perf stats csv files
            for path in artifact_paths:
                m = ARTIFACT_REGEX.match(str(path))
                if not m:
                    print(f"Test report {path} has an invalid name. Skipping")
                    continue

                test_name = m.group("name")
                runner = m.group("runner")
                job_id = m.group("job")

                # Extract all files
                unzip(path)

                for csv_file in Path(".").glob("**/*.csv"):
                    filename = csv_file.stem
                    if not match_filename_regex.match(filename):
                        continue
                    print(f"Processing {filename} from {path}")

                    with open(csv_file) as csvfile:
                        reader = csv.DictReader(csvfile, delimiter=",")

                        # Stamp every record with enough metadata to trace it
                        # back to the workflow, job, and file it came from
                        for row in reader:
                            row.update(
                                {
                                    "workflow_id": workflow_run_id,  # type: ignore[dict-item]
                                    "run_attempt": workflow_run_attempt,  # type: ignore[dict-item]
                                    "test_name": test_name,
                                    "runner": runner,
                                    "job_id": job_id,
                                    "filename": filename,
                                    "head_branch": head_branch,
                                }
                            )
                            perf_stats.append(row)

                    # Done processing the file, remove it so it is not picked
                    # up again when the next artifact is extracted here
                    os.remove(csv_file)

    return perf_stats


def generate_partition_key(repo: str, doc: dict[str, Any]) -> str:
    """
    Generate a unique partition key for the document on DynamoDB
    """
    workflow_id = doc["workflow_id"]
    job_id = doc["job_id"]
    test_name = doc["test_name"]
    filename = doc["filename"]

    # Hash the whole document so that identical records map to the same key
    # while distinct rows sharing the same coordinates get distinct keys
    hash_content = hashlib.md5(json.dumps(doc).encode("utf-8")).hexdigest()
    return f"{repo}/{workflow_id}/{job_id}/{test_name}/{filename}/{hash_content}"
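
# A quick sketch of the key shape generate_partition_key produces. The repo
# and document values here are made-up placeholders, not real CI data, and the
# trailing component stands for the md5 hexdigest of the JSON-serialized doc:
#
#   generate_partition_key(
#       "pytorch/pytorch",
#       {"workflow_id": 1, "job_id": 2, "test_name": "inductor", "filename": "perf"},
#   )
#   -> "pytorch/pytorch/1/2/inductor/perf/<md5 hexdigest>"
#
# Because the hash covers the full document, retrying an upload of the same
# record should produce the same key (overwriting rather than duplicating),
# while different rows from the same CSV file still get distinct keys.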
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload dynamo perf stats from S3 to Rockset"
    )
    parser.add_argument(
        "--workflow-run-id",
        type=int,
        required=True,
        help="id of the workflow to get perf stats from",
    )
    parser.add_argument(
        "--workflow-run-attempt",
        type=int,
        required=True,
        help="which retry of the workflow this is",
    )
    parser.add_argument(
        "--repo",
        type=str,
        required=True,
        help="which GitHub repo this workflow run belongs to",
    )
    parser.add_argument(
        "--head-branch",
        type=str,
        required=True,
        help="head branch of the workflow",
    )
    parser.add_argument(
        "--rockset-collection",
        type=str,
        required=True,
        help="the name of the Rockset collection to store the stats",
    )
    parser.add_argument(
        "--rockset-workspace",
        type=str,
        default="commons",
        help="the name of the Rockset workspace to store the stats",
    )
    parser.add_argument(
        "--dynamodb-table",
        type=str,
        required=True,
        help="the name of the DynamoDB table to store the stats",
    )
    parser.add_argument(
        "--match-filename",
        type=str,
        default="",
        help="the regex used to filter the CSV files to upload (the empty default matches every file)",
    )
    args = parser.parse_args()
    perf_stats = upload_dynamo_perf_stats_to_rockset(
        args.repo,
        args.workflow_run_id,
        args.workflow_run_attempt,
        args.head_branch,
        args.match_filename,
    )
    # TODO (huydhn): Write to both Rockset and DynamoDB. A one-off script to
    # copy data from Rockset to DynamoDB is the next step before the upload
    # to Rockset can be removed
    upload_to_rockset(
        collection=args.rockset_collection,
        docs=perf_stats,
        workspace=args.rockset_workspace,
    )
    upload_to_dynamodb(
        dynamodb_table=args.dynamodb_table,
        repo=args.repo,
        docs=perf_stats,
        generate_partition_key=generate_partition_key,
    )
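
# Example invocation, as a sketch only: the module path and every value below
# (IDs, branch, collection, and table names) are hypothetical placeholders,
# not real CI configuration.
#
#   python3 -m tools.stats.upload_dynamo_perf_stats \
#       --workflow-run-id 1234567890 \
#       --workflow-run-attempt 1 \
#       --repo pytorch/pytorch \
#       --head-branch main \
#       --rockset-collection torch_dynamo_perf_stats \
#       --dynamodb-table torchci-dynamo-perf-stats \
#       --match-filename "^inductor_"
#
# --rockset-workspace is omitted above, so it falls back to its "commons"
# default; leaving --match-filename at its empty default would instead accept
# every CSV file found in the downloaded artifacts.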