from __future__ import annotations

import datetime
import inspect
import os
import time
import uuid
from decimal import Decimal
from typing import Any
from warnings import warn


# boto3 is an optional dependency. If it's not installed,
# we'll just not emit the metrics.
# Keeping this logic here so that callers don't have to
# worry about it.
EMIT_METRICS = False
try:
    import boto3  # type: ignore[import]

    EMIT_METRICS = True
except ImportError as e:
    print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")

# Sometimes our runner machines are located in one AWS account while the metrics table may be in
# another, so we need to specify the table's ARN explicitly.
TORCHCI_METRICS_TABLE_ARN = (
    "arn:aws:dynamodb:us-east-1:308535385114:table/torchci-metrics"
)


class EnvVarMetric:
    """
    A metric whose value is read from an environment variable.

    Attributes:
        name: Key under which the value is stored in the emitted metric.
        env_var: Name of the environment variable to read.
        required: If True, a missing (or empty) variable raises ValueError.
        type_conversion_fn: Optional callable used to cast the raw string
            value to the desired type (the value stays a str when unset).
    """

    name: str
    env_var: str
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def __init__(
        self,
        name: str,
        env_var: str,
        required: bool = True,
        type_conversion_fn: Any = None,
    ) -> None:
        self.name = name
        self.env_var = env_var
        self.required = required
        self.type_conversion_fn = type_conversion_fn

    def value(self) -> Any:
        """
        Return the (optionally converted) value of the environment variable.

        Returns:
            None when the variable is unset/empty and not required; otherwise
            the raw string, or the result of type_conversion_fn applied to it.

        Raises:
            ValueError: if the variable is required but unset or empty. Any
                exception raised by type_conversion_fn propagates unchanged.
        """
        value = os.environ.get(self.env_var)

        # Github CI will set some env vars to an empty string
        DEFAULT_ENVVAR_VALUES = [None, ""]
        if value in DEFAULT_ENVVAR_VALUES:
            if not self.required:
                return None

            raise ValueError(
                f"Missing {self.name}. Please set the {self.env_var} "
                "environment variable to pass in this value."
            )

        if self.type_conversion_fn:
            return self.type_conversion_fn(value)
        return value


global_metrics: dict[str, Any] = {}


def add_global_metric(metric_name: str, metric_value: Any) -> None:
    """
    Adds stats that should be emitted with every metric by the current process.
    If the emit_metrics method specifies a metric with the same name, it will
    overwrite this value.
    """
    global_metrics[metric_name] = metric_value


def emit_metric(
    metric_name: str,
    metrics: dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, Rockset).

    Even if EMIT_METRICS is set to False, this function will still run the code to
    validate and shape the metrics, skipping just the upload.

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. If a required one is not set, this function warns
    and becomes a noop.

    Raises:
        ValueError: if metrics is None, or if metrics (merged with the global
            metrics) uses a key that is reserved for the default values.
    """

    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # Merge the given metrics with the global metrics, overwriting any duplicates
    # with the given metrics.
    metrics = {**global_metrics, **metrics}

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT", required=False),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
        EnvVarMetric("job_id", "JOB_ID", type_conversion_fn=int),
        EnvVarMetric("job_name", "JOB_NAME"),
    ]

    # Use info about the function that invoked this one as a namespace and a way to filter metrics.
    # NOTE: the frame lookup must stay in this function; moving it into a helper
    # would make f_back point at emit_metric instead of the real caller.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_function = calling_frame_info.function
    # inspect.getmodule() may return None (e.g. for dynamically executed code),
    # so fall back to the caller's own __name__ global instead of crashing.
    module = inspect.getmodule(calling_frame)
    if module is not None:
        calling_module = module.__name__
    else:
        calling_module = calling_frame.f_globals.get("__name__", "<unknown>")  # type: ignore[union-attr]

    try:
        # Read every env var exactly once. Filter on `is not None` rather than
        # truthiness so that a legitimate value of 0 is not silently dropped.
        env_values = {m.name: m.value() for m in env_var_metrics}
        reserved_metrics = {
            "metric_name": metric_name,
            "calling_file": calling_file,
            "calling_module": calling_module,
            "calling_function": calling_function,
            # Timezone-aware replacement for the deprecated utcnow(); the
            # formatted string is identical.
            "timestamp": datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
            **{name: val for name, val in env_values.items() if val is not None},
        }
    except ValueError as e:
        # Raised for a missing required env var or one that failed int conversion.
        warn(f"Not emitting metrics for {metric_name}. {e}")
        return

    # Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision
    reserved_metrics[
        "dynamo_key"
    ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    # Ensure the metrics dict doesn't contain any reserved keys
    used_reserved_keys = [k for k in metrics if k in reserved_metrics]
    if used_reserved_keys:
        raise ValueError(
            f"Metrics dict contains reserved keys: [{', '.join(used_reserved_keys)}]"
        )

    # boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
    metrics = _convert_float_values_to_decimals(metrics)

    if EMIT_METRICS:
        try:
            session = boto3.Session(region_name="us-east-1")
            session.resource("dynamodb").Table(TORCHCI_METRICS_TABLE_ARN).put_item(
                Item={
                    **reserved_metrics,
                    **metrics,
                }
            )
        except Exception as e:
            # We don't want to fail the job if we can't upload the metric.
            # We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
            warn(f"Error uploading metric {metric_name} to DynamoDB: {e}")
            return
    else:
        print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.")


def _convert_float_values_to_decimals(data: dict[str, Any]) -> dict[str, Any]:
    """
    Return a copy of data with every float replaced by an equivalent Decimal.

    Lists, tuples and dicts (keys and values) nested inside the values are
    converted recursively; all other objects are returned as they are. The
    top-level keys of data are left untouched.
    """

    # Attempt to recurse
    def _helper(o: Any) -> Any:
        if isinstance(o, float):
            # Go through str() so the Decimal matches the float's repr
            # (e.g. 0.1) rather than its exact binary expansion.
            return Decimal(str(o))
        if isinstance(o, list):
            return [_helper(v) for v in o]
        if isinstance(o, dict):
            return {_helper(k): _helper(v) for k, v in o.items()}
        if isinstance(o, tuple):
            return tuple(_helper(v) for v in o)
        return o

    return {k: _helper(v) for k, v in data.items()}