xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/channel/call_tracer.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/channel/call_tracer.h"
22 
23 #include <memory>
24 #include <utility>
25 #include <vector>
26 
27 #include <grpc/support/log.h>
28 
29 #include "src/core/lib/channel/tcp_tracer.h"
30 #include "src/core/lib/promise/context.h"
31 
32 namespace grpc_core {
33 
34 //
35 // ServerCallTracerFactory
36 //
37 
38 namespace {
39 
40 ServerCallTracerFactory* g_server_call_tracer_factory_ = nullptr;
41 
42 const char* kServerCallTracerFactoryChannelArgName =
43     "grpc.experimental.server_call_tracer_factory";
44 }  // namespace
45 
Get(const ChannelArgs & channel_args)46 ServerCallTracerFactory* ServerCallTracerFactory::Get(
47     const ChannelArgs& channel_args) {
48   ServerCallTracerFactory* factory =
49       channel_args.GetObject<ServerCallTracerFactory>();
50   if (factory == nullptr) {
51     factory = g_server_call_tracer_factory_;
52   }
53   if (factory && factory->IsServerTraced(channel_args)) {
54     return factory;
55   }
56   return nullptr;
57 }
58 
RegisterGlobal(ServerCallTracerFactory * factory)59 void ServerCallTracerFactory::RegisterGlobal(ServerCallTracerFactory* factory) {
60   g_server_call_tracer_factory_ = factory;
61 }
62 
TestOnlyReset()63 void ServerCallTracerFactory::TestOnlyReset() {
64   delete g_server_call_tracer_factory_;
65   g_server_call_tracer_factory_ = nullptr;
66 }
67 
ChannelArgName()68 absl::string_view ServerCallTracerFactory::ChannelArgName() {
69   return kServerCallTracerFactoryChannelArgName;
70 }
71 
72 class DelegatingClientCallTracer : public ClientCallTracer {
73  public:
74   class DelegatingClientCallAttemptTracer
75       : public ClientCallTracer::CallAttemptTracer {
76    public:
DelegatingClientCallAttemptTracer(std::vector<CallAttemptTracer * > tracers)77     explicit DelegatingClientCallAttemptTracer(
78         std::vector<CallAttemptTracer*> tracers)
79         : tracers_(std::move(tracers)) {
80       GPR_DEBUG_ASSERT(!tracers_.empty());
81     }
~DelegatingClientCallAttemptTracer()82     ~DelegatingClientCallAttemptTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)83     void RecordSendInitialMetadata(
84         grpc_metadata_batch* send_initial_metadata) override {
85       for (auto* tracer : tracers_) {
86         tracer->RecordSendInitialMetadata(send_initial_metadata);
87       }
88     }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)89     void RecordSendTrailingMetadata(
90         grpc_metadata_batch* send_trailing_metadata) override {
91       for (auto* tracer : tracers_) {
92         tracer->RecordSendTrailingMetadata(send_trailing_metadata);
93       }
94     }
RecordSendMessage(const SliceBuffer & send_message)95     void RecordSendMessage(const SliceBuffer& send_message) override {
96       for (auto* tracer : tracers_) {
97         tracer->RecordSendMessage(send_message);
98       }
99     }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)100     void RecordSendCompressedMessage(
101         const SliceBuffer& send_compressed_message) override {
102       for (auto* tracer : tracers_) {
103         tracer->RecordSendCompressedMessage(send_compressed_message);
104       }
105     }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)106     void RecordReceivedInitialMetadata(
107         grpc_metadata_batch* recv_initial_metadata) override {
108       for (auto* tracer : tracers_) {
109         tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
110       }
111     }
RecordReceivedMessage(const SliceBuffer & recv_message)112     void RecordReceivedMessage(const SliceBuffer& recv_message) override {
113       for (auto* tracer : tracers_) {
114         tracer->RecordReceivedMessage(recv_message);
115       }
116     }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)117     void RecordReceivedDecompressedMessage(
118         const SliceBuffer& recv_decompressed_message) override {
119       for (auto* tracer : tracers_) {
120         tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
121       }
122     }
RecordCancel(grpc_error_handle cancel_error)123     void RecordCancel(grpc_error_handle cancel_error) override {
124       for (auto* tracer : tracers_) {
125         tracer->RecordCancel(cancel_error);
126       }
127     }
RecordReceivedTrailingMetadata(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,const grpc_transport_stream_stats * transport_stream_stats)128     void RecordReceivedTrailingMetadata(
129         absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
130         const grpc_transport_stream_stats* transport_stream_stats) override {
131       for (auto* tracer : tracers_) {
132         tracer->RecordReceivedTrailingMetadata(status, recv_trailing_metadata,
133                                                transport_stream_stats);
134       }
135     }
RecordEnd(const gpr_timespec & latency)136     void RecordEnd(const gpr_timespec& latency) override {
137       for (auto* tracer : tracers_) {
138         tracer->RecordEnd(latency);
139       }
140     }
RecordAnnotation(absl::string_view annotation)141     void RecordAnnotation(absl::string_view annotation) override {
142       for (auto* tracer : tracers_) {
143         tracer->RecordAnnotation(annotation);
144       }
145     }
RecordAnnotation(const Annotation & annotation)146     void RecordAnnotation(const Annotation& annotation) override {
147       for (auto* tracer : tracers_) {
148         tracer->RecordAnnotation(annotation);
149       }
150     }
StartNewTcpTrace()151     std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
152       return nullptr;
153     }
SetOptionalLabel(OptionalLabelKey key,RefCountedStringValue value)154     void SetOptionalLabel(OptionalLabelKey key,
155                           RefCountedStringValue value) override {
156       for (auto* tracer : tracers_) {
157         tracer->SetOptionalLabel(key, value);
158       }
159     }
TraceId()160     std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()161     std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()162     bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()163     bool IsDelegatingTracer() override { return true; }
164 
165    private:
166     // There is no additional synchronization needed since filters/interceptors
167     // will be adding call tracers to the context and these are already
168     // synchronized through promises/call combiners (single promise running per
169     // call at any moment).
170     std::vector<CallAttemptTracer*> tracers_;
171   };
DelegatingClientCallTracer(ClientCallTracer * tracer)172   explicit DelegatingClientCallTracer(ClientCallTracer* tracer)
173       : tracers_{tracer} {}
~DelegatingClientCallTracer()174   ~DelegatingClientCallTracer() override {}
StartNewAttempt(bool is_transparent_retry)175   CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) override {
176     std::vector<CallAttemptTracer*> attempt_tracers;
177     attempt_tracers.reserve(tracers_.size());
178     for (auto* tracer : tracers_) {
179       auto* attempt_tracer = tracer->StartNewAttempt(is_transparent_retry);
180       GPR_DEBUG_ASSERT(attempt_tracer != nullptr);
181       attempt_tracers.push_back(attempt_tracer);
182     }
183     return GetContext<Arena>()->ManagedNew<DelegatingClientCallAttemptTracer>(
184         std::move(attempt_tracers));
185   }
186 
RecordAnnotation(absl::string_view annotation)187   void RecordAnnotation(absl::string_view annotation) override {
188     for (auto* tracer : tracers_) {
189       tracer->RecordAnnotation(annotation);
190     }
191   }
RecordAnnotation(const Annotation & annotation)192   void RecordAnnotation(const Annotation& annotation) override {
193     for (auto* tracer : tracers_) {
194       tracer->RecordAnnotation(annotation);
195     }
196   }
TraceId()197   std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()198   std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()199   bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()200   bool IsDelegatingTracer() override { return true; }
201 
202   // There is no additional synchronization needed since filters/interceptors
203   // will be adding call tracers to the context and these are already
204   // synchronized through promises/call combiners (single promise running per
205   // call at any moment).
AddTracer(ClientCallTracer * tracer)206   void AddTracer(ClientCallTracer* tracer) { tracers_.push_back(tracer); }
207 
208  private:
209   std::vector<ClientCallTracer*> tracers_;
210 };
211 
212 class DelegatingServerCallTracer : public ServerCallTracer {
213  public:
DelegatingServerCallTracer(ServerCallTracer * tracer)214   explicit DelegatingServerCallTracer(ServerCallTracer* tracer)
215       : tracers_{tracer} {}
~DelegatingServerCallTracer()216   ~DelegatingServerCallTracer() override {}
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)217   void RecordSendInitialMetadata(
218       grpc_metadata_batch* send_initial_metadata) override {
219     for (auto* tracer : tracers_) {
220       tracer->RecordSendInitialMetadata(send_initial_metadata);
221     }
222   }
RecordSendTrailingMetadata(grpc_metadata_batch * send_trailing_metadata)223   void RecordSendTrailingMetadata(
224       grpc_metadata_batch* send_trailing_metadata) override {
225     for (auto* tracer : tracers_) {
226       tracer->RecordSendTrailingMetadata(send_trailing_metadata);
227     }
228   }
RecordSendMessage(const SliceBuffer & send_message)229   void RecordSendMessage(const SliceBuffer& send_message) override {
230     for (auto* tracer : tracers_) {
231       tracer->RecordSendMessage(send_message);
232     }
233   }
RecordSendCompressedMessage(const SliceBuffer & send_compressed_message)234   void RecordSendCompressedMessage(
235       const SliceBuffer& send_compressed_message) override {
236     for (auto* tracer : tracers_) {
237       tracer->RecordSendCompressedMessage(send_compressed_message);
238     }
239   }
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)240   void RecordReceivedInitialMetadata(
241       grpc_metadata_batch* recv_initial_metadata) override {
242     for (auto* tracer : tracers_) {
243       tracer->RecordReceivedInitialMetadata(recv_initial_metadata);
244     }
245   }
RecordReceivedMessage(const SliceBuffer & recv_message)246   void RecordReceivedMessage(const SliceBuffer& recv_message) override {
247     for (auto* tracer : tracers_) {
248       tracer->RecordReceivedMessage(recv_message);
249     }
250   }
RecordReceivedDecompressedMessage(const SliceBuffer & recv_decompressed_message)251   void RecordReceivedDecompressedMessage(
252       const SliceBuffer& recv_decompressed_message) override {
253     for (auto* tracer : tracers_) {
254       tracer->RecordReceivedDecompressedMessage(recv_decompressed_message);
255     }
256   }
RecordCancel(grpc_error_handle cancel_error)257   void RecordCancel(grpc_error_handle cancel_error) override {
258     for (auto* tracer : tracers_) {
259       tracer->RecordCancel(cancel_error);
260     }
261   }
RecordReceivedTrailingMetadata(grpc_metadata_batch * recv_trailing_metadata)262   void RecordReceivedTrailingMetadata(
263       grpc_metadata_batch* recv_trailing_metadata) override {
264     for (auto* tracer : tracers_) {
265       tracer->RecordReceivedTrailingMetadata(recv_trailing_metadata);
266     }
267   }
RecordEnd(const grpc_call_final_info * final_info)268   void RecordEnd(const grpc_call_final_info* final_info) override {
269     for (auto* tracer : tracers_) {
270       tracer->RecordEnd(final_info);
271     }
272   }
RecordAnnotation(absl::string_view annotation)273   void RecordAnnotation(absl::string_view annotation) override {
274     for (auto* tracer : tracers_) {
275       tracer->RecordAnnotation(annotation);
276     }
277   }
RecordAnnotation(const Annotation & annotation)278   void RecordAnnotation(const Annotation& annotation) override {
279     for (auto* tracer : tracers_) {
280       tracer->RecordAnnotation(annotation);
281     }
282   }
StartNewTcpTrace()283   std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
284     return nullptr;
285   }
TraceId()286   std::string TraceId() override { return tracers_[0]->TraceId(); }
SpanId()287   std::string SpanId() override { return tracers_[0]->SpanId(); }
IsSampled()288   bool IsSampled() override { return tracers_[0]->IsSampled(); }
IsDelegatingTracer()289   bool IsDelegatingTracer() override { return true; }
290 
AddTracer(ServerCallTracer * tracer)291   void AddTracer(ServerCallTracer* tracer) { tracers_.push_back(tracer); }
292 
293  private:
294   // The ServerCallTracerFilter will be responsible for making sure that the
295   // tracers are added in a thread-safe manner. It is imagined that the filter
296   // will just invoke the factories in the server call tracer factory list
297   // sequentially, removing the need for any synchronization.
298   std::vector<ServerCallTracer*> tracers_;
299 };
300 
AddClientCallTracerToContext(grpc_call_context_element * call_context,ClientCallTracer * tracer)301 void AddClientCallTracerToContext(grpc_call_context_element* call_context,
302                                   ClientCallTracer* tracer) {
303   if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
304       nullptr) {
305     // This is the first call tracer. Set it directly.
306     call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
307     call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
308         nullptr;
309   } else {
310     // There was already a call tracer present.
311     auto* orig_tracer = static_cast<ClientCallTracer*>(
312         call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
313     if (orig_tracer->IsDelegatingTracer()) {
314       // We already created a delegating tracer. Just add the new tracer to the
315       // list.
316       static_cast<DelegatingClientCallTracer*>(orig_tracer)->AddTracer(tracer);
317     } else {
318       // Create a new delegating tracer and add the first tracer and the new
319       // tracer to the list.
320       auto* delegating_tracer =
321           GetContext<Arena>()->ManagedNew<DelegatingClientCallTracer>(
322               orig_tracer);
323       call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
324           delegating_tracer;
325       delegating_tracer->AddTracer(tracer);
326     }
327   }
328 }
329 
AddServerCallTracerToContext(grpc_call_context_element * call_context,ServerCallTracer * tracer)330 void AddServerCallTracerToContext(grpc_call_context_element* call_context,
331                                   ServerCallTracer* tracer) {
332   GPR_DEBUG_ASSERT(
333       call_context[GRPC_CONTEXT_CALL_TRACER].value ==
334       call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
335   if (call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
336       nullptr) {
337     // This is the first call tracer. Set it directly.
338     call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
339     call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
340     call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
341         nullptr;
342   } else {
343     // There was already a call tracer present.
344     auto* orig_tracer = static_cast<ServerCallTracer*>(
345         call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
346     if (orig_tracer->IsDelegatingTracer()) {
347       // We already created a delegating tracer. Just add the new tracer to the
348       // list.
349       static_cast<DelegatingServerCallTracer*>(orig_tracer)->AddTracer(tracer);
350     } else {
351       // Create a new delegating tracer and add the first tracer and the new
352       // tracer to the list.
353       auto* delegating_tracer =
354           GetContext<Arena>()->ManagedNew<DelegatingServerCallTracer>(
355               orig_tracer);
356       call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
357           delegating_tracer;
358       call_context[GRPC_CONTEXT_CALL_TRACER].value = delegating_tracer;
359       delegating_tracer->AddTracer(tracer);
360     }
361   }
362 }
363 
364 }  // namespace grpc_core
365