xref: /aosp_15_r20/external/grpc-grpc/examples/python/observability/open_telemetry_exporter.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2024 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from typing import Dict, List
16
17from opentelemetry.sdk.metrics.export import AggregationTemporality
18from opentelemetry.sdk.metrics.export import MetricExportResult
19from opentelemetry.sdk.metrics.export import MetricExporter
20from opentelemetry.sdk.metrics.export import MetricsData
21
22
class OTelMetricExporter(MetricExporter):
    """A :class:`MetricExporter` that records exported metrics in memory.

    Instead of sending metrics to a backend, every exported data point has
    its attributes (labels) appended to the caller-provided ``all_metrics``
    mapping, keyed by metric name.

    all_metrics: A dict whose keys are
        grpc_observability._opentelemetry_measures.Metric.name and whose
        values are lists of the labels recorded for that metric.
        An example item of this dict:
            {"grpc.client.attempt.started":
              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
        print_live: bool = False,
    ) -> None:
        """Create an exporter that writes into ``all_metrics``.

        Args:
            all_metrics: Mapping that is mutated in place on every export.
                Both plain dicts and ``defaultdict(list)`` are accepted;
                missing metric names are created on demand.
            preferred_temporality: Forwarded unchanged to
                :class:`MetricExporter`.
            preferred_aggregation: Forwarded unchanged to
                :class:`MetricExporter`.
            print_live: When true, print one line to stdout for every data
                point received.
        """
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self._all_metrics = all_metrics
        self._print_live = print_live

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record ``metrics_data`` and report success.

        ``timeout_millis`` and ``kwargs`` are accepted for interface
        compatibility and are not used: recording is purely in-memory.
        """
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Do nothing; this exporter holds no resources to release."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Return ``True``; data is recorded during export, nothing is buffered."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append the attributes of every data point to ``all_metrics``.

        Walks resource metrics -> scope metrics -> metrics -> data points and
        appends ``data_point.attributes`` to the list stored under
        ``metric.name``.
        """
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        # Try the normal lookup first so a defaultdict (or
                        # any mapping with __missing__) behaves exactly as
                        # before; fall back to creating the list so a plain
                        # dict does not raise KeyError on a new metric name.
                        try:
                            recorded = self._all_metrics[metric.name]
                        except KeyError:
                            recorded = []
                            self._all_metrics[metric.name] = recorded
                        recorded.append(data_point.attributes)
                        if self._print_live:
                            print(f"Metric exporter received: {metric.name}")
76