// // Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include #include #include #include #include #include #include #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "absl/time/clock.h" #include "absl/time/time.h" #include "absl/types/optional.h" #include "google/api/monitored_resource.pb.h" #include "google/devtools/cloudtrace/v2/tracing.grpc.pb.h" #include "google/monitoring/v3/metric_service.grpc.pb.h" #include "opencensus/exporters/stats/stackdriver/stackdriver_exporter.h" #include "opencensus/exporters/trace/stackdriver/stackdriver_exporter.h" #include "opencensus/stats/stats.h" #include "opencensus/trace/sampler.h" #include "opencensus/trace/trace_config.h" #include #include #include #include #include #include #include "src/core/ext/filters/logging/logging_filter.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/notification.h" #include "src/cpp/client/client_stats_interceptor.h" #include "src/cpp/ext/filters/census/client_filter.h" #include "src/cpp/ext/filters/census/grpc_plugin.h" #include "src/cpp/ext/filters/census/open_census_call_tracer.h" #include "src/cpp/ext/gcp/environment_autodetect.h" #include "src/cpp/ext/gcp/observability_config.h" #include "src/cpp/ext/gcp/observability_logging_sink.h" namespace grpc { namespace internal { namespace { grpc::internal::ObservabilityLoggingSink* g_logging_sink = nullptr; bool g_gcp_observability_initialized = false; // TODO(yashykt): These constants are currently derived from the example at // https://cloud.google.com/traffic-director/docs/observability-proxyless#c++. // We might want these to be configurable. constexpr uint32_t kMaxAttributes = 128; constexpr uint32_t kMaxAnnotations = 128; constexpr uint32_t kMaxMessageEvents = 128; constexpr uint32_t kMaxLinks = 128; constexpr char kGoogleStackdriverTraceAddress[] = "cloudtrace.googleapis.com"; constexpr char kGoogleStackdriverStatsAddress[] = "monitoring.googleapis.com"; void RegisterOpenCensusViewsForGcpObservability() { // Register client default views for GCP observability experimental::ClientStartedRpcs().RegisterForExport(); experimental::ClientCompletedRpcs().RegisterForExport(); experimental::ClientRoundtripLatency().RegisterForExport(); internal::ClientApiLatency().RegisterForExport(); experimental::ClientSentCompressedMessageBytesPerRpc().RegisterForExport(); experimental::ClientReceivedCompressedMessageBytesPerRpc() .RegisterForExport(); // Register server default views for GCP observability experimental::ServerStartedRpcs().RegisterForExport(); experimental::ServerCompletedRpcs().RegisterForExport(); experimental::ServerSentCompressedMessageBytesPerRpc().RegisterForExport(); experimental::ServerReceivedCompressedMessageBytesPerRpc() .RegisterForExport(); experimental::ServerServerLatency().RegisterForExport(); } } // namespace absl::Status GcpObservabilityInit() { auto config = grpc::internal::GcpObservabilityConfig::ReadFromEnv(); if (!config.ok()) { return config.status(); } if (!config->cloud_trace.has_value() && !config->cloud_monitoring.has_value() && !config->cloud_logging.has_value()) { return absl::OkStatus(); } if (g_gcp_observability_initialized) { grpc_core::Crash("GCP Observability for gRPC was already initialized."); } g_gcp_observability_initialized = true; grpc::internal::EnvironmentAutoDetect::Create(config->project_id); if (!config->cloud_trace.has_value()) { // Disable OpenCensus tracing grpc::internal::EnableOpenCensusTracing(false); } if (!config->cloud_monitoring.has_value()) { // Disable OpenCensus stats grpc::internal::EnableOpenCensusStats(false); } else { // Register the OpenCensus client stats interceptor factory if stats are // enabled. Note that this is currently separate from the OpenCensus Plugin // to avoid changing the behavior of the currently available OpenCensus // plugin. grpc::internal::RegisterGlobalClientStatsInterceptorFactory( new grpc::internal::OpenCensusClientInterceptorFactory); } if (config->cloud_logging.has_value()) { g_logging_sink = new grpc::internal::ObservabilityLoggingSink( config->cloud_logging.value(), config->project_id, config->labels); grpc_core::RegisterLoggingFilter(g_logging_sink); } // If tracing or monitoring is enabled, we need to register the OpenCensus // plugin as well. if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) { grpc::RegisterOpenCensusPlugin(); } // If tracing or monitoring is enabled, we need to detect the environment for // OpenCensus, set the labels and attributes and prepare the StackDriver // exporter. // Note that this should be the last step of GcpObservabilityInit() since we // can't register any more filters after grpc_init. if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) { grpc_init(); grpc_core::Notification notification; grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone( [&]() { notification.Notify(); }); notification.WaitForNotification(); auto* resource = grpc::internal::EnvironmentAutoDetect::Get().resource(); if (config->cloud_trace.has_value()) { // Set up attributes for constant tracing std::vector attributes; attributes.reserve(resource->labels.size() + config->labels.size()); // First insert in environment labels for (const auto& resource_label : resource->labels) { attributes.push_back(internal::OpenCensusRegistry::Attribute{ absl::StrCat(resource->resource_type, ".", resource_label.first), resource_label.second}); } // Then insert in labels from the GCP Observability config. for (const auto& constant_label : config->labels) { attributes.push_back(internal::OpenCensusRegistry::Attribute{ constant_label.first, constant_label.second}); } grpc::internal::OpenCensusRegistry::Get().RegisterConstantAttributes( std::move(attributes)); } if (config->cloud_monitoring.has_value()) { grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels( config->labels); RegisterOpenCensusViewsForGcpObservability(); } // Note that we are setting up the exporters after registering the // attributes and labels to avoid a case where the exporters start an RPC // before we are ready. if (config->cloud_trace.has_value()) { // Set up the StackDriver Exporter for tracing. opencensus::trace::TraceConfig::SetCurrentTraceParams( {kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks, opencensus::trace::ProbabilitySampler( config->cloud_trace->sampling_rate)}); opencensus::exporters::trace::StackdriverOptions trace_opts; trace_opts.project_id = config->project_id; ChannelArguments args; args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); trace_opts.trace_service_stub = ::google::devtools::cloudtrace::v2::TraceService::NewStub( CreateCustomChannel(kGoogleStackdriverTraceAddress, GoogleDefaultCredentials(), args)); opencensus::exporters::trace::StackdriverExporter::Register( std::move(trace_opts)); } if (config->cloud_monitoring.has_value()) { // Set up the StackDriver Exporter for monitoring. opencensus::exporters::stats::StackdriverOptions stats_opts; stats_opts.project_id = config->project_id; stats_opts.monitored_resource.set_type(resource->resource_type); stats_opts.monitored_resource.mutable_labels()->insert( resource->labels.begin(), resource->labels.end()); ChannelArguments args; args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); stats_opts.metric_service_stub = google::monitoring::v3::MetricService::NewStub( CreateCustomChannel(kGoogleStackdriverStatsAddress, GoogleDefaultCredentials(), args)); opencensus::exporters::stats::StackdriverExporter::Register( std::move(stats_opts)); } grpc_shutdown(); } return absl::OkStatus(); } void GcpObservabilityClose() { if (g_logging_sink != nullptr) { g_logging_sink->FlushAndClose(); } // Currently, GcpObservabilityClose() only supports flushing logs. Stats and // tracing get automatically flushed at a regular interval, so sleep for an // interval to make sure that those are flushed too. absl::SleepFor(absl::Seconds(25)); } } // namespace internal namespace experimental { absl::Status GcpObservabilityInit() { return grpc::internal::GcpObservabilityInit(); } void GcpObservabilityClose() { return grpc::internal::GcpObservabilityClose(); } } // namespace experimental // // GcpObservability // absl::StatusOr GcpObservability::Init() { absl::Status status = grpc::internal::GcpObservabilityInit(); if (!status.ok()) { return status; } GcpObservability obj; obj.impl_ = std::make_unique(); return obj; } GcpObservability::GcpObservability(GcpObservability&& other) noexcept : impl_(std::move(other.impl_)) {} GcpObservability& GcpObservability::operator=( GcpObservability&& other) noexcept { if (this != &other) { impl_ = std::move(other.impl_); } return *this; } // // GcpObservability::GcpObservabilityImpl // GcpObservability::GcpObservabilityImpl::~GcpObservabilityImpl() { grpc::internal::GcpObservabilityClose(); } } // namespace grpc