1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
#include "observability_util.h"

#include <cerrno>
#include <chrono>
#include <climits>
#include <cstdlib>
#include <map>
#include <string>

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "client_call_tracer.h"
#include "constants.h"
#include "python_census_context.h"
#include "server_call_tracer.h"

#include <grpc/support/log.h>
31
32 namespace grpc_observability {
33
// Queue of CensusData entries filled by AddCensusDataToBuffer(). The queue
// itself is allocated by NativeObservabilityInit().
std::queue<CensusData>* g_census_data_buffer;
// Held by AddCensusDataToBuffer() while it inspects and appends to
// g_census_data_buffer.
std::mutex g_census_data_buffer_mutex;
// Notified by AddCensusDataToBuffer() once the buffer reaches the export
// threshold; waited on (with a timeout) by AwaitNextBatchLocked().
std::condition_variable g_census_data_buffer_cv;
// TODO(xuanwn): Change below to a more appropriate number.
// Defaults: the buffer stores up to 10000 CensusData entries and an export is
// signalled once the buffer is 70% full. Both values can be overridden with
// the GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE and
// GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD environment variables.
constexpr float kExportThreshold = 0.7;
constexpr int kMaxExportBufferSize = 10000;

namespace {

// Returns the fraction of the maximum buffer size at which an export is
// signalled.
//
// Reads GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD; when the variable is unset, does
// not start with a number, or is out of range for a float, kExportThreshold is
// returned instead.
float GetExportThreadHold() {
  const char* value = std::getenv("GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD");
  if (value == nullptr) {
    return kExportThreshold;
  }
  // std::strtof reports failures through `end`/errno rather than by throwing
  // (as std::stof does), so a malformed setting cannot abort the process.
  const int saved_errno = errno;
  errno = 0;
  char* end = nullptr;
  const float parsed = std::strtof(value, &end);
  const bool parsed_ok = (end != value) && (errno != ERANGE);
  errno = saved_errno;  // Do not leak our errno probing to the caller.
  return parsed_ok ? parsed : kExportThreshold;
}

// Returns the maximum number of CensusData entries the buffer may hold.
//
// Reads GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE as a base-10 integer; when
// the variable is unset, does not start with a number, or does not fit in an
// int, kMaxExportBufferSize is returned instead.
int GetMaxExportBufferSize() {
  const char* value = std::getenv("GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE");
  if (value == nullptr) {
    return kMaxExportBufferSize;
  }
  // std::strtol is used instead of std::stoi so that invalid input falls back
  // to the default instead of throwing.
  const int saved_errno = errno;
  errno = 0;
  char* end = nullptr;
  const long parsed = std::strtol(value, &end, 10);
  const bool parsed_ok = (end != value) && (errno != ERANGE) &&
                         (parsed >= INT_MIN) && (parsed <= INT_MAX);
  errno = saved_errno;  // Do not leak our errno probing to the caller.
  return parsed_ok ? static_cast<int>(parsed) : kMaxExportBufferSize;
}

}  // namespace
62
RecordIntMetric(MetricsName name,int64_t value,const std::vector<Label> & labels)63 void RecordIntMetric(MetricsName name, int64_t value,
64 const std::vector<Label>& labels) {
65 Measurement measurement_data;
66 measurement_data.type = kMeasurementInt;
67 measurement_data.name = name;
68 measurement_data.value.value_int = value;
69
70 CensusData data = CensusData(measurement_data, labels);
71 AddCensusDataToBuffer(data);
72 }
73
RecordDoubleMetric(MetricsName name,double value,const std::vector<Label> & labels)74 void RecordDoubleMetric(MetricsName name, double value,
75 const std::vector<Label>& labels) {
76 Measurement measurement_data;
77 measurement_data.type = kMeasurementDouble;
78 measurement_data.name = name;
79 measurement_data.value.value_double = value;
80
81 CensusData data = CensusData(measurement_data, labels);
82 AddCensusDataToBuffer(data);
83 }
84
RecordSpan(const SpanCensusData & span_census_data)85 void RecordSpan(const SpanCensusData& span_census_data) {
86 CensusData data = CensusData(span_census_data);
87 AddCensusDataToBuffer(data);
88 }
89
NativeObservabilityInit()90 void NativeObservabilityInit() {
91 g_census_data_buffer = new std::queue<CensusData>;
92 }
93
CreateClientCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id)94 void* CreateClientCallTracer(const char* method, const char* target,
95 const char* trace_id, const char* parent_span_id) {
96 void* client_call_tracer = new PythonOpenCensusCallTracer(
97 method, target, trace_id, parent_span_id, PythonCensusTracingEnabled());
98 return client_call_tracer;
99 }
100
CreateServerCallTracerFactory()101 void* CreateServerCallTracerFactory() {
102 void* server_call_tracer_factory =
103 new PythonOpenCensusServerCallTracerFactory();
104 return server_call_tracer_factory;
105 }
106
AwaitNextBatchLocked(std::unique_lock<std::mutex> & lock,int timeout_ms)107 void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
108 auto now = std::chrono::system_clock::now();
109 g_census_data_buffer_cv.wait_until(
110 lock, now + std::chrono::milliseconds(timeout_ms));
111 }
112
AddCensusDataToBuffer(const CensusData & data)113 void AddCensusDataToBuffer(const CensusData& data) {
114 std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
115 if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
116 gpr_log(GPR_DEBUG,
117 "Reached maximum census data buffer size, discarding this "
118 "CensusData entry");
119 } else {
120 g_census_data_buffer->push(data);
121 }
122 if (g_census_data_buffer->size() >=
123 (GetExportThreadHold() * GetMaxExportBufferSize())) {
124 g_census_data_buffer_cv.notify_all();
125 }
126 }
127
StatusCodeToString(grpc_status_code code)128 absl::string_view StatusCodeToString(grpc_status_code code) {
129 switch (code) {
130 case GRPC_STATUS_OK:
131 return "OK";
132 case GRPC_STATUS_CANCELLED:
133 return "CANCELLED";
134 case GRPC_STATUS_UNKNOWN:
135 return "UNKNOWN";
136 case GRPC_STATUS_INVALID_ARGUMENT:
137 return "INVALID_ARGUMENT";
138 case GRPC_STATUS_DEADLINE_EXCEEDED:
139 return "DEADLINE_EXCEEDED";
140 case GRPC_STATUS_NOT_FOUND:
141 return "NOT_FOUND";
142 case GRPC_STATUS_ALREADY_EXISTS:
143 return "ALREADY_EXISTS";
144 case GRPC_STATUS_PERMISSION_DENIED:
145 return "PERMISSION_DENIED";
146 case GRPC_STATUS_UNAUTHENTICATED:
147 return "UNAUTHENTICATED";
148 case GRPC_STATUS_RESOURCE_EXHAUSTED:
149 return "RESOURCE_EXHAUSTED";
150 case GRPC_STATUS_FAILED_PRECONDITION:
151 return "FAILED_PRECONDITION";
152 case GRPC_STATUS_ABORTED:
153 return "ABORTED";
154 case GRPC_STATUS_OUT_OF_RANGE:
155 return "OUT_OF_RANGE";
156 case GRPC_STATUS_UNIMPLEMENTED:
157 return "UNIMPLEMENTED";
158 case GRPC_STATUS_INTERNAL:
159 return "INTERNAL";
160 case GRPC_STATUS_UNAVAILABLE:
161 return "UNAVAILABLE";
162 case GRPC_STATUS_DATA_LOSS:
163 return "DATA_LOSS";
164 default:
165 // gRPC wants users of this enum to include a default branch so that
166 // adding values is not a breaking change.
167 return "UNKNOWN_STATUS";
168 }
169 }
170
171 } // namespace grpc_observability
172