# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
import time
from typing import Any, Dict, Iterable, List, Optional, Union

import grpc

# pytype: disable=pyi-error
from grpc_observability import _cyobservability
from grpc_observability import _observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._cyobservability import MetricsName
from grpc_observability._observability import StatsData
from opentelemetry.metrics import Counter
from opentelemetry.metrics import Histogram
from opentelemetry.metrics import Meter

_LOGGER = logging.getLogger(__name__)

ClientCallTracerCapsule = Any  # it appears only once in the function signature
ServerCallTracerFactoryCapsule = (
    Any  # it appears only once in the function signature
)
grpc_observability = Any  # grpc_observability.py imports this module.
OpenTelemetryPlugin = Any  # _open_telemetry_plugin.py imports this module.

# Label keys looked up in StatsData.labels, and the value substituted when a
# plugin's attribute filter rejects the real label value.
GRPC_METHOD_LABEL = "grpc.method"
GRPC_TARGET_LABEL = "grpc.target"
GRPC_OTHER_LABEL_VALUE = "other"
# Guards reads and writes of _OPEN_TELEMETRY_OBSERVABILITY.
_observability_lock: threading.RLock = threading.RLock()
_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None

GRPC_STATUS_CODE_TO_STRING = {
    grpc.StatusCode.OK: "OK",
    grpc.StatusCode.CANCELLED: "CANCELLED",
    grpc.StatusCode.UNKNOWN: "UNKNOWN",
    grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
    grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
    grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
    grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
    grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
    grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
    grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
    grpc.StatusCode.ABORTED: "ABORTED",
    grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
    grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
    grpc.StatusCode.INTERNAL: "INTERNAL",
    grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
    grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}


class _OpenTelemetryPlugin:
    """Records gRPC stats data through a single OpenTelemetryPlugin.

    On construction, one OpenTelemetry instrument (Counter or Histogram) is
    created per base metric using the wrapped plugin's meter provider. If the
    plugin has no meter provider, no instrument is created and nothing is
    ever recorded by this object.
    """

    _plugin: OpenTelemetryPlugin
    # Maps a metric's `cyname` to the instrument that records it.
    _metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]]

    def __init__(self, plugin: OpenTelemetryPlugin):
        """Wraps `plugin` and registers instruments for the base metrics.

        Args:
          plugin: The plugin supplying `meter_provider` and the target/method
            attribute filters.
        """
        self._plugin = plugin
        self._metric_to_recorder = {}

        meter_provider = self._plugin.meter_provider
        if meter_provider:
            meter = meter_provider.get_meter("grpc-python", grpc.__version__)
            enabled_metrics = _open_telemetry_measures.base_metrics()
            self._metric_to_recorder = self._register_metrics(
                meter, enabled_metrics
            )

    def _should_record(self, stats_data: StatsData) -> bool:
        """Returns True if an instrument is registered for `stats_data.name`."""
        return stats_data.name in self._metric_to_recorder

    def _record_stats_data(self, stats_data: StatsData) -> None:
        """Records one data point on the instrument registered for its metric.

        Target and method labels rejected by the plugin's attribute filters
        are replaced with GRPC_OTHER_LABEL_VALUE before recording.

        Args:
          stats_data: The data point to record. It is not modified.

        Raises:
          KeyError: If no instrument is registered for `stats_data.name`;
            callers are expected to check `_should_record` first.
        """
        recorder = self._metric_to_recorder[stats_data.name]

        # Filter on a private copy. The same StatsData object is handed to
        # every plugin by the exporter, so rewriting its labels in place would
        # leak this plugin's filter decisions into the other plugins.
        labels = dict(stats_data.labels)

        target = labels.get(GRPC_TARGET_LABEL, "")
        if not self._plugin.target_attribute_filter(target):
            # Filter target name.
            labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE

        method = labels.get(GRPC_METHOD_LABEL, "")
        if not self._plugin.generic_method_attribute_filter(method):
            # Filter method name.
            labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE

        value = (
            stats_data.value_float
            if stats_data.measure_double
            else stats_data.value_int
        )
        if isinstance(recorder, Counter):
            recorder.add(value, attributes=labels)
        elif isinstance(recorder, Histogram):
            recorder.record(value, attributes=labels)

    def maybe_record_stats_data(self, stats_data: StatsData) -> None:
        """Records `stats_data` if this plugin has an instrument for it.

        Args:
          stats_data: A single data point (not a list).
        """
        if self._should_record(stats_data):
            self._record_stats_data(stats_data)

    def _register_metrics(
        self, meter: Meter, metrics: List[_open_telemetry_measures.Metric]
    ) -> Dict[MetricsName, Union[Counter, Histogram]]:
        """Creates one instrument per supported metric.

        Args:
          meter: The Meter used to create the instruments.
          metrics: The metrics to create instruments for.

        Returns:
          A dict mapping each supported metric's `cyname` to its instrument.
          Metrics that are neither a known counter nor a known histogram are
          skipped.
        """
        measures = _open_telemetry_measures
        counter_metrics = (
            measures.CLIENT_ATTEMPT_STARTED,
            measures.SERVER_STARTED_RPCS,
        )
        histogram_metrics = (
            measures.CLIENT_ATTEMPT_DURATION,
            measures.CLIENT_RPC_DURATION,
            measures.CLIENT_ATTEMPT_SEND_BYTES,
            measures.CLIENT_ATTEMPT_RECEIVED_BYTES,
            measures.SERVER_RPC_DURATION,
            measures.SERVER_RPC_SEND_BYTES,
            measures.SERVER_RPC_RECEIVED_BYTES,
        )

        metric_to_recorder_map = {}
        for metric in metrics:
            if metric in counter_metrics:
                create_instrument = meter.create_counter
            elif metric in histogram_metrics:
                create_instrument = meter.create_histogram
            else:
                # Never fall through with a recorder left over from a previous
                # iteration: that would record this metric on the wrong
                # instrument.
                _LOGGER.warning("Skipping unsupported metric: %s", metric)
                continue
            metric_to_recorder_map[metric.cyname] = create_instrument(
                name=metric.name,
                unit=metric.unit,
                description=metric.description,
            )
        return metric_to_recorder_map


def start_open_telemetry_observability(
    *,
    plugins: Iterable[_OpenTelemetryPlugin],
) -> None:
    """Creates an OpenTelemetryObservability for `plugins` and starts it.

    Raises:
      RuntimeError: If observability was already started.
    """
    _start_open_telemetry_observability(
        OpenTelemetryObservability(plugins=plugins)
    )


def end_open_telemetry_observability() -> None:
    """Stops the observability instance started earlier.

    Raises:
      RuntimeError: If observability was not started.
    """
    _end_open_telemetry_observability()


class _OpenTelemetryExporterDelegator(_observability.Exporter):
    """Exporter that fans every stats data point out to all plugins."""

    _plugins: Iterable[_OpenTelemetryPlugin]

    def __init__(self, plugins: Optional[Iterable[_OpenTelemetryPlugin]]):
        """Stores the plugins to delegate to.

        Args:
          plugins: The plugins to export to; None means no plugins.
        """
        # Materialize once: the plugins are iterated again for every exported
        # data point, which a one-shot iterator (or None) cannot support.
        self._plugins = () if plugins is None else tuple(plugins)

    def export_stats_data(
        self, stats_data: List[_observability.StatsData]
    ) -> None:
        """Offers each data point in `stats_data` to every plugin."""
        # Records stats data to MeterProvider.
        for data in stats_data:
            for plugin in self._plugins:
                plugin.maybe_record_stats_data(data)

    def export_tracing_data(
        self, tracing_data: List[_observability.TracingData]
    ) -> None:
        """Does nothing; tracing data is not exported by this exporter."""
        pass


# pylint: disable=no-self-use
class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
    """OpenTelemetry based plugin implementation.

    This class is part of an EXPERIMENTAL API.

    Args:
      plugins: _OpenTelemetryPlugins to enable.
    """

    exporter: "grpc_observability.Exporter"

    def __init__(
        self,
        *,
        plugins: Optional[Iterable[_OpenTelemetryPlugin]],
    ):
        self.exporter = _OpenTelemetryExporterDelegator(plugins)

    def observability_init(self):
        """Activates stats collection and registers this plugin with gRPC.

        Raises:
          ValueError: If activating stats fails; the original exception is
            attached as the cause.
        """
        try:
            _cyobservability.activate_stats()
            self.set_stats(True)
        except Exception as e:  # pylint: disable=broad-except
            raise ValueError(
                f"Activate observability metrics failed with: {e}"
            ) from e

        try:
            _cyobservability.cyobservability_init(self.exporter)
        # TODO(xuanwn): Use specific exceptions
        except Exception as e:  # pylint: disable=broad-except
            # Failure here is logged, not raised; registration below still
            # proceeds.
            _LOGGER.exception("Initiate observability failed with: %s", e)

        grpc._observability.observability_init(self)

    def observability_deinit(self) -> None:
        """Disables tracing and stats and shuts observability down.

        Blocks for CENSUS_EXPORT_BATCH_INTERVAL_SECS before shutting down.
        """
        # Sleep so we don't lose any data. If we shutdown export thread
        # immediately after exit, it's possible that core didn't call RecordEnd
        # in callTracer, and all data recorded by calling RecordEnd will be
        # lost.
        # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in
        # AwaitNextBatchLocked.
        # TODO(xuanwn): explicit synchronization
        # https://github.com/grpc/grpc/issues/33262
        time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
        self.set_tracing(False)
        self.set_stats(False)
        _cyobservability.observability_deinit()
        grpc._observability.observability_deinit()

    def create_client_call_tracer(
        self, method_name: bytes, target: bytes
    ) -> ClientCallTracerCapsule:
        """Returns a client call tracer capsule for the method and target."""
        # A fixed placeholder trace id is passed to the tracer.
        trace_id = b"TRACE_ID"
        capsule = _cyobservability.create_client_call_tracer(
            method_name, target, trace_id
        )
        return capsule

    def create_server_call_tracer_factory(
        self,
    ) -> ServerCallTracerFactoryCapsule:
        """Returns a server call tracer factory capsule."""
        capsule = _cyobservability.create_server_call_tracer_factory_capsule()
        return capsule

    def delete_client_call_tracer(
        self, client_call_tracer: ClientCallTracerCapsule
    ) -> None:
        """Deletes a capsule created by `create_client_call_tracer`."""
        _cyobservability.delete_client_call_tracer(client_call_tracer)

    def save_trace_context(
        self, trace_id: str, span_id: str, is_sampled: bool
    ) -> None:
        """Does nothing; trace context is not saved by this plugin."""
        pass

    def record_rpc_latency(
        self,
        method: str,
        target: str,
        rpc_latency: float,
        status_code: grpc.StatusCode,
    ) -> None:
        """Records the latency of a finished RPC through the exporter.

        Args:
          method: The RPC method name.
          target: The RPC target.
          rpc_latency: The latency value to record.
          status_code: The RPC's final status; codes missing from
            GRPC_STATUS_CODE_TO_STRING are reported as "UNKNOWN".
        """
        status_name = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
        _cyobservability._record_rpc_latency(
            self.exporter, method, target, rpc_latency, status_name
        )


def _start_open_telemetry_observability(
    otel_o11y: OpenTelemetryObservability,
) -> None:
    """Installs `otel_o11y` as the module-wide instance and initializes it.

    Raises:
      RuntimeError: If an instance is already installed.
    """
    global _OPEN_TELEMETRY_OBSERVABILITY  # pylint: disable=global-statement
    with _observability_lock:
        if _OPEN_TELEMETRY_OBSERVABILITY is not None:
            raise RuntimeError(
                "gPRC Python observability was already initialized!"
            )
        _OPEN_TELEMETRY_OBSERVABILITY = otel_o11y
        try:
            _OPEN_TELEMETRY_OBSERVABILITY.observability_init()
        except Exception:
            # Do not leave a failed instance installed; otherwise every later
            # start attempt would fail with "already initialized".
            _OPEN_TELEMETRY_OBSERVABILITY = None
            raise


def _end_open_telemetry_observability() -> None:
    """Deinitializes and uninstalls the module-wide instance.

    Raises:
      RuntimeError: If no instance is installed.
    """
    global _OPEN_TELEMETRY_OBSERVABILITY  # pylint: disable=global-statement
    with _observability_lock:
        if _OPEN_TELEMETRY_OBSERVABILITY is None:
            raise RuntimeError(
                "Trying to end gPRC Python observability without initialize first!"
            )
        _OPEN_TELEMETRY_OBSERVABILITY.observability_deinit()
        _OPEN_TELEMETRY_OBSERVABILITY = None