1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "observability_util.h"
16 
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdlib>
#include <limits>
#include <map>
#include <mutex>
#include <queue>
#include <string>
#include <vector>
21 
22 #include "absl/status/statusor.h"
23 #include "absl/strings/string_view.h"
24 #include "absl/types/optional.h"
25 #include "client_call_tracer.h"
26 #include "constants.h"
27 #include "python_census_context.h"
28 #include "server_call_tracer.h"
29 
30 #include <grpc/support/log.h>
31 
32 namespace grpc_observability {
33 
34 std::queue<CensusData>* g_census_data_buffer;
35 std::mutex g_census_data_buffer_mutex;
36 std::condition_variable g_census_data_buffer_cv;
37 // TODO(xuanwn): Change below to a more appropriate number.
38 // Assume buffer will store 100 CensusData and start export when buffer is 70%
39 // full.
40 constexpr float kExportThreshold = 0.7;
41 constexpr int kMaxExportBufferSize = 10000;
42 
43 namespace {
44 
GetExportThreadHold()45 float GetExportThreadHold() {
46   const char* value = std::getenv("GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD");
47   if (value != nullptr) {
48     return std::stof(value);
49   }
50   return kExportThreshold;
51 }
52 
GetMaxExportBufferSize()53 int GetMaxExportBufferSize() {
54   const char* value = std::getenv("GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE");
55   if (value != nullptr) {
56     return std::stoi(value);
57   }
58   return kMaxExportBufferSize;
59 }
60 
61 }  // namespace
62 
RecordIntMetric(MetricsName name,int64_t value,const std::vector<Label> & labels)63 void RecordIntMetric(MetricsName name, int64_t value,
64                      const std::vector<Label>& labels) {
65   Measurement measurement_data;
66   measurement_data.type = kMeasurementInt;
67   measurement_data.name = name;
68   measurement_data.value.value_int = value;
69 
70   CensusData data = CensusData(measurement_data, labels);
71   AddCensusDataToBuffer(data);
72 }
73 
RecordDoubleMetric(MetricsName name,double value,const std::vector<Label> & labels)74 void RecordDoubleMetric(MetricsName name, double value,
75                         const std::vector<Label>& labels) {
76   Measurement measurement_data;
77   measurement_data.type = kMeasurementDouble;
78   measurement_data.name = name;
79   measurement_data.value.value_double = value;
80 
81   CensusData data = CensusData(measurement_data, labels);
82   AddCensusDataToBuffer(data);
83 }
84 
RecordSpan(const SpanCensusData & span_census_data)85 void RecordSpan(const SpanCensusData& span_census_data) {
86   CensusData data = CensusData(span_census_data);
87   AddCensusDataToBuffer(data);
88 }
89 
NativeObservabilityInit()90 void NativeObservabilityInit() {
91   g_census_data_buffer = new std::queue<CensusData>;
92 }
93 
CreateClientCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id)94 void* CreateClientCallTracer(const char* method, const char* target,
95                              const char* trace_id, const char* parent_span_id) {
96   void* client_call_tracer = new PythonOpenCensusCallTracer(
97       method, target, trace_id, parent_span_id, PythonCensusTracingEnabled());
98   return client_call_tracer;
99 }
100 
CreateServerCallTracerFactory()101 void* CreateServerCallTracerFactory() {
102   void* server_call_tracer_factory =
103       new PythonOpenCensusServerCallTracerFactory();
104   return server_call_tracer_factory;
105 }
106 
AwaitNextBatchLocked(std::unique_lock<std::mutex> & lock,int timeout_ms)107 void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
108   auto now = std::chrono::system_clock::now();
109   g_census_data_buffer_cv.wait_until(
110       lock, now + std::chrono::milliseconds(timeout_ms));
111 }
112 
AddCensusDataToBuffer(const CensusData & data)113 void AddCensusDataToBuffer(const CensusData& data) {
114   std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
115   if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
116     gpr_log(GPR_DEBUG,
117             "Reached maximum census data buffer size, discarding this "
118             "CensusData entry");
119   } else {
120     g_census_data_buffer->push(data);
121   }
122   if (g_census_data_buffer->size() >=
123       (GetExportThreadHold() * GetMaxExportBufferSize())) {
124     g_census_data_buffer_cv.notify_all();
125   }
126 }
127 
StatusCodeToString(grpc_status_code code)128 absl::string_view StatusCodeToString(grpc_status_code code) {
129   switch (code) {
130     case GRPC_STATUS_OK:
131       return "OK";
132     case GRPC_STATUS_CANCELLED:
133       return "CANCELLED";
134     case GRPC_STATUS_UNKNOWN:
135       return "UNKNOWN";
136     case GRPC_STATUS_INVALID_ARGUMENT:
137       return "INVALID_ARGUMENT";
138     case GRPC_STATUS_DEADLINE_EXCEEDED:
139       return "DEADLINE_EXCEEDED";
140     case GRPC_STATUS_NOT_FOUND:
141       return "NOT_FOUND";
142     case GRPC_STATUS_ALREADY_EXISTS:
143       return "ALREADY_EXISTS";
144     case GRPC_STATUS_PERMISSION_DENIED:
145       return "PERMISSION_DENIED";
146     case GRPC_STATUS_UNAUTHENTICATED:
147       return "UNAUTHENTICATED";
148     case GRPC_STATUS_RESOURCE_EXHAUSTED:
149       return "RESOURCE_EXHAUSTED";
150     case GRPC_STATUS_FAILED_PRECONDITION:
151       return "FAILED_PRECONDITION";
152     case GRPC_STATUS_ABORTED:
153       return "ABORTED";
154     case GRPC_STATUS_OUT_OF_RANGE:
155       return "OUT_OF_RANGE";
156     case GRPC_STATUS_UNIMPLEMENTED:
157       return "UNIMPLEMENTED";
158     case GRPC_STATUS_INTERNAL:
159       return "INTERNAL";
160     case GRPC_STATUS_UNAVAILABLE:
161       return "UNAVAILABLE";
162     case GRPC_STATUS_DATA_LOSS:
163       return "DATA_LOSS";
164     default:
165       // gRPC wants users of this enum to include a default branch so that
166       // adding values is not a breaking change.
167       return "UNKNOWN_STATUS";
168   }
169 }
170 
171 }  // namespace grpc_observability
172