1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import threading
17import time
18from typing import Any, Dict, Iterable, List, Optional, Union
19
20import grpc
21
22# pytype: disable=pyi-error
23from grpc_observability import _cyobservability
24from grpc_observability import _observability
25from grpc_observability import _open_telemetry_measures
26from grpc_observability._cyobservability import MetricsName
27from grpc_observability._observability import StatsData
28from opentelemetry.metrics import Counter
29from opentelemetry.metrics import Histogram
30from opentelemetry.metrics import Meter
31
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Annotation-only aliases. The capsule types are opaque values returned by
# _cyobservability; each appears only once in a function signature, so no
# dedicated type is declared for them.
ClientCallTracerCapsule = Any  # it appears only once in the function signature
ServerCallTracerFactoryCapsule = (
    Any  # it appears only once in the function signature
)
# The two names below stand in for types whose defining modules import this
# module (presumably aliased to Any to avoid a circular import).
grpc_observability = Any  # grpc_observability.py imports this module.
OpenTelemetryPlugin = Any  # _open_telemetry_plugin.py imports this module.

# Label keys looked up in StatsData.labels, and the replacement value written
# when a plugin's attribute filter rejects the original label value.
GRPC_METHOD_LABEL = "grpc.method"
GRPC_TARGET_LABEL = "grpc.target"
GRPC_OTHER_LABEL_VALUE = "other"
# Guards reads and writes of _OPEN_TELEMETRY_OBSERVABILITY in the
# _start_/_end_open_telemetry_observability functions.
_observability_lock: threading.RLock = threading.RLock()
# The currently active observability instance; None when nothing is started.
_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None

# Maps each grpc.StatusCode to the string passed on when recording RPC
# latency; codes missing from this table are reported as "UNKNOWN" by
# OpenTelemetryObservability.record_rpc_latency.
GRPC_STATUS_CODE_TO_STRING = {
    grpc.StatusCode.OK: "OK",
    grpc.StatusCode.CANCELLED: "CANCELLED",
    grpc.StatusCode.UNKNOWN: "UNKNOWN",
    grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
    grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
    grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
    grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
    grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
    grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
    grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
    grpc.StatusCode.ABORTED: "ABORTED",
    grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
    grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
    grpc.StatusCode.INTERNAL: "INTERNAL",
    grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
    grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
66
67
class _OpenTelemetryPlugin:
    """Records gRPC stats data into OpenTelemetry instruments for one plugin.

    Wraps a user-supplied plugin object that provides ``meter_provider``,
    ``target_attribute_filter`` and ``generic_method_attribute_filter``.
    When the plugin has a meter provider, one instrument is created per base
    metric; stats data whose name has no instrument is ignored.
    """

    _plugin: OpenTelemetryPlugin
    _metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]]

    def __init__(self, plugin: OpenTelemetryPlugin):
        """Create instruments for the base metrics on the plugin's meter.

        Args:
          plugin: The plugin configuration to wrap. If its ``meter_provider``
            is falsy, no instruments are created and nothing is recorded.
        """
        self._plugin = plugin
        self._metric_to_recorder = dict()

        meter_provider = self._plugin.meter_provider
        if meter_provider:
            meter = meter_provider.get_meter("grpc-python", grpc.__version__)
            enabled_metrics = _open_telemetry_measures.base_metrics()
            self._metric_to_recorder = self._register_metrics(
                meter, enabled_metrics
            )

    def _should_record(self, stats_data: StatsData) -> bool:
        """Return True if an instrument is registered for this metric name."""
        return stats_data.name in self._metric_to_recorder

    def _record_stats_data(self, stats_data: StatsData) -> None:
        """Record one measurement on the instrument registered for its name.

        Target and method labels rejected by the plugin's attribute filters
        are replaced with GRPC_OTHER_LABEL_VALUE before recording.

        Raises:
          KeyError: If no instrument is registered for ``stats_data.name``.
        """
        recorder = self._metric_to_recorder[stats_data.name]

        # Work on a copy of the labels: the exporter hands the same StatsData
        # object to every plugin, so rewriting the labels in place would leak
        # this plugin's filtering decisions into the plugins that run later.
        labels = dict(stats_data.labels)

        target = labels.get(GRPC_TARGET_LABEL, "")
        if not self._plugin.target_attribute_filter(target):
            # Filter target name.
            labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE

        method = labels.get(GRPC_METHOD_LABEL, "")
        if not self._plugin.generic_method_attribute_filter(method):
            # Filter method name.
            labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE

        if stats_data.measure_double:
            value = stats_data.value_float
        else:
            value = stats_data.value_int

        if isinstance(recorder, Counter):
            recorder.add(value, attributes=labels)
        elif isinstance(recorder, Histogram):
            recorder.record(value, attributes=labels)

    # pylint: disable=no-self-use
    def maybe_record_stats_data(self, stats_data: StatsData) -> None:
        """Record ``stats_data`` if this plugin has an instrument for it.

        Args:
          stats_data: A single stats record (not a list of records).
        """
        if self._should_record(stats_data):
            self._record_stats_data(stats_data)

    def _register_metrics(
        self, meter: Meter, metrics: List[_open_telemetry_measures.Metric]
    ) -> Dict[MetricsName, Union[Counter, Histogram]]:
        """Create one instrument per known metric and index it by ``cyname``.

        Args:
          meter: Meter used to create the counters and histograms.
          metrics: Metrics to register.

        Returns:
          A mapping from each recognized metric's ``cyname`` to its
          instrument. Unrecognized metrics are skipped, so they are never
          bound to an instrument that was created for a different metric.
        """
        counter_metrics = (
            _open_telemetry_measures.CLIENT_ATTEMPT_STARTED,
            _open_telemetry_measures.SERVER_STARTED_RPCS,
        )
        histogram_metrics = (
            _open_telemetry_measures.CLIENT_ATTEMPT_DURATION,
            _open_telemetry_measures.CLIENT_RPC_DURATION,
            _open_telemetry_measures.CLIENT_ATTEMPT_SEND_BYTES,
            _open_telemetry_measures.CLIENT_ATTEMPT_RECEIVED_BYTES,
            _open_telemetry_measures.SERVER_RPC_DURATION,
            _open_telemetry_measures.SERVER_RPC_SEND_BYTES,
            _open_telemetry_measures.SERVER_RPC_RECEIVED_BYTES,
        )

        metric_to_recorder_map = {}
        for metric in metrics:
            if metric in counter_metrics:
                create_instrument = meter.create_counter
            elif metric in histogram_metrics:
                create_instrument = meter.create_histogram
            else:
                # No instrument kind is known for this metric; skip it rather
                # than reuse the instrument from the previous iteration.
                _LOGGER.warning(
                    "Skipping unsupported observability metric: %s",
                    metric.name,
                )
                continue
            metric_to_recorder_map[metric.cyname] = create_instrument(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        return metric_to_recorder_map
181
182
def start_open_telemetry_observability(
    *,
    plugins: Iterable[_OpenTelemetryPlugin],
) -> None:
    """Build an OpenTelemetryObservability for ``plugins`` and start it.

    Args:
      plugins: The _OpenTelemetryPlugin instances to enable.

    Raises:
      RuntimeError: If observability was already started.
    """
    otel_o11y = OpenTelemetryObservability(plugins=plugins)
    _start_open_telemetry_observability(otel_o11y)
190
191
def end_open_telemetry_observability() -> None:
    """End OpenTelemetry observability.

    Delegates to _end_open_telemetry_observability, which raises
    RuntimeError when observability has not been started.
    """
    _end_open_telemetry_observability()
194
195
class _OpenTelemetryExporterDelegator(_observability.Exporter):
    """Exporter that fans each stats record out to every registered plugin."""

    _plugins: Iterable[_OpenTelemetryPlugin]

    def __init__(self, plugins: Optional[Iterable[_OpenTelemetryPlugin]]):
        """Store the plugins that will receive exported stats data.

        Args:
          plugins: Plugins to deliver stats data to. ``None`` is treated as
            "no plugins".
        """
        # Materialize once: the plugins are iterated again for every stats
        # record, so a one-shot iterable (e.g. a generator) would be empty
        # after the first record.
        self._plugins = tuple(plugins) if plugins is not None else ()

    def export_stats_data(
        self, stats_data: List[_observability.StatsData]
    ) -> None:
        """Offer every record in ``stats_data`` to every plugin."""
        for data in stats_data:
            for plugin in self._plugins:
                plugin.maybe_record_stats_data(data)

    def export_tracing_data(
        self, tracing_data: List[_observability.TracingData]
    ) -> None:
        """Do nothing; tracing data is not exported by this delegator."""
        pass
214
215
216# pylint: disable=no-self-use
class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
    """OpenTelemetry based plugin implementation.

    This class is part of an EXPERIMENTAL API.

    Args:
      plugins: _OpenTelemetryPlugin instances to enable.
    """

    exporter: "grpc_observability.Exporter"

    def __init__(
        self,
        *,
        plugins: Optional[Iterable[_OpenTelemetryPlugin]],
    ):
        self.exporter = _OpenTelemetryExporterDelegator(plugins)

    def observability_init(self):
        """Activate stats collection and register this plugin with gRPC.

        Raises:
          ValueError: If activating stats fails; the original exception is
            attached as the cause.
        """
        try:
            _cyobservability.activate_stats()
            self.set_stats(True)
        except Exception as e:  # pylint: disable=broad-except
            raise ValueError(
                f"Activate observability metrics failed with: {e}"
            ) from e

        try:
            _cyobservability.cyobservability_init(self.exporter)
        # TODO(xuanwn): Use specific exceptions
        except Exception as e:  # pylint: disable=broad-except
            # Failure here is logged and initialization continues.
            _LOGGER.exception("Initiate observability failed with: %s", e)

        grpc._observability.observability_init(self)

    def observability_deinit(self) -> None:
        """Disable tracing and stats, then tear down observability."""
        # Sleep so we don't lose any data. If we shut down the export thread
        # immediately after exit, it's possible that core didn't call
        # RecordEnd in callTracer, and all data recorded by calling RecordEnd
        # will be lost.
        # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals the time in
        # AwaitNextBatchLocked.
        # TODO(xuanwn): explicit synchronization
        # https://github.com/grpc/grpc/issues/33262
        time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
        self.set_tracing(False)
        self.set_stats(False)
        _cyobservability.observability_deinit()
        grpc._observability.observability_deinit()

    def create_client_call_tracer(
        self, method_name: bytes, target: bytes
    ) -> ClientCallTracerCapsule:
        """Create a client call tracer capsule for ``method_name``/``target``."""
        # A fixed trace id is passed for every call.
        trace_id = b"TRACE_ID"
        capsule = _cyobservability.create_client_call_tracer(
            method_name, target, trace_id
        )
        return capsule

    def create_server_call_tracer_factory(
        self,
    ) -> ServerCallTracerFactoryCapsule:
        """Create a server call tracer factory capsule."""
        capsule = _cyobservability.create_server_call_tracer_factory_capsule()
        return capsule

    def delete_client_call_tracer(
        self, client_call_tracer: ClientCallTracerCapsule
    ) -> None:
        """Delete a capsule produced by create_client_call_tracer."""
        _cyobservability.delete_client_call_tracer(client_call_tracer)

    def save_trace_context(
        self, trace_id: str, span_id: str, is_sampled: bool
    ) -> None:
        """Do nothing; trace context is not saved by this plugin."""
        pass

    def record_rpc_latency(
        self,
        method: str,
        target: str,
        rpc_latency: float,
        status_code: grpc.StatusCode,
    ) -> None:
        """Record the latency of a finished RPC through the exporter.

        Args:
          method: The RPC method name.
          target: The RPC target.
          rpc_latency: The measured latency of the RPC.
          status_code: Final status of the RPC; codes missing from
            GRPC_STATUS_CODE_TO_STRING are reported as "UNKNOWN".
        """
        # Use a separate name so the grpc.StatusCode parameter is not
        # rebound to a str.
        status_code_str = GRPC_STATUS_CODE_TO_STRING.get(
            status_code, "UNKNOWN"
        )
        _cyobservability._record_rpc_latency(
            self.exporter, method, target, rpc_latency, status_code_str
        )
301
302
def _start_open_telemetry_observability(
    otel_o11y: OpenTelemetryObservability,
) -> None:
    """Register ``otel_o11y`` as the active observability instance and init it.

    Args:
      otel_o11y: The observability instance to activate.

    Raises:
      RuntimeError: If an observability instance is already registered.
      Exception: Whatever ``otel_o11y.observability_init()`` raises; in that
        case the registration is rolled back so a later start can succeed.
    """
    global _OPEN_TELEMETRY_OBSERVABILITY  # pylint: disable=global-statement
    with _observability_lock:
        if _OPEN_TELEMETRY_OBSERVABILITY is not None:
            raise RuntimeError(
                "gPRC Python observability was already initialized!"
            )
        # Register before initializing (same order as before), but undo the
        # registration if initialization fails; otherwise the failed instance
        # would stay registered and block every later start attempt.
        _OPEN_TELEMETRY_OBSERVABILITY = otel_o11y
        try:
            otel_o11y.observability_init()
        except Exception:  # pylint: disable=broad-except
            _OPEN_TELEMETRY_OBSERVABILITY = None
            raise
315
316
def _end_open_telemetry_observability() -> None:
    """Deinitialize the active observability instance and unregister it.

    Raises:
      RuntimeError: If no observability instance is currently registered.
    """
    global _OPEN_TELEMETRY_OBSERVABILITY  # pylint: disable=global-statement
    with _observability_lock:
        # Guard clause: nothing to end when nothing is registered.
        if not _OPEN_TELEMETRY_OBSERVABILITY:
            raise RuntimeError(
                "Trying to end gPRC Python observability without initialize first!"
            )
        active_o11y = _OPEN_TELEMETRY_OBSERVABILITY
        active_o11y.observability_deinit()
        _OPEN_TELEMETRY_OBSERVABILITY = None
327