1 //
2 //
3 // Copyright 2023 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/ext/otel/otel_server_call_tracer.h"
22
23 #include <array>
24 #include <memory>
25 #include <string>
26 #include <utility>
27
28 #include "absl/functional/any_invocable.h"
29 #include "absl/strings/str_format.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/time/clock.h"
32 #include "absl/time/time.h"
33 #include "absl/types/optional.h"
34 #include "absl/types/span.h"
35 #include "opentelemetry/context/context.h"
36 #include "opentelemetry/metrics/sync_instruments.h"
37
38 #include "src/core/lib/channel/channel_stack.h"
39 #include "src/core/lib/channel/status_util.h"
40 #include "src/core/lib/channel/tcp_tracer.h"
41 #include "src/core/lib/iomgr/error.h"
42 #include "src/core/lib/slice/slice.h"
43 #include "src/core/lib/slice/slice_buffer.h"
44 #include "src/core/lib/transport/metadata_batch.h"
45 #include "src/core/lib/transport/transport.h"
46 #include "src/cpp/ext/otel/key_value_iterable.h"
47 #include "src/cpp/ext/otel/otel_plugin.h"
48
49 namespace grpc {
50 namespace internal {
51
RecordReceivedInitialMetadata(grpc_metadata_batch * recv_initial_metadata)52 void OpenTelemetryPlugin::ServerCallTracer::RecordReceivedInitialMetadata(
53 grpc_metadata_batch* recv_initial_metadata) {
54 path_ =
55 recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata())->Ref();
56 scope_config_->active_plugin_options_view().ForEach(
57 [&](const InternalOpenTelemetryPluginOption& plugin_option,
58 size_t index) {
59 auto* labels_injector = plugin_option.labels_injector();
60 if (labels_injector != nullptr) {
61 injected_labels_from_plugin_options_[index] =
62 labels_injector->GetLabels(recv_initial_metadata);
63 }
64 return true;
65 },
66 otel_plugin_);
67 registered_method_ =
68 recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
69 .value_or(nullptr) != nullptr;
70 std::array<std::pair<absl::string_view, absl::string_view>, 1>
71 additional_labels = {{{OpenTelemetryMethodKey(), MethodForStats()}}};
72 if (otel_plugin_->server_.call.started != nullptr) {
73 // We might not have all the injected labels that we want at this point, so
74 // avoid recording a subset of injected labels here.
75 otel_plugin_->server_.call.started->Add(
76 1, KeyValueIterable(/*injected_labels_from_plugin_options=*/{},
77 additional_labels,
78 /*active_plugin_options_view=*/nullptr, {},
79 /*is_client=*/false, otel_plugin_));
80 }
81 }
82
RecordSendInitialMetadata(grpc_metadata_batch * send_initial_metadata)83 void OpenTelemetryPlugin::ServerCallTracer::RecordSendInitialMetadata(
84 grpc_metadata_batch* send_initial_metadata) {
85 scope_config_->active_plugin_options_view().ForEach(
86 [&](const InternalOpenTelemetryPluginOption& plugin_option,
87 size_t index) {
88 auto* labels_injector = plugin_option.labels_injector();
89 if (labels_injector != nullptr) {
90 labels_injector->AddLabels(
91 send_initial_metadata,
92 injected_labels_from_plugin_options_[index].get());
93 }
94 return true;
95 },
96 otel_plugin_);
97 }
98
RecordSendTrailingMetadata(grpc_metadata_batch *)99 void OpenTelemetryPlugin::ServerCallTracer::RecordSendTrailingMetadata(
100 grpc_metadata_batch* /*send_trailing_metadata*/) {
101 // We need to record the time when the trailing metadata was sent to
102 // mark the completeness of the request.
103 elapsed_time_ = absl::Now() - start_time_;
104 }
105
RecordEnd(const grpc_call_final_info * final_info)106 void OpenTelemetryPlugin::ServerCallTracer::RecordEnd(
107 const grpc_call_final_info* final_info) {
108 std::array<std::pair<absl::string_view, absl::string_view>, 2>
109 additional_labels = {
110 {{OpenTelemetryMethodKey(), MethodForStats()},
111 {OpenTelemetryStatusKey(),
112 grpc_status_code_to_string(final_info->final_status)}}};
113 // Currently we do not have any optional labels on the server side.
114 KeyValueIterable labels(
115 injected_labels_from_plugin_options_, additional_labels,
116 /*active_plugin_options_view=*/nullptr, /*optional_labels=*/{},
117 /*is_client=*/false, otel_plugin_);
118 if (otel_plugin_->server_.call.duration != nullptr) {
119 otel_plugin_->server_.call.duration->Record(
120 absl::ToDoubleSeconds(elapsed_time_), labels,
121 opentelemetry::context::Context{});
122 }
123 if (otel_plugin_->server_.call.sent_total_compressed_message_size !=
124 nullptr) {
125 otel_plugin_->server_.call.sent_total_compressed_message_size->Record(
126 final_info->stats.transport_stream_stats.outgoing.data_bytes, labels,
127 opentelemetry::context::Context{});
128 }
129 if (otel_plugin_->server_.call.rcvd_total_compressed_message_size !=
130 nullptr) {
131 otel_plugin_->server_.call.rcvd_total_compressed_message_size->Record(
132 final_info->stats.transport_stream_stats.incoming.data_bytes, labels,
133 opentelemetry::context::Context{});
134 }
135 }
136
137 } // namespace internal
138 } // namespace grpc
139