1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "client_call_tracer.h"
16
17 #include <stddef.h>
18
19 #include <algorithm>
20 #include <vector>
21
22 #include "absl/strings/str_cat.h"
23 #include "absl/time/clock.h"
24 #include "constants.h"
25 #include "observability_util.h"
26 #include "python_census_context.h"
27
28 #include <grpc/slice.h>
29
30 #include "src/core/lib/slice/slice.h"
31
32 namespace grpc_observability {
33
34 constexpr uint32_t PythonOpenCensusCallTracer::
35 PythonOpenCensusCallAttemptTracer::kMaxTraceContextLen;
36 constexpr uint32_t
37 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::kMaxTagsLen;
38
39 //
// PythonOpenCensusCallTracer
41 //
42
PythonOpenCensusCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id,bool tracing_enabled)43 PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
44 const char* method, const char* target, const char* trace_id,
45 const char* parent_span_id, bool tracing_enabled)
46 : method_(GetMethod(method)),
47 target_(GetTarget(target)),
48 tracing_enabled_(tracing_enabled) {
49 GenerateClientContext(absl::StrCat("Sent.", method_),
50 absl::string_view(trace_id),
51 absl::string_view(parent_span_id), &context_);
52 }
53
GenerateContext()54 void PythonOpenCensusCallTracer::GenerateContext() {}
55
RecordAnnotation(absl::string_view annotation)56 void PythonOpenCensusCallTracer::RecordAnnotation(
57 absl::string_view annotation) {
58 if (!context_.GetSpanContext().IsSampled()) {
59 return;
60 }
61 context_.AddSpanAnnotation(annotation);
62 }
63
RecordAnnotation(const Annotation & annotation)64 void PythonOpenCensusCallTracer::RecordAnnotation(
65 const Annotation& annotation) {
66 if (!context_.GetSpanContext().IsSampled()) {
67 return;
68 }
69
70 switch (annotation.type()) {
71 // Annotations are expensive to create. We should only create it if the call
72 // is being sampled by default.
73 default:
74 if (IsSampled()) {
75 context_.AddSpanAnnotation(annotation.ToString());
76 }
77 break;
78 }
79 }
80
~PythonOpenCensusCallTracer()81 PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
82 if (PythonCensusStatsEnabled()) {
83 context_.Labels().emplace_back(kClientMethod, std::string(method_));
84 RecordIntMetric(kRpcClientRetriesPerCallMeasureName, retries_ - 1,
85 context_.Labels()); // exclude first attempt
86 RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
87 transparent_retries_, context_.Labels());
88 RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
89 ToDoubleSeconds(retry_delay_), context_.Labels());
90 }
91
92 if (tracing_enabled_) {
93 context_.EndSpan();
94 if (IsSampled()) {
95 RecordSpan(context_.GetSpan().ToCensusData());
96 }
97 }
98 }
99
100 PythonCensusContext
CreateCensusContextForCallAttempt()101 PythonOpenCensusCallTracer::CreateCensusContextForCallAttempt() {
102 auto context = PythonCensusContext(absl::StrCat("Attempt.", method_),
103 &(context_.GetSpan()), context_.Labels());
104 return context;
105 }
106
107 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)108 PythonOpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
109 uint64_t attempt_num;
110 {
111 grpc_core::MutexLock lock(&mu_);
112 if (transparent_retries_ != 0 || retries_ != 0) {
113 if (PythonCensusStatsEnabled() && num_active_rpcs_ == 0) {
114 retry_delay_ += absl::Now() - time_at_last_attempt_end_;
115 }
116 }
117 attempt_num = retries_;
118 if (is_transparent_retry) {
119 ++transparent_retries_;
120 } else {
121 ++retries_;
122 }
123 ++num_active_rpcs_;
124 }
125 context_.IncreaseChildSpanCount();
126 return new PythonOpenCensusCallAttemptTracer(this, attempt_num,
127 is_transparent_retry);
128 }
129
130 //
131 // PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer
132 //
133
134 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry)135 PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
136 uint64_t attempt_num,
137 bool is_transparent_retry)
138 : parent_(parent),
139 context_(parent_->CreateCensusContextForCallAttempt()),
140 start_time_(absl::Now()) {
141 if (parent_->tracing_enabled_) {
142 context_.AddSpanAttribute("previous-rpc-attempts",
143 absl::StrCat(attempt_num));
144 context_.AddSpanAttribute("transparent-retry",
145 absl::StrCat(is_transparent_retry));
146 }
147 if (!PythonCensusStatsEnabled()) {
148 return;
149 }
150 context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
151 context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_));
152 RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels());
153 }
154
155 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)156 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
157 if (parent_->tracing_enabled_) {
158 char tracing_buf[kMaxTraceContextLen];
159 size_t tracing_len =
160 TraceContextSerialize(context_, tracing_buf, kMaxTraceContextLen);
161 if (tracing_len > 0) {
162 send_initial_metadata->Set(
163 grpc_core::GrpcTraceBinMetadata(),
164 grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
165 }
166 }
167 if (!PythonCensusStatsEnabled()) {
168 return;
169 }
170 grpc_slice tags = grpc_empty_slice();
171 size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
172 if (encoded_tags_len > 0) {
173 send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
174 grpc_core::Slice(tags));
175 }
176 }
177
178 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer &)179 RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) {
180 ++sent_message_count_;
181 }
182
183 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer &)184 RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) {
185 ++recv_message_count_;
186 }
187
188 std::shared_ptr<grpc_core::TcpTracerInterface> PythonOpenCensusCallTracer::
StartNewTcpTrace()189 PythonOpenCensusCallAttemptTracer::StartNewTcpTrace() {
190 return nullptr;
191 }
192
193 namespace {
194
195 // Returns 0 if no server stats are present in the metadata.
GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch * b)196 uint64_t GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch* b) {
197 if (!PythonCensusStatsEnabled()) {
198 return 0;
199 }
200
201 const grpc_core::Slice* grpc_server_stats_bin_ptr =
202 b->get_pointer(grpc_core::GrpcServerStatsBinMetadata());
203 if (grpc_server_stats_bin_ptr == nullptr) {
204 return 0;
205 }
206
207 uint64_t elapsed_time = 0;
208 ServerStatsDeserialize(
209 reinterpret_cast<const char*>(grpc_server_stats_bin_ptr->data()),
210 grpc_server_stats_bin_ptr->size(), &elapsed_time);
211 return elapsed_time;
212 }
213
214 } // namespace
215
216 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)217 RecordReceivedTrailingMetadata(
218 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
219 const grpc_transport_stream_stats* transport_stream_stats) {
220 if (!PythonCensusStatsEnabled()) {
221 return;
222 }
223 auto status_code_ = status.code();
224 uint64_t elapsed_time = 0;
225 if (recv_trailing_metadata != nullptr) {
226 elapsed_time = GetElapsedTimeFromTrailingMetadata(recv_trailing_metadata);
227 }
228
229 std::string final_status = absl::StatusCodeToString(status_code_);
230 context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
231 context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_));
232 context_.Labels().emplace_back(kClientStatus, final_status);
233 RecordDoubleMetric(
234 kRpcClientSentBytesPerRpcMeasureName,
235 static_cast<double>(transport_stream_stats != nullptr
236 ? transport_stream_stats->outgoing.data_bytes
237 : 0),
238 context_.Labels());
239 RecordDoubleMetric(
240 kRpcClientReceivedBytesPerRpcMeasureName,
241 static_cast<double>(transport_stream_stats != nullptr
242 ? transport_stream_stats->incoming.data_bytes
243 : 0),
244 context_.Labels());
245 RecordDoubleMetric(kRpcClientServerLatencyMeasureName,
246 absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)),
247 context_.Labels());
248 RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
249 absl::ToDoubleSeconds(absl::Now() - start_time_),
250 context_.Labels());
251 RecordIntMetric(kRpcClientCompletedRpcMeasureName, 1, context_.Labels());
252 }
253
254 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordCancel(absl::Status)255 RecordCancel(absl::Status /*cancel_error*/) {}
256
RecordEnd(const gpr_timespec &)257 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::RecordEnd(
258 const gpr_timespec& /*latency*/) {
259 if (PythonCensusStatsEnabled()) {
260 context_.Labels().emplace_back(kClientMethod,
261 std::string(parent_->method_));
262 context_.Labels().emplace_back(kClientStatus,
263 StatusCodeToString(status_code_));
264 RecordIntMetric(kRpcClientSentMessagesPerRpcMeasureName,
265 sent_message_count_, context_.Labels());
266 RecordIntMetric(kRpcClientReceivedMessagesPerRpcMeasureName,
267 recv_message_count_, context_.Labels());
268
269 grpc_core::MutexLock lock(&parent_->mu_);
270 if (--parent_->num_active_rpcs_ == 0) {
271 parent_->time_at_last_attempt_end_ = absl::Now();
272 }
273 }
274
275 if (parent_->tracing_enabled_) {
276 if (status_code_ != absl::StatusCode::kOk) {
277 context_.GetSpan().SetStatus(StatusCodeToString(status_code_));
278 }
279 context_.EndSpan();
280 if (IsSampled()) {
281 RecordSpan(context_.GetSpan().ToCensusData());
282 }
283 }
284
285 // After RecordEnd, Core will make no further usage of this CallAttemptTracer,
286 // so we are free it here.
287 delete this;
288 }
289
290 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(absl::string_view annotation)291 RecordAnnotation(absl::string_view annotation) {
292 if (!context_.GetSpanContext().IsSampled()) {
293 return;
294 }
295 context_.AddSpanAnnotation(annotation);
296 }
297
298 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(const Annotation & annotation)299 RecordAnnotation(const Annotation& annotation) {
300 if (!context_.GetSpanContext().IsSampled()) {
301 return;
302 }
303
304 switch (annotation.type()) {
305 // Annotations are expensive to create. We should only create it if the call
306 // is being sampled by default.
307 default:
308 if (IsSampled()) {
309 context_.AddSpanAnnotation(annotation.ToString());
310 }
311 break;
312 }
313 }
314
315 } // namespace grpc_observability
316