xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/oob_backend_metric.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/oob_backend_metric.h"
20 
21 #include <string.h>
22 
23 #include <algorithm>
24 #include <set>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/status/status.h"
29 #include "absl/strings/string_view.h"
30 #include "google/protobuf/duration.upb.h"
31 #include "upb/mem/arena.hpp"
32 #include "xds/service/orca/v3/orca.upb.h"
33 
34 #include <grpc/impl/connectivity_state.h>
35 #include <grpc/slice.h>
36 #include <grpc/status.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/time.h>
40 
41 #include "src/core/client_channel/client_channel_channelz.h"
42 #include "src/core/client_channel/subchannel.h"
43 #include "src/core/client_channel/subchannel_stream_client.h"
44 #include "src/core/lib/channel/channel_trace.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/memory.h"
48 #include "src/core/lib/gprpp/orphanable.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/sync.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/iomgr/closure.h"
53 #include "src/core/lib/iomgr/error.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/core/lib/iomgr/iomgr_fwd.h"
56 #include "src/core/lib/iomgr/pollset_set.h"
57 #include "src/core/lib/slice/slice.h"
58 #include "src/core/load_balancing/backend_metric_parser.h"
59 #include "src/core/load_balancing/oob_backend_metric_internal.h"
60 
61 namespace grpc_core {
62 
63 TraceFlag grpc_orca_client_trace(false, "orca_client");
64 
65 //
66 // OrcaProducer::ConnectivityWatcher
67 //
68 
69 class OrcaProducer::ConnectivityWatcher final
70     : public Subchannel::ConnectivityStateWatcherInterface {
71  public:
ConnectivityWatcher(WeakRefCountedPtr<OrcaProducer> producer)72   explicit ConnectivityWatcher(WeakRefCountedPtr<OrcaProducer> producer)
73       : producer_(std::move(producer)),
74         interested_parties_(grpc_pollset_set_create()) {}
75 
~ConnectivityWatcher()76   ~ConnectivityWatcher() override {
77     grpc_pollset_set_destroy(interested_parties_);
78   }
79 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status &)80   void OnConnectivityStateChange(
81       RefCountedPtr<ConnectivityStateWatcherInterface> self,
82       grpc_connectivity_state state, const absl::Status&) override {
83     producer_->OnConnectivityStateChange(state);
84     self.reset();
85   }
86 
interested_parties()87   grpc_pollset_set* interested_parties() override {
88     return interested_parties_;
89   }
90 
91  private:
92   WeakRefCountedPtr<OrcaProducer> producer_;
93   grpc_pollset_set* interested_parties_;
94 };
95 
96 //
97 // OrcaProducer::OrcaStreamEventHandler
98 //
99 
100 class OrcaProducer::OrcaStreamEventHandler final
101     : public SubchannelStreamClient::CallEventHandler {
102  public:
OrcaStreamEventHandler(WeakRefCountedPtr<OrcaProducer> producer,Duration report_interval)103   OrcaStreamEventHandler(WeakRefCountedPtr<OrcaProducer> producer,
104                          Duration report_interval)
105       : producer_(std::move(producer)), report_interval_(report_interval) {}
106 
GetPathLocked()107   Slice GetPathLocked() override {
108     return Slice::FromStaticString(
109         "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics");
110   }
111 
OnCallStartLocked(SubchannelStreamClient *)112   void OnCallStartLocked(SubchannelStreamClient* /*client*/) override {}
113 
OnRetryTimerStartLocked(SubchannelStreamClient *)114   void OnRetryTimerStartLocked(SubchannelStreamClient* /*client*/) override {}
115 
EncodeSendMessageLocked()116   grpc_slice EncodeSendMessageLocked() override {
117     upb::Arena arena;
118     xds_service_orca_v3_OrcaLoadReportRequest* request =
119         xds_service_orca_v3_OrcaLoadReportRequest_new(arena.ptr());
120     gpr_timespec timespec = report_interval_.as_timespec();
121     auto* report_interval =
122         xds_service_orca_v3_OrcaLoadReportRequest_mutable_report_interval(
123             request, arena.ptr());
124     google_protobuf_Duration_set_seconds(report_interval, timespec.tv_sec);
125     google_protobuf_Duration_set_nanos(report_interval, timespec.tv_nsec);
126     size_t buf_length;
127     char* buf = xds_service_orca_v3_OrcaLoadReportRequest_serialize(
128         request, arena.ptr(), &buf_length);
129     grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
130     memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
131     return request_slice;
132   }
133 
RecvMessageReadyLocked(SubchannelStreamClient *,absl::string_view serialized_message)134   absl::Status RecvMessageReadyLocked(
135       SubchannelStreamClient* /*client*/,
136       absl::string_view serialized_message) override {
137     auto* allocator = new BackendMetricAllocator(producer_);
138     auto* backend_metric_data =
139         ParseBackendMetricData(serialized_message, allocator);
140     if (backend_metric_data == nullptr) {
141       delete allocator;
142       return absl::InvalidArgumentError("unable to parse Orca response");
143     }
144     allocator->AsyncNotifyWatchersAndDelete();
145     return absl::OkStatus();
146   }
147 
RecvTrailingMetadataReadyLocked(SubchannelStreamClient *,grpc_status_code status)148   void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* /*client*/,
149                                        grpc_status_code status) override {
150     if (status == GRPC_STATUS_UNIMPLEMENTED) {
151       static const char kErrorMessage[] =
152           "Orca stream returned UNIMPLEMENTED; disabling";
153       gpr_log(GPR_ERROR, kErrorMessage);
154       auto* channelz_node = producer_->subchannel_->channelz_node();
155       if (channelz_node != nullptr) {
156         channelz_node->AddTraceEvent(
157             channelz::ChannelTrace::Error,
158             grpc_slice_from_static_string(kErrorMessage));
159       }
160     }
161   }
162 
163  private:
164   // This class acts as storage for the parsed backend metric data.  It
165   // is injected into ParseBackendMetricData() as an allocator that
166   // returns internal storage.  It then also acts as a place to hold
167   // onto the data during an async hop into the ExecCtx before sending
168   // notifications, which avoids lock inversion problems due to
169   // acquiring producer_->mu_ while holding the lock from inside of
170   // SubchannelStreamClient.
171   class BackendMetricAllocator final : public BackendMetricAllocatorInterface {
172    public:
BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)173     explicit BackendMetricAllocator(WeakRefCountedPtr<OrcaProducer> producer)
174         : producer_(std::move(producer)) {}
175 
AllocateBackendMetricData()176     BackendMetricData* AllocateBackendMetricData() override {
177       return &backend_metric_data_;
178     }
179 
AllocateString(size_t size)180     char* AllocateString(size_t size) override {
181       char* string = static_cast<char*>(gpr_malloc(size));
182       string_storage_.emplace_back(string);
183       return string;
184     }
185 
186     // Notifies watchers asynchronously and then deletes the
187     // BackendMetricAllocator object.
AsyncNotifyWatchersAndDelete()188     void AsyncNotifyWatchersAndDelete() {
189       GRPC_CLOSURE_INIT(&closure_, NotifyWatchersInExecCtx, this, nullptr);
190       ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
191     }
192 
193    private:
NotifyWatchersInExecCtx(void * arg,grpc_error_handle)194     static void NotifyWatchersInExecCtx(void* arg,
195                                         grpc_error_handle /*error*/) {
196       auto* self = static_cast<BackendMetricAllocator*>(arg);
197       self->producer_->NotifyWatchers(self->backend_metric_data_);
198       delete self;
199     }
200 
201     WeakRefCountedPtr<OrcaProducer> producer_;
202     BackendMetricData backend_metric_data_;
203     std::vector<UniquePtr<char>> string_storage_;
204     grpc_closure closure_;
205   };
206 
207   WeakRefCountedPtr<OrcaProducer> producer_;
208   const Duration report_interval_;
209 };
210 
211 //
212 // OrcaProducer
213 //
214 
Start(RefCountedPtr<Subchannel> subchannel)215 void OrcaProducer::Start(RefCountedPtr<Subchannel> subchannel) {
216   subchannel_ = std::move(subchannel);
217   connected_subchannel_ = subchannel_->connected_subchannel();
218   auto connectivity_watcher =
219       MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<OrcaProducer>());
220   connectivity_watcher_ = connectivity_watcher.get();
221   subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
222 }
223 
Orphaned()224 void OrcaProducer::Orphaned() {
225   {
226     MutexLock lock(&mu_);
227     stream_client_.reset();
228   }
229   GPR_ASSERT(subchannel_ != nullptr);  // Should not be called before Start().
230   subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
231   subchannel_->RemoveDataProducer(this);
232 }
233 
AddWatcher(OrcaWatcher * watcher)234 void OrcaProducer::AddWatcher(OrcaWatcher* watcher) {
235   MutexLock lock(&mu_);
236   watchers_.insert(watcher);
237   Duration watcher_interval = watcher->report_interval();
238   if (watcher_interval < report_interval_) {
239     report_interval_ = watcher_interval;
240     stream_client_.reset();
241     MaybeStartStreamLocked();
242   }
243 }
244 
RemoveWatcher(OrcaWatcher * watcher)245 void OrcaProducer::RemoveWatcher(OrcaWatcher* watcher) {
246   MutexLock lock(&mu_);
247   watchers_.erase(watcher);
248   if (watchers_.empty()) {
249     stream_client_.reset();
250     return;
251   }
252   Duration new_interval = GetMinIntervalLocked();
253   if (new_interval < report_interval_) {
254     report_interval_ = new_interval;
255     stream_client_.reset();
256     MaybeStartStreamLocked();
257   }
258 }
259 
GetMinIntervalLocked() const260 Duration OrcaProducer::GetMinIntervalLocked() const {
261   Duration duration = Duration::Infinity();
262   for (OrcaWatcher* watcher : watchers_) {
263     Duration watcher_interval = watcher->report_interval();
264     if (watcher_interval < duration) duration = watcher_interval;
265   }
266   return duration;
267 }
268 
MaybeStartStreamLocked()269 void OrcaProducer::MaybeStartStreamLocked() {
270   if (connected_subchannel_ == nullptr) return;
271   stream_client_ = MakeOrphanable<SubchannelStreamClient>(
272       connected_subchannel_, subchannel_->pollset_set(),
273       std::make_unique<OrcaStreamEventHandler>(
274           WeakRefAsSubclass<OrcaProducer>(), report_interval_),
275       GRPC_TRACE_FLAG_ENABLED(grpc_orca_client_trace) ? "OrcaClient" : nullptr);
276 }
277 
NotifyWatchers(const BackendMetricData & backend_metric_data)278 void OrcaProducer::NotifyWatchers(
279     const BackendMetricData& backend_metric_data) {
280   if (GRPC_TRACE_FLAG_ENABLED(grpc_orca_client_trace)) {
281     gpr_log(GPR_INFO, "OrcaProducer %p: reporting backend metrics to watchers",
282             this);
283   }
284   MutexLock lock(&mu_);
285   for (OrcaWatcher* watcher : watchers_) {
286     watcher->watcher()->OnBackendMetricReport(backend_metric_data);
287   }
288 }
289 
OnConnectivityStateChange(grpc_connectivity_state state)290 void OrcaProducer::OnConnectivityStateChange(grpc_connectivity_state state) {
291   MutexLock lock(&mu_);
292   if (state == GRPC_CHANNEL_READY) {
293     connected_subchannel_ = subchannel_->connected_subchannel();
294     if (!watchers_.empty()) MaybeStartStreamLocked();
295   } else {
296     connected_subchannel_.reset();
297     stream_client_.reset();
298   }
299 }
300 
301 //
302 // OrcaWatcher
303 //
304 
~OrcaWatcher()305 OrcaWatcher::~OrcaWatcher() {
306   if (producer_ != nullptr) producer_->RemoveWatcher(this);
307 }
308 
SetSubchannel(Subchannel * subchannel)309 void OrcaWatcher::SetSubchannel(Subchannel* subchannel) {
310   bool created = false;
311   // Check if our producer is already registered with the subchannel.
312   // If not, create a new one.
313   subchannel->GetOrAddDataProducer(
314       OrcaProducer::Type(), [&](Subchannel::DataProducerInterface** producer) {
315         if (*producer != nullptr) {
316           producer_ =
317               (*producer)->RefIfNonZero().TakeAsSubclass<OrcaProducer>();
318         }
319         if (producer_ == nullptr) {
320           producer_ = MakeRefCounted<OrcaProducer>();
321           *producer = producer_.get();
322           created = true;
323         }
324       });
325   // If we just created the producer, start it.
326   // This needs to be done outside of the lambda passed to
327   // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
328   // subchannel lock while already holding it.
329   if (created) producer_->Start(subchannel->Ref());
330   // Register ourself with the producer.
331   producer_->AddWatcher(this);
332 }
333 
334 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeOobBackendMetricWatcher(Duration report_interval,std::unique_ptr<OobBackendMetricWatcher> watcher)335 MakeOobBackendMetricWatcher(Duration report_interval,
336                             std::unique_ptr<OobBackendMetricWatcher> watcher) {
337   return std::make_unique<OrcaWatcher>(report_interval, std::move(watcher));
338 }
339 
340 }  // namespace grpc_core
341