1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/channel/call_tracer.h"
22
23 #include <memory>
24 #include <utility>
25 #include <vector>
26
27 #include <grpc/support/log.h>
28
29 #include "src/core/lib/channel/tcp_tracer.h"
30 #include "src/core/lib/promise/context.h"
31
32 namespace grpc_core {
33
34 //
35 // ServerCallTracerFactory
36 //
37
38 namespace {
39
40 ServerCallTracerFactory* g_server_call_tracer_factory_ = nullptr;
41
42 const char* kServerCallTracerFactoryChannelArgName =
43 "grpc.experimental.server_call_tracer_factory";
44 } // namespace
45
Get(const ChannelArgs & channel_args)46 ServerCallTracerFactory* ServerCallTracerFactory::Get(
47 const ChannelArgs& channel_args) {
48 ServerCallTracerFactory* factory =
49 channel_args.GetObject<ServerCallTracerFactory>();
50 if (factory == nullptr) {
51 factory = g_server_call_tracer_factory_;
52 }
53 if (factory && factory->IsServerTraced(channel_args)) {
54 return factory;
55 }
56 return nullptr;
57 }
58
RegisterGlobal(ServerCallTracerFactory * factory)59 void ServerCallTracerFactory::RegisterGlobal(ServerCallTracerFactory* factory) {
60 g_server_call_tracer_factory_ = factory;
61 }
62
TestOnlyReset()63 void ServerCallTracerFactory::TestOnlyReset() {
64 delete g_server_call_tracer_factory_;
65 g_server_call_tracer_factory_ = nullptr;
66 }
67
ChannelArgName()68 absl::string_view ServerCallTracerFactory::ChannelArgName() {
69 return kServerCallTracerFactoryChannelArgName;
70 }
71
72 class DelegatingClientCallTracer : public ClientCallTracer {
73 public:
74 class DelegatingClientCallAttemptTracer
75 : public ClientCallTracer::CallAttemptTracer {
76 public:
DelegatingClientCallAttemptTracer(std::vector<CallAttemptTracer * > tracers)77 explicit DelegatingClientCallAttemptTracer(
78 std::vector<CallAttemptTracer*> tracers)
79 : tracers_(std::move(tracers)) {
80 GPR_DEBUG_ASSERT(!tracers_.empty());
81 }
~DelegatingClientCallAttemptTracer()82 ~DelegatingClientCallAttemptTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)83 void RecordSendInitialMetadata(
84 grpc_metadata_batch* send_initial_metadata) override {
85 for (auto* tracer : tracers_) {
86 tracer->RecordSendInitialMetadata(send_initial_metadata);
87 }
88 }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)89 void RecordSendTrailingMetadata(
90 grpc_metadata_batch* send_trailing_metadata) override {
91 for (auto* tracer : tracers_) {
92 tracer->RecordSendTrailingMetadata(send_trailing_metadata);
93 }
94 }
RecordSendMessage(const SliceBuffer & send_message)95 void RecordSendMessage(const SliceBuffer& send_message) override {
96 for (auto* tracer : tracers_) {
97 tracer->RecordSendMessage(send_message);
98 }
99 }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)100 void RecordSendCompressedMessage(
101 const SliceBuffer& send_compressed_message) override {
102 for (auto* tracer : tracers_) {
103 tracer->RecordSendCompressedMessage(send_compressed_message);
104 }
105 }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)106 void RecordReceivedInitialMetadata(
107 grpc_metadata_batch* recv_initial_metadata) override {
108 for (auto* tracer : tracers_) {
109 tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
110 }
111 }
RecordReceivedMessage(const SliceBuffer & recv_message)112 void RecordReceivedMessage(const SliceBuffer& recv_message) override {
113 for (auto* tracer : tracers_) {
114 tracer->RecordReceivedMessage(recv_message);
115 }
116 }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)117 void RecordReceivedDecompressedMessage(
118 const SliceBuffer& recv_decompressed_message) override {
119 for (auto* tracer : tracers_) {
120 tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
121 }
122 }
RecordCancel(grpc_error_handle cancel_error)123 void RecordCancel(grpc_error_handle cancel_error) override {
124 for (auto* tracer : tracers_) {
125 tracer->RecordCancel(cancel_error);
126 }
127 }
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)128 void RecordReceivedTrailingMetadata(
129 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
130 const grpc_transport_stream_stats* transport_stream_stats) override {
131 for (auto* tracer : tracers_) {
132 tracer->RecordReceivedTrailingMetadata(status, recv_trailing_metadata,
133 transport_stream_stats);
134 }
135 }
RecordEnd(const gpr_timespec & latency)136 void RecordEnd(const gpr_timespec& latency) override {
137 for (auto* tracer : tracers_) {
138 tracer->RecordEnd(latency);
139 }
140 }
RecordAnnotation(absl::string_view annotation)141 void RecordAnnotation(absl::string_view annotation) override {
142 for (auto* tracer : tracers_) {
143 tracer->RecordAnnotation(annotation);
144 }
145 }
RecordAnnotation(const Annotation & annotation)146 void RecordAnnotation(const Annotation& annotation) override {
147 for (auto* tracer : tracers_) {
148 tracer->RecordAnnotation(annotation);
149 }
150 }
StartNewTcpTrace()151 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
152 return nullptr;
153 }
SetOptionalLabel(OptionalLabelKey key,RefCountedStringValue value)154 void SetOptionalLabel(OptionalLabelKey key,
155 RefCountedStringValue value) override {
156 for (auto* tracer : tracers_) {
157 tracer->SetOptionalLabel(key, value);
158 }
159 }
TraceId()160 std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()161 std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()162 bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()163 bool IsDelegatingTracer() override { return true; }
164
165 private:
166 // There is no additional synchronization needed since filters/interceptors
167 // will be adding call tracers to the context and these are already
168 // synchronized through promises/call combiners (single promise running per
169 // call at any moment).
170 std::vector<CallAttemptTracer*> tracers_;
171 };
DelegatingClientCallTracer(ClientCallTracer * tracer)172 explicit DelegatingClientCallTracer(ClientCallTracer* tracer)
173 : tracers_{tracer} {}
~DelegatingClientCallTracer()174 ~DelegatingClientCallTracer() override {}
StartNewAttempt(bool is_transparent_retry)175 CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) override {
176 std::vector<CallAttemptTracer*> attempt_tracers;
177 attempt_tracers.reserve(tracers_.size());
178 for (auto* tracer : tracers_) {
179 auto* attempt_tracer = tracer->StartNewAttempt(is_transparent_retry);
180 GPR_DEBUG_ASSERT(attempt_tracer != nullptr);
181 attempt_tracers.push_back(attempt_tracer);
182 }
183 return GetContext<Arena>()->ManagedNew<DelegatingClientCallAttemptTracer>(
184 std::move(attempt_tracers));
185 }
186
RecordAnnotation(absl::string_view annotation)187 void RecordAnnotation(absl::string_view annotation) override {
188 for (auto* tracer : tracers_) {
189 tracer->RecordAnnotation(annotation);
190 }
191 }
RecordAnnotation(const Annotation & annotation)192 void RecordAnnotation(const Annotation& annotation) override {
193 for (auto* tracer : tracers_) {
194 tracer->RecordAnnotation(annotation);
195 }
196 }
TraceId()197 std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()198 std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()199 bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()200 bool IsDelegatingTracer() override { return true; }
201
202 // There is no additional synchronization needed since filters/interceptors
203 // will be adding call tracers to the context and these are already
204 // synchronized through promises/call combiners (single promise running per
205 // call at any moment).
AddTracer(ClientCallTracer * tracer)206 void AddTracer(ClientCallTracer* tracer) { tracers_.push_back(tracer); }
207
208 private:
209 std::vector<ClientCallTracer*> tracers_;
210 };
211
212 class DelegatingServerCallTracer : public ServerCallTracer {
213 public:
DelegatingServerCallTracer(ServerCallTracer * tracer)214 explicit DelegatingServerCallTracer(ServerCallTracer* tracer)
215 : tracers_{tracer} {}
~DelegatingServerCallTracer()216 ~DelegatingServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)217 void RecordSendInitialMetadata(
218 grpc_metadata_batch* send_initial_metadata) override {
219 for (auto* tracer : tracers_) {
220 tracer->RecordSendInitialMetadata(send_initial_metadata);
221 }
222 }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)223 void RecordSendTrailingMetadata(
224 grpc_metadata_batch* send_trailing_metadata) override {
225 for (auto* tracer : tracers_) {
226 tracer->RecordSendTrailingMetadata(send_trailing_metadata);
227 }
228 }
RecordSendMessage(const SliceBuffer & send_message)229 void RecordSendMessage(const SliceBuffer& send_message) override {
230 for (auto* tracer : tracers_) {
231 tracer->RecordSendMessage(send_message);
232 }
233 }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)234 void RecordSendCompressedMessage(
235 const SliceBuffer& send_compressed_message) override {
236 for (auto* tracer : tracers_) {
237 tracer->RecordSendCompressedMessage(send_compressed_message);
238 }
239 }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)240 void RecordReceivedInitialMetadata(
241 grpc_metadata_batch* recv_initial_metadata) override {
242 for (auto* tracer : tracers_) {
243 tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
244 }
245 }
RecordReceivedMessage(const SliceBuffer & recv_message)246 void RecordReceivedMessage(const SliceBuffer& recv_message) override {
247 for (auto* tracer : tracers_) {
248 tracer->RecordReceivedMessage(recv_message);
249 }
250 }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)251 void RecordReceivedDecompressedMessage(
252 const SliceBuffer& recv_decompressed_message) override {
253 for (auto* tracer : tracers_) {
254 tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
255 }
256 }
RecordCancel(grpc_error_handle cancel_error)257 void RecordCancel(grpc_error_handle cancel_error) override {
258 for (auto* tracer : tracers_) {
259 tracer->RecordCancel(cancel_error);
260 }
261 }
RecordReceivedTrailingMetadata(grpc_metadata_batch * recv_trailing_metadata)262 void RecordReceivedTrailingMetadata(
263 grpc_metadata_batch* recv_trailing_metadata) override {
264 for (auto* tracer : tracers_) {
265 tracer->RecordReceivedTrailingMetadata(recv_trailing_metadata);
266 }
267 }
RecordEnd(const grpc_call_final_info * final_info)268 void RecordEnd(const grpc_call_final_info* final_info) override {
269 for (auto* tracer : tracers_) {
270 tracer->RecordEnd(final_info);
271 }
272 }
RecordAnnotation(absl::string_view annotation)273 void RecordAnnotation(absl::string_view annotation) override {
274 for (auto* tracer : tracers_) {
275 tracer->RecordAnnotation(annotation);
276 }
277 }
RecordAnnotation(const Annotation & annotation)278 void RecordAnnotation(const Annotation& annotation) override {
279 for (auto* tracer : tracers_) {
280 tracer->RecordAnnotation(annotation);
281 }
282 }
StartNewTcpTrace()283 std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
284 return nullptr;
285 }
TraceId()286 std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()287 std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()288 bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()289 bool IsDelegatingTracer() override { return true; }
290
AddTracer(ServerCallTracer * tracer)291 void AddTracer(ServerCallTracer* tracer) { tracers_.push_back(tracer); }
292
293 private:
294 // The ServerCallTracerFilter will be responsible for making sure that the
295 // tracers are added in a thread-safe manner. It is imagined that the filter
296 // will just invoke the factories in the server call tracer factory list
297 // sequentially, removing the need for any synchronization.
298 std::vector<ServerCallTracer*> tracers_;
299 };
300
AddClientCallTracerToContext(grpc_call_context_element * call_context,ClientCallTracer * tracer)301 void AddClientCallTracerToContext(grpc_call_context_element* call_context,
302 ClientCallTracer* tracer) {
303 if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
304 nullptr) {
305 // This is the first call tracer. Set it directly.
306 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
307 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
308 nullptr;
309 } else {
310 // There was already a call tracer present.
311 auto* orig_tracer = static_cast<ClientCallTracer*>(
312 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
313 if (orig_tracer->IsDelegatingTracer()) {
314 // We already created a delegating tracer. Just add the new tracer to the
315 // list.
316 static_cast<DelegatingClientCallTracer*>(orig_tracer)->AddTracer(tracer);
317 } else {
318 // Create a new delegating tracer and add the first tracer and the new
319 // tracer to the list.
320 auto* delegating_tracer =
321 GetContext<Arena>()->ManagedNew<DelegatingClientCallTracer>(
322 orig_tracer);
323 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
324 delegating_tracer;
325 delegating_tracer->AddTracer(tracer);
326 }
327 }
328 }
329
AddServerCallTracerToContext(grpc_call_context_element * call_context,ServerCallTracer * tracer)330 void AddServerCallTracerToContext(grpc_call_context_element* call_context,
331 ServerCallTracer* tracer) {
332 GPR_DEBUG_ASSERT(
333 call_context[GRPC_CONTEXT_CALL_TRACER].value ==
334 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
335 if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
336 nullptr) {
337 // This is the first call tracer. Set it directly.
338 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
339 call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
340 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
341 nullptr;
342 } else {
343 // There was already a call tracer present.
344 auto* orig_tracer = static_cast<ServerCallTracer*>(
345 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
346 if (orig_tracer->IsDelegatingTracer()) {
347 // We already created a delegating tracer. Just add the new tracer to the
348 // list.
349 static_cast<DelegatingServerCallTracer*>(orig_tracer)->AddTracer(tracer);
350 } else {
351 // Create a new delegating tracer and add the first tracer and the new
352 // tracer to the list.
353 auto* delegating_tracer =
354 GetContext<Arena>()->ManagedNew<DelegatingServerCallTracer>(
355 orig_tracer);
356 call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
357 delegating_tracer;
358 call_context[GRPC_CONTEXT_CALL_TRACER].value = delegating_tracer;
359 delegating_tracer->AddTracer(tracer);
360 }
361 }
362 }
363
364 } // namespace grpc_core
365