xref: /aosp_15_r20/external/grpc-grpc/src/cpp/ext/otel/otel_client_call_tracer.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/ext/otel/otel_client_call_tracer.h"
22 
23 #include <stdint.h>
24 
25 #include <array>
26 #include <functional>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 
31 #include "absl/functional/any_invocable.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/strings/strip.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "absl/types/span.h"
40 #include "opentelemetry/context/context.h"
41 #include "opentelemetry/metrics/sync_instruments.h"
42 
43 #include <grpc/status.h>
44 #include <grpc/support/log.h>
45 #include <grpc/support/time.h>
46 
47 #include "src/core/client_channel/client_channel_filter.h"
48 #include "src/core/lib/channel/channel_stack.h"
49 #include "src/core/lib/channel/context.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/channel/tcp_tracer.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/promise/context.h"
54 #include "src/core/lib/resource_quota/arena.h"
55 #include "src/core/lib/slice/slice.h"
56 #include "src/core/lib/slice/slice_buffer.h"
57 #include "src/core/lib/transport/metadata_batch.h"
58 #include "src/cpp/ext/otel/key_value_iterable.h"
59 #include "src/cpp/ext/otel/otel_plugin.h"
60 
61 namespace grpc {
62 namespace internal {
63 
64 //
65 // OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer
66 //
67 
CallAttemptTracer(const OpenTelemetryPlugin::ClientCallTracer * parent,bool arena_allocated)68 OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
69     const OpenTelemetryPlugin::ClientCallTracer* parent, bool arena_allocated)
70     : parent_(parent),
71       arena_allocated_(arena_allocated),
72       start_time_(absl::Now()) {
73   if (parent_->otel_plugin_->client_.attempt.started != nullptr) {
74     std::array<std::pair<absl::string_view, absl::string_view>, 2>
75         additional_labels = {
76             {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
77              {OpenTelemetryTargetKey(),
78               parent_->scope_config_->filtered_target()}}};
79     // We might not have all the injected labels that we want at this point, so
80     // avoid recording a subset of injected labels here.
81     parent_->otel_plugin_->client_.attempt.started->Add(
82         1, KeyValueIterable(
83                /*injected_labels_from_plugin_options=*/{}, additional_labels,
84                /*active_plugin_options_view=*/nullptr,
85                /*optional_labels=*/{},
86                /*is_client=*/true, parent_->otel_plugin_));
87   }
88 }
89 
90 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)91     RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
92   parent_->scope_config_->active_plugin_options_view().ForEach(
93       [&](const InternalOpenTelemetryPluginOption& plugin_option,
94           size_t /*index*/) {
95         auto* labels_injector = plugin_option.labels_injector();
96         if (labels_injector != nullptr) {
97           injected_labels_from_plugin_options_.push_back(
98               labels_injector->GetLabels(recv_initial_metadata));
99         }
100         return true;
101       },
102       parent_->otel_plugin_);
103 }
104 
105 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)106     RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
107   parent_->scope_config_->active_plugin_options_view().ForEach(
108       [&](const InternalOpenTelemetryPluginOption& plugin_option,
109           size_t /*index*/) {
110         auto* labels_injector = plugin_option.labels_injector();
111         if (labels_injector != nullptr) {
112           labels_injector->AddLabels(send_initial_metadata, nullptr);
113         }
114         return true;
115       },
116       parent_->otel_plugin_);
117 }
118 
119 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer & send_message)120     RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
121   RecordAnnotation(
122       absl::StrFormat("Send message: %ld bytes", send_message.Length()));
123 }
124 
125 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)126     RecordSendCompressedMessage(
127         const grpc_core::SliceBuffer& send_compressed_message) {
128   RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
129                                    send_compressed_message.Length()));
130 }
131 
132 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)133     RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
134   RecordAnnotation(
135       absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
136 }
137 
138 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)139     RecordReceivedDecompressedMessage(
140         const grpc_core::SliceBuffer& recv_decompressed_message) {
141   RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
142                                    recv_decompressed_message.Length()));
143 }
144 
145 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch *,const grpc_transport_stream_stats * transport_stream_stats)146     RecordReceivedTrailingMetadata(
147         absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
148         const grpc_transport_stream_stats* transport_stream_stats) {
149   std::array<std::pair<absl::string_view, absl::string_view>, 3>
150       additional_labels = {
151           {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
152            {OpenTelemetryTargetKey(),
153             parent_->scope_config_->filtered_target()},
154            {OpenTelemetryStatusKey(),
155             grpc_status_code_to_string(
156                 static_cast<grpc_status_code>(status.code()))}}};
157   KeyValueIterable labels(
158       injected_labels_from_plugin_options_, additional_labels,
159       &parent_->scope_config_->active_plugin_options_view(), optional_labels_,
160       /*is_client=*/true, parent_->otel_plugin_);
161   if (parent_->otel_plugin_->client_.attempt.duration != nullptr) {
162     parent_->otel_plugin_->client_.attempt.duration->Record(
163         absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
164         opentelemetry::context::Context{});
165   }
166   if (parent_->otel_plugin_->client_.attempt
167           .sent_total_compressed_message_size != nullptr) {
168     parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size
169         ->Record(transport_stream_stats != nullptr
170                      ? transport_stream_stats->outgoing.data_bytes
171                      : 0,
172                  labels, opentelemetry::context::Context{});
173   }
174   if (parent_->otel_plugin_->client_.attempt
175           .rcvd_total_compressed_message_size != nullptr) {
176     parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
177         ->Record(transport_stream_stats != nullptr
178                      ? transport_stream_stats->incoming.data_bytes
179                      : 0,
180                  labels, opentelemetry::context::Context{});
181   }
182 }
183 
RecordCancel(absl::Status)184 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordCancel(
185     absl::Status /*cancel_error*/) {}
186 
RecordEnd(const gpr_timespec &)187 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordEnd(
188     const gpr_timespec& /*latency*/) {
189   if (arena_allocated_) {
190     this->~CallAttemptTracer();
191   } else {
192     delete this;
193   }
194 }
195 
RecordAnnotation(absl::string_view)196 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation(
197     absl::string_view /*annotation*/) {
198   // Not implemented
199 }
200 
RecordAnnotation(const Annotation &)201 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation(
202     const Annotation& /*annotation*/) {
203   // Not implemented
204 }
205 
206 std::shared_ptr<grpc_core::TcpTracerInterface>
StartNewTcpTrace()207 OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
208   // No TCP trace.
209   return nullptr;
210 }
211 
SetOptionalLabel(OptionalLabelKey key,grpc_core::RefCountedStringValue value)212 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel(
213     OptionalLabelKey key, grpc_core::RefCountedStringValue value) {
214   GPR_ASSERT(key < OptionalLabelKey::kSize);
215   optional_labels_[static_cast<size_t>(key)] = std::move(value);
216 }
217 
218 //
219 // OpenTelemetryPlugin::ClientCallTracer
220 //
221 
ClientCallTracer(const grpc_core::Slice & path,grpc_core::Arena * arena,bool registered_method,OpenTelemetryPlugin * otel_plugin,std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config)222 OpenTelemetryPlugin::ClientCallTracer::ClientCallTracer(
223     const grpc_core::Slice& path, grpc_core::Arena* arena,
224     bool registered_method, OpenTelemetryPlugin* otel_plugin,
225     std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config)
226     : path_(path.Ref()),
227       arena_(arena),
228       registered_method_(registered_method),
229       otel_plugin_(otel_plugin),
230       scope_config_(std::move(scope_config)) {}
231 
~ClientCallTracer()232 OpenTelemetryPlugin::ClientCallTracer::~ClientCallTracer() {}
233 
234 OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)235 OpenTelemetryPlugin::ClientCallTracer::StartNewAttempt(
236     bool is_transparent_retry) {
237   // We allocate the first attempt on the arena and all subsequent attempts
238   // on the heap, so that in the common case we don't require a heap
239   // allocation, nor do we unnecessarily grow the arena.
240   bool is_first_attempt = true;
241   {
242     grpc_core::MutexLock lock(&mu_);
243     if (transparent_retries_ != 0 || retries_ != 0) {
244       is_first_attempt = false;
245     }
246     if (is_transparent_retry) {
247       ++transparent_retries_;
248     } else {
249       ++retries_;
250     }
251   }
252   if (is_first_attempt) {
253     return arena_->New<CallAttemptTracer>(this, /*arena_allocated=*/true);
254   }
255   return new CallAttemptTracer(this, /*arena_allocated=*/false);
256 }
257 
MethodForStats() const258 absl::string_view OpenTelemetryPlugin::ClientCallTracer::MethodForStats()
259     const {
260   absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
261   if (registered_method_ ||
262       (otel_plugin_->generic_method_attribute_filter() != nullptr &&
263        otel_plugin_->generic_method_attribute_filter()(method))) {
264     return method;
265   }
266   return "other";
267 }
268 
RecordAnnotation(absl::string_view)269 void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation(
270     absl::string_view /*annotation*/) {
271   // Not implemented
272 }
273 
RecordAnnotation(const Annotation &)274 void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation(
275     const Annotation& /*annotation*/) {
276   // Not implemented
277 }
278 
279 }  // namespace internal
280 }  // namespace grpc
281