1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/ext/otel/otel_client_call_tracer.h"
22
23 #include <stdint.h>
24
25 #include <array>
26 #include <functional>
27 #include <memory>
28 #include <string>
29 #include <utility>
30
31 #include "absl/functional/any_invocable.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/str_format.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/strings/strip.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "absl/types/span.h"
40 #include "opentelemetry/context/context.h"
41 #include "opentelemetry/metrics/sync_instruments.h"
42
43 #include <grpc/status.h>
44 #include <grpc/support/log.h>
45 #include <grpc/support/time.h>
46
47 #include "src/core/client_channel/client_channel_filter.h"
48 #include "src/core/lib/channel/channel_stack.h"
49 #include "src/core/lib/channel/context.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/channel/tcp_tracer.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/promise/context.h"
54 #include "src/core/lib/resource_quota/arena.h"
55 #include "src/core/lib/slice/slice.h"
56 #include "src/core/lib/slice/slice_buffer.h"
57 #include "src/core/lib/transport/metadata_batch.h"
58 #include "src/cpp/ext/otel/key_value_iterable.h"
59 #include "src/cpp/ext/otel/otel_plugin.h"
60
61 namespace grpc {
62 namespace internal {
63
64 //
65 // OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer
66 //
67
CallAttemptTracer(const OpenTelemetryPlugin::ClientCallTracer * parent,bool arena_allocated)68 OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
69 const OpenTelemetryPlugin::ClientCallTracer* parent, bool arena_allocated)
70 : parent_(parent),
71 arena_allocated_(arena_allocated),
72 start_time_(absl::Now()) {
73 if (parent_->otel_plugin_->client_.attempt.started != nullptr) {
74 std::array<std::pair<absl::string_view, absl::string_view>, 2>
75 additional_labels = {
76 {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
77 {OpenTelemetryTargetKey(),
78 parent_->scope_config_->filtered_target()}}};
79 // We might not have all the injected labels that we want at this point, so
80 // avoid recording a subset of injected labels here.
81 parent_->otel_plugin_->client_.attempt.started->Add(
82 1, KeyValueIterable(
83 /*injected_labels_from_plugin_options=*/{}, additional_labels,
84 /*active_plugin_options_view=*/nullptr,
85 /*optional_labels=*/{},
86 /*is_client=*/true, parent_->otel_plugin_));
87 }
88 }
89
90 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)91 RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
92 parent_->scope_config_->active_plugin_options_view().ForEach(
93 [&](const InternalOpenTelemetryPluginOption& plugin_option,
94 size_t /*index*/) {
95 auto* labels_injector = plugin_option.labels_injector();
96 if (labels_injector != nullptr) {
97 injected_labels_from_plugin_options_.push_back(
98 labels_injector->GetLabels(recv_initial_metadata));
99 }
100 return true;
101 },
102 parent_->otel_plugin_);
103 }
104
105 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)106 RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
107 parent_->scope_config_->active_plugin_options_view().ForEach(
108 [&](const InternalOpenTelemetryPluginOption& plugin_option,
109 size_t /*index*/) {
110 auto* labels_injector = plugin_option.labels_injector();
111 if (labels_injector != nullptr) {
112 labels_injector->AddLabels(send_initial_metadata, nullptr);
113 }
114 return true;
115 },
116 parent_->otel_plugin_);
117 }
118
119 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer & send_message)120 RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
121 RecordAnnotation(
122 absl::StrFormat("Send message: %ld bytes", send_message.Length()));
123 }
124
125 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordSendCompressedMessage(const grpc_core::SliceBuffer & send_compressed_message)126 RecordSendCompressedMessage(
127 const grpc_core::SliceBuffer& send_compressed_message) {
128 RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
129 send_compressed_message.Length()));
130 }
131
132 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer & recv_message)133 RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
134 RecordAnnotation(
135 absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
136 }
137
138 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedDecompressedMessage(const grpc_core::SliceBuffer & recv_decompressed_message)139 RecordReceivedDecompressedMessage(
140 const grpc_core::SliceBuffer& recv_decompressed_message) {
141 RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
142 recv_decompressed_message.Length()));
143 }
144
145 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch *,const grpc_transport_stream_stats * transport_stream_stats)146 RecordReceivedTrailingMetadata(
147 absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
148 const grpc_transport_stream_stats* transport_stream_stats) {
149 std::array<std::pair<absl::string_view, absl::string_view>, 3>
150 additional_labels = {
151 {{OpenTelemetryMethodKey(), parent_->MethodForStats()},
152 {OpenTelemetryTargetKey(),
153 parent_->scope_config_->filtered_target()},
154 {OpenTelemetryStatusKey(),
155 grpc_status_code_to_string(
156 static_cast<grpc_status_code>(status.code()))}}};
157 KeyValueIterable labels(
158 injected_labels_from_plugin_options_, additional_labels,
159 &parent_->scope_config_->active_plugin_options_view(), optional_labels_,
160 /*is_client=*/true, parent_->otel_plugin_);
161 if (parent_->otel_plugin_->client_.attempt.duration != nullptr) {
162 parent_->otel_plugin_->client_.attempt.duration->Record(
163 absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
164 opentelemetry::context::Context{});
165 }
166 if (parent_->otel_plugin_->client_.attempt
167 .sent_total_compressed_message_size != nullptr) {
168 parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size
169 ->Record(transport_stream_stats != nullptr
170 ? transport_stream_stats->outgoing.data_bytes
171 : 0,
172 labels, opentelemetry::context::Context{});
173 }
174 if (parent_->otel_plugin_->client_.attempt
175 .rcvd_total_compressed_message_size != nullptr) {
176 parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size
177 ->Record(transport_stream_stats != nullptr
178 ? transport_stream_stats->incoming.data_bytes
179 : 0,
180 labels, opentelemetry::context::Context{});
181 }
182 }
183
RecordCancel(absl::Status)184 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordCancel(
185 absl::Status /*cancel_error*/) {}
186
RecordEnd(const gpr_timespec &)187 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordEnd(
188 const gpr_timespec& /*latency*/) {
189 if (arena_allocated_) {
190 this->~CallAttemptTracer();
191 } else {
192 delete this;
193 }
194 }
195
RecordAnnotation(absl::string_view)196 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation(
197 absl::string_view /*annotation*/) {
198 // Not implemented
199 }
200
RecordAnnotation(const Annotation &)201 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation(
202 const Annotation& /*annotation*/) {
203 // Not implemented
204 }
205
206 std::shared_ptr<grpc_core::TcpTracerInterface>
StartNewTcpTrace()207 OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
208 // No TCP trace.
209 return nullptr;
210 }
211
SetOptionalLabel(OptionalLabelKey key,grpc_core::RefCountedStringValue value)212 void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel(
213 OptionalLabelKey key, grpc_core::RefCountedStringValue value) {
214 GPR_ASSERT(key < OptionalLabelKey::kSize);
215 optional_labels_[static_cast<size_t>(key)] = std::move(value);
216 }
217
218 //
219 // OpenTelemetryPlugin::ClientCallTracer
220 //
221
ClientCallTracer(const grpc_core::Slice & path,grpc_core::Arena * arena,bool registered_method,OpenTelemetryPlugin * otel_plugin,std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config)222 OpenTelemetryPlugin::ClientCallTracer::ClientCallTracer(
223 const grpc_core::Slice& path, grpc_core::Arena* arena,
224 bool registered_method, OpenTelemetryPlugin* otel_plugin,
225 std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config)
226 : path_(path.Ref()),
227 arena_(arena),
228 registered_method_(registered_method),
229 otel_plugin_(otel_plugin),
230 scope_config_(std::move(scope_config)) {}
231
~ClientCallTracer()232 OpenTelemetryPlugin::ClientCallTracer::~ClientCallTracer() {}
233
234 OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer*
StartNewAttempt(bool is_transparent_retry)235 OpenTelemetryPlugin::ClientCallTracer::StartNewAttempt(
236 bool is_transparent_retry) {
237 // We allocate the first attempt on the arena and all subsequent attempts
238 // on the heap, so that in the common case we don't require a heap
239 // allocation, nor do we unnecessarily grow the arena.
240 bool is_first_attempt = true;
241 {
242 grpc_core::MutexLock lock(&mu_);
243 if (transparent_retries_ != 0 || retries_ != 0) {
244 is_first_attempt = false;
245 }
246 if (is_transparent_retry) {
247 ++transparent_retries_;
248 } else {
249 ++retries_;
250 }
251 }
252 if (is_first_attempt) {
253 return arena_->New<CallAttemptTracer>(this, /*arena_allocated=*/true);
254 }
255 return new CallAttemptTracer(this, /*arena_allocated=*/false);
256 }
257
MethodForStats() const258 absl::string_view OpenTelemetryPlugin::ClientCallTracer::MethodForStats()
259 const {
260 absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
261 if (registered_method_ ||
262 (otel_plugin_->generic_method_attribute_filter() != nullptr &&
263 otel_plugin_->generic_method_attribute_filter()(method))) {
264 return method;
265 }
266 return "other";
267 }
268
RecordAnnotation(absl::string_view)269 void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation(
270 absl::string_view /*annotation*/) {
271 // Not implemented
272 }
273
RecordAnnotation(const Annotation &)274 void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation(
275 const Annotation& /*annotation*/) {
276 // Not implemented
277 }
278
279 } // namespace internal
280 } // namespace grpc
281