# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, List, Optional

from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.export import MetricsData


class OTelMetricExporter(MetricExporter):
    """A :class:`MetricExporter` that records exported labels in a dict.

    Every exported data point has its attributes appended to the list
    stored under the metric's name in the caller-supplied ``all_metrics``
    mapping, so the caller can inspect what was exported.

    Args:
        all_metrics: A dict whose keys are
            grpc_observability._opentelemetry_measures.Metric.name and
            whose values are lists of the labels recorded for that metric.
            The dict is mutated in place. An example item of this dict:
                {"grpc.client.attempt.started":
                  [{'grpc.method': 'test/UnaryUnary',
                    'grpc.target': 'localhost:42517'},
                   {'grpc.method': 'other',
                    'grpc.target': 'localhost:42517'}]}
        preferred_temporality: Forwarded unchanged to
            :class:`MetricExporter`.
        preferred_aggregation: Forwarded unchanged to
            :class:`MetricExporter`.
        print_live: When true, print the metric name to stdout as data
            points are recorded.
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
        print_live: bool = False,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self._all_metrics = all_metrics
        self._print_live = print_live

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record ``metrics_data`` and report success.

        ``timeout_millis`` and ``kwargs`` are accepted but not used.
        """
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Do nothing; this exporter has nothing to shut down."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Return True; data points are recorded as soon as exported."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append the attributes of every data point to ``all_metrics``.

        Walks resource metrics -> scope metrics -> metrics -> data points
        and stores each data point's ``attributes`` under the metric name.
        """
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        self._labels_for(metric.name).append(
                            data_point.attributes
                        )
                        # NOTE(review): the source this was recovered from
                        # had lost its indentation, so whether this print
                        # runs once per data point (as here) or once per
                        # metric could not be determined -- confirm.
                        if self._print_live:
                            print(f"Metric exporter received: {metric.name}")

    def _labels_for(self, metric_name: str) -> List:
        """Return the label list for ``metric_name``, creating it if needed."""
        try:
            # Plain indexing first, so a defaultdict's own factory still runs.
            return self._all_metrics[metric_name]
        except KeyError:
            # A plain dict has no entry the first time a metric is seen;
            # create one instead of failing the export.
            return self._all_metrics.setdefault(metric_name, [])