xref: /aosp_15_r20/external/pytorch/tools/stats/upload_metrics.py (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1from __future__ import annotations
2
3import datetime
4import inspect
5import os
6import time
7import uuid
8from decimal import Decimal
9from typing import Any
10from warnings import warn
11
12
# boto3 is an optional dependency. If it's not installed we simply skip
# emitting metrics. Keeping this probe here means callers never have to
# worry about whether boto3 is available.
try:
    import boto3  # type: ignore[import]
except ImportError as e:
    print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")
    EMIT_METRICS = False
else:
    EMIT_METRICS = True
24
# Sometimes our runner machines are located in one AWS account while the metrics table may be in
# another, so we need to specify the table's ARN explicitly.
# (boto3's Table() accepts a full ARN in place of a table name for exactly
# this cross-account case.)
TORCHCI_METRICS_TABLE_ARN = (
    "arn:aws:dynamodb:us-east-1:308535385114:table/torchci-metrics"
)
30
31
class EnvVarMetric:
    """A metric whose value is sourced from an environment variable.

    Wraps the env var lookup so callers get consistent handling of
    missing/empty values and optional type conversion.
    """

    name: str
    env_var: str
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def __init__(
        self,
        name: str,
        env_var: str,
        required: bool = True,
        type_conversion_fn: Any = None,
    ) -> None:
        self.name = name
        self.env_var = env_var
        self.required = required
        self.type_conversion_fn = type_conversion_fn

    def value(self) -> Any:
        """Return the env var's (possibly converted) value.

        Returns None when the variable is unset/empty and the metric is
        optional; raises ValueError when it is required.
        """
        raw = os.environ.get(self.env_var)

        # Github CI will set some env vars to an empty string; treat that
        # exactly like the variable being absent.
        if raw is None or raw == "":
            if self.required:
                raise ValueError(
                    f"Missing {self.name}. Please set the {self.env_var} "
                    "environment variable to pass in this value."
                )
            return None

        return self.type_conversion_fn(raw) if self.type_conversion_fn else raw
68
69
# Extra key/value pairs merged into every metric this process emits.
global_metrics: dict[str, Any] = {}


def add_global_metric(metric_name: str, metric_value: Any) -> None:
    """
    Register a stat to be attached to every metric emitted by this process.
    A metric of the same name passed directly to emit_metrics takes
    precedence over the value stored here.
    """
    global_metrics[metric_name] = metric_value
80
81
def emit_metric(
    metric_name: str,
    metrics: dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, Rockset).

    Even if EMIT_METRICS is set to False, this function will still run the code to
    validate and shape the metrics, skipping just the upload.

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Raises:
        ValueError: if ``metrics`` is None, or if it uses a reserved key
            (e.g. "metric_name", "timestamp", "dynamo_key", ...).

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. (If they're not set, this function becomes a noop):
    """

    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # Merge the given metrics with the global metrics, overwriting any duplicates
    # with the given metrics.
    metrics = {**global_metrics, **metrics}

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT", required=False),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
        EnvVarMetric("job_id", "JOB_ID", type_conversion_fn=int),
        EnvVarMetric("job_name", "JOB_NAME"),
    ]

    # Use info about the function that invoked this one as a namespace and a way to filter metrics.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_module = inspect.getmodule(calling_frame).__name__  # type: ignore[union-attr]
    calling_function = calling_frame_info.function

    try:
        # Evaluate each env var exactly once (previously m.value() was called
        # twice per metric: once in the filter, once for the value). Falsy
        # values (None, "", 0) are dropped, matching the previous behavior.
        env_values: dict[str, Any] = {}
        for m in env_var_metrics:
            v = m.value()  # raises ValueError when a required env var is missing
            if v:
                env_values[m.name] = v

        reserved_metrics = {
            "metric_name": metric_name,
            "calling_file": calling_file,
            "calling_module": calling_module,
            "calling_function": calling_function,
            # utcnow() is deprecated; a timezone-aware UTC datetime produces
            # the identical string with this format (no tz field in it).
            "timestamp": datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
            **env_values,
        }
    except ValueError as e:
        warn(f"Not emitting metrics for {metric_name}. {e}")
        return

    # Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision
    reserved_metrics[
        "dynamo_key"
    ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    # Ensure the metrics dict doesn't contain any reserved keys.
    # (Bug fix: the old code did `', '.join(key)`, which joined the
    # *characters* of a single key string into the error message.)
    used_reserved_keys = sorted(reserved_metrics.keys() & metrics.keys())
    if used_reserved_keys:
        raise ValueError(
            f"Metrics dict contains reserved keys: [{', '.join(used_reserved_keys)}]"
        )

    # boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
    metrics = _convert_float_values_to_decimals(metrics)

    if EMIT_METRICS:
        try:
            session = boto3.Session(region_name="us-east-1")
            session.resource("dynamodb").Table(TORCHCI_METRICS_TABLE_ARN).put_item(
                Item={
                    **reserved_metrics,
                    **metrics,
                }
            )
        except Exception as e:
            # We don't want to fail the job if we can't upload the metric.
            # We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
            warn(f"Error uploading metric {metric_name} to DynamoDB: {e}")
            return
    else:
        print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.")
177
178
179def _convert_float_values_to_decimals(data: dict[str, Any]) -> dict[str, Any]:
180    # Attempt to recurse
181    def _helper(o: Any) -> Any:
182        if isinstance(o, float):
183            return Decimal(str(o))
184        if isinstance(o, list):
185            return [_helper(v) for v in o]
186        if isinstance(o, dict):
187            return {_helper(k): _helper(v) for k, v in o.items()}
188        if isinstance(o, tuple):
189            return tuple(_helper(v) for v in o)
190        return o
191
192    return {k: _helper(v) for k, v in data.items()}
193