1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "client_call_tracer.h"
16 
17 #include <stddef.h>
18 
19 #include <algorithm>
20 #include <vector>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/time/clock.h"
24 #include "constants.h"
25 #include "observability_util.h"
26 #include "python_census_context.h"
27 
28 #include <grpc/slice.h>
29 
30 #include "src/core/lib/slice/slice.h"
31 
32 namespace grpc_observability {
33 
34 constexpr uint32_t PythonOpenCensusCallTracer::
35     PythonOpenCensusCallAttemptTracer::kMaxTraceContextLen;
36 constexpr uint32_t
37     PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::kMaxTagsLen;
38 
39 //
40 // OpenCensusCallTracer
41 //
42 
PythonOpenCensusCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id,bool tracing_enabled)43 PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
44     const char* method, const char* target, const char* trace_id,
45     const char* parent_span_id, bool tracing_enabled)
46     : method_(GetMethod(method)),
47       target_(GetTarget(target)),
48       tracing_enabled_(tracing_enabled) {
49   GenerateClientContext(absl::StrCat("Sent.", method_),
50                         absl::string_view(trace_id),
51                         absl::string_view(parent_span_id), &context_);
52 }
53 
GenerateContext()54 void PythonOpenCensusCallTracer::GenerateContext() {}
55 
RecordAnnotation(absl::string_view annotation)56 void PythonOpenCensusCallTracer::RecordAnnotation(
57     absl::string_view annotation) {
58   if (!context_.GetSpanContext().IsSampled()) {
59     return;
60   }
61   context_.AddSpanAnnotation(annotation);
62 }
63 
RecordAnnotation(const Annotation & annotation)64 void PythonOpenCensusCallTracer::RecordAnnotation(
65     const Annotation& annotation) {
66   if (!context_.GetSpanContext().IsSampled()) {
67     return;
68   }
69 
70   switch (annotation.type()) {
71     // Annotations are expensive to create. We should only create it if the call
72     // is being sampled by default.
73     default:
74       if (IsSampled()) {
75         context_.AddSpanAnnotation(annotation.ToString());
76       }
77       break;
78   }
79 }
80 
~PythonOpenCensusCallTracer()81 PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
82   if (PythonCensusStatsEnabled()) {
83     context_.Labels().emplace_back(kClientMethod, std::string(method_));
84     RecordIntMetric(kRpcClientRetriesPerCallMeasureName, retries_ - 1,
85                     context_.Labels());  // exclude first attempt
86     RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
87                     transparent_retries_, context_.Labels());
88     RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
89                        ToDoubleSeconds(retry_delay_), context_.Labels());
90   }
91 
92   if (tracing_enabled_) {
93     context_.EndSpan();
94     if (IsSampled()) {
95       RecordSpan(context_.GetSpan().ToCensusData());
96     }
97   }
98 }
99 
100 PythonCensusContext
CreateCensusContextForCallAttempt()101 PythonOpenCensusCallTracer::CreateCensusContextForCallAttempt() {
102   auto context = PythonCensusContext(absl::StrCat("Attempt.", method_),
103                                      &(context_.GetSpan()), context_.Labels());
104   return context;
105 }
106 
107 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)108 PythonOpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
109   uint64_t attempt_num;
110   {
111     grpc_core::MutexLock lock(&mu_);
112     if (transparent_retries_ != 0 || retries_ != 0) {
113       if (PythonCensusStatsEnabled() && num_active_rpcs_ == 0) {
114         retry_delay_ += absl::Now() - time_at_last_attempt_end_;
115       }
116     }
117     attempt_num = retries_;
118     if (is_transparent_retry) {
119       ++transparent_retries_;
120     } else {
121       ++retries_;
122     }
123     ++num_active_rpcs_;
124   }
125   context_.IncreaseChildSpanCount();
126   return new PythonOpenCensusCallAttemptTracer(this, attempt_num,
127                                                is_transparent_retry);
128 }
129 
130 //
131 // PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer
132 //
133 
134 PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer * parent,uint64_t attempt_num,bool is_transparent_retry)135     PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
136                                       uint64_t attempt_num,
137                                       bool is_transparent_retry)
138     : parent_(parent),
139       context_(parent_->CreateCensusContextForCallAttempt()),
140       start_time_(absl::Now()) {
141   if (parent_->tracing_enabled_) {
142     context_.AddSpanAttribute("previous-rpc-attempts",
143                               absl::StrCat(attempt_num));
144     context_.AddSpanAttribute("transparent-retry",
145                               absl::StrCat(is_transparent_retry));
146   }
147   if (!PythonCensusStatsEnabled()) {
148     return;
149   }
150   context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
151   context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_));
152   RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels());
153 }
154 
155 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)156     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
157   if (parent_->tracing_enabled_) {
158     char tracing_buf[kMaxTraceContextLen];
159     size_t tracing_len =
160         TraceContextSerialize(context_, tracing_buf, kMaxTraceContextLen);
161     if (tracing_len > 0) {
162       send_initial_metadata->Set(
163           grpc_core::GrpcTraceBinMetadata(),
164           grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
165     }
166   }
167   if (!PythonCensusStatsEnabled()) {
168     return;
169   }
170   grpc_slice tags = grpc_empty_slice();
171   size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
172   if (encoded_tags_len > 0) {
173     send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
174                                grpc_core::Slice(tags));
175   }
176 }
177 
178 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer &)179     RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) {
180   ++sent_message_count_;
181 }
182 
183 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer &)184     RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) {
185   ++recv_message_count_;
186 }
187 
188 std::shared_ptr<grpc_core::TcpTracerInterface> PythonOpenCensusCallTracer::
StartNewTcpTrace()189     PythonOpenCensusCallAttemptTracer::StartNewTcpTrace() {
190   return nullptr;
191 }
192 
193 namespace {
194 
195 // Returns 0 if no server stats are present in the metadata.
GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch * b)196 uint64_t GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch* b) {
197   if (!PythonCensusStatsEnabled()) {
198     return 0;
199   }
200 
201   const grpc_core::Slice* grpc_server_stats_bin_ptr =
202       b->get_pointer(grpc_core::GrpcServerStatsBinMetadata());
203   if (grpc_server_stats_bin_ptr == nullptr) {
204     return 0;
205   }
206 
207   uint64_t elapsed_time = 0;
208   ServerStatsDeserialize(
209       reinterpret_cast<const char*>(grpc_server_stats_bin_ptr->data()),
210       grpc_server_stats_bin_ptr->size(), &elapsed_time);
211   return elapsed_time;
212 }
213 
214 }  // namespace
215 
216 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)217     RecordReceivedTrailingMetadata(
218         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
219         const grpc_transport_stream_stats* transport_stream_stats) {
220   if (!PythonCensusStatsEnabled()) {
221     return;
222   }
223   auto status_code_ = status.code();
224   uint64_t elapsed_time = 0;
225   if (recv_trailing_metadata != nullptr) {
226     elapsed_time = GetElapsedTimeFromTrailingMetadata(recv_trailing_metadata);
227   }
228 
229   std::string final_status = absl::StatusCodeToString(status_code_);
230   context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
231   context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_));
232   context_.Labels().emplace_back(kClientStatus, final_status);
233   RecordDoubleMetric(
234       kRpcClientSentBytesPerRpcMeasureName,
235       static_cast<double>(transport_stream_stats != nullptr
236                               ? transport_stream_stats->outgoing.data_bytes
237                               : 0),
238       context_.Labels());
239   RecordDoubleMetric(
240       kRpcClientReceivedBytesPerRpcMeasureName,
241       static_cast<double>(transport_stream_stats != nullptr
242                               ? transport_stream_stats->incoming.data_bytes
243                               : 0),
244       context_.Labels());
245   RecordDoubleMetric(kRpcClientServerLatencyMeasureName,
246                      absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)),
247                      context_.Labels());
248   RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
249                      absl::ToDoubleSeconds(absl::Now() - start_time_),
250                      context_.Labels());
251   RecordIntMetric(kRpcClientCompletedRpcMeasureName, 1, context_.Labels());
252 }
253 
254 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordCancel(absl::Status)255     RecordCancel(absl::Status /*cancel_error*/) {}
256 
RecordEnd(const gpr_timespec &)257 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::RecordEnd(
258     const gpr_timespec& /*latency*/) {
259   if (PythonCensusStatsEnabled()) {
260     context_.Labels().emplace_back(kClientMethod,
261                                    std::string(parent_->method_));
262     context_.Labels().emplace_back(kClientStatus,
263                                    StatusCodeToString(status_code_));
264     RecordIntMetric(kRpcClientSentMessagesPerRpcMeasureName,
265                     sent_message_count_, context_.Labels());
266     RecordIntMetric(kRpcClientReceivedMessagesPerRpcMeasureName,
267                     recv_message_count_, context_.Labels());
268 
269     grpc_core::MutexLock lock(&parent_->mu_);
270     if (--parent_->num_active_rpcs_ == 0) {
271       parent_->time_at_last_attempt_end_ = absl::Now();
272     }
273   }
274 
275   if (parent_->tracing_enabled_) {
276     if (status_code_ != absl::StatusCode::kOk) {
277       context_.GetSpan().SetStatus(StatusCodeToString(status_code_));
278     }
279     context_.EndSpan();
280     if (IsSampled()) {
281       RecordSpan(context_.GetSpan().ToCensusData());
282     }
283   }
284 
285   // After RecordEnd, Core will make no further usage of this CallAttemptTracer,
286   // so we are free it here.
287   delete this;
288 }
289 
290 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(absl::string_view annotation)291     RecordAnnotation(absl::string_view annotation) {
292   if (!context_.GetSpanContext().IsSampled()) {
293     return;
294   }
295   context_.AddSpanAnnotation(annotation);
296 }
297 
298 void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(const Annotation & annotation)299     RecordAnnotation(const Annotation& annotation) {
300   if (!context_.GetSpanContext().IsSampled()) {
301     return;
302   }
303 
304   switch (annotation.type()) {
305     // Annotations are expensive to create. We should only create it if the call
306     // is being sampled by default.
307     default:
308       if (IsSampled()) {
309         context_.AddSpanAnnotation(annotation.ToString());
310       }
311       break;
312   }
313 }
314 
315 }  // namespace grpc_observability
316