xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/health_check_client.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <stdint.h>
20 #include <string.h>
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28 
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 #include "upb/base/string_view.h"
35 #include "upb/mem/arena.hpp"
36 
37 #include <grpc/impl/channel_arg_names.h>
38 #include <grpc/impl/connectivity_state.h>
39 #include <grpc/slice.h>
40 #include <grpc/status.h>
41 #include <grpc/support/log.h>
42 
43 #include "src/core/client_channel/client_channel_channelz.h"
44 #include "src/core/client_channel/client_channel_internal.h"
45 #include "src/core/client_channel/subchannel.h"
46 #include "src/core/client_channel/subchannel_stream_client.h"
47 #include "src/core/lib/address_utils/sockaddr_utils.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/channel_trace.h"
50 #include "src/core/lib/debug/trace.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/orphanable.h"
53 #include "src/core/lib/gprpp/ref_counted_ptr.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/gprpp/work_serializer.h"
56 #include "src/core/lib/iomgr/closure.h"
57 #include "src/core/lib/iomgr/error.h"
58 #include "src/core/lib/iomgr/exec_ctx.h"
59 #include "src/core/lib/iomgr/iomgr_fwd.h"
60 #include "src/core/lib/iomgr/pollset_set.h"
61 #include "src/core/lib/slice/slice.h"
62 #include "src/core/lib/transport/connectivity_state.h"
63 #include "src/core/load_balancing/health_check_client_internal.h"
64 #include "src/core/load_balancing/subchannel_interface.h"
65 #include "src/proto/grpc/health/v1/health.upb.h"
66 
67 namespace grpc_core {
68 
69 TraceFlag grpc_health_check_client_trace(false, "health_check_client");
70 
71 namespace {
72 
73 // A fire-and-forget class to asynchronously drain a WorkSerializer queue.
74 class AsyncWorkSerializerDrainer final {
75  public:
AsyncWorkSerializerDrainer(std::shared_ptr<WorkSerializer> work_serializer)76   explicit AsyncWorkSerializerDrainer(
77       std::shared_ptr<WorkSerializer> work_serializer)
78       : work_serializer_(std::move(work_serializer)) {
79     GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
80     ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
81   }
82 
83  private:
RunInExecCtx(void * arg,grpc_error_handle)84   static void RunInExecCtx(void* arg, grpc_error_handle) {
85     auto* self = static_cast<AsyncWorkSerializerDrainer*>(arg);
86     self->work_serializer_->DrainQueue();
87     delete self;
88   }
89 
90   std::shared_ptr<WorkSerializer> work_serializer_;
91   grpc_closure closure_;
92 };
93 
94 }  // namespace
95 
96 //
97 // HealthProducer::HealthChecker
98 //
99 
HealthChecker(WeakRefCountedPtr<HealthProducer> producer,absl::string_view health_check_service_name)100 HealthProducer::HealthChecker::HealthChecker(
101     WeakRefCountedPtr<HealthProducer> producer,
102     absl::string_view health_check_service_name)
103     : producer_(std::move(producer)),
104       health_check_service_name_(health_check_service_name),
105       state_(producer_->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
106                                                      : producer_->state_),
107       status_(producer_->status_) {
108   // If the subchannel is already connected, start health checking.
109   if (producer_->state_ == GRPC_CHANNEL_READY) StartHealthStreamLocked();
110 }
111 
Orphan()112 void HealthProducer::HealthChecker::Orphan() {
113   stream_client_.reset();
114   Unref();
115 }
116 
AddWatcherLocked(HealthWatcher * watcher)117 void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) {
118   watchers_.insert(watcher);
119   if (state_.has_value()) watcher->Notify(*state_, status_);
120 }
121 
RemoveWatcherLocked(HealthWatcher * watcher)122 bool HealthProducer::HealthChecker::RemoveWatcherLocked(
123     HealthWatcher* watcher) {
124   watchers_.erase(watcher);
125   return watchers_.empty();
126 }
127 
OnConnectivityStateChangeLocked(grpc_connectivity_state state,const absl::Status & status)128 void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
129     grpc_connectivity_state state, const absl::Status& status) {
130   if (state == GRPC_CHANNEL_READY) {
131     // We should already be in CONNECTING, and we don't want to change
132     // that until we see the initial response on the stream.
133     if (!state_.has_value()) {
134       state_ = GRPC_CHANNEL_CONNECTING;
135       status_ = absl::OkStatus();
136     } else {
137       GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING);
138     }
139     // Start the health watch stream.
140     StartHealthStreamLocked();
141   } else {
142     state_ = state;
143     status_ = status;
144     NotifyWatchersLocked(*state_, status_);
145     // We're not connected, so stop health checking.
146     stream_client_.reset();
147   }
148 }
149 
StartHealthStreamLocked()150 void HealthProducer::HealthChecker::StartHealthStreamLocked() {
151   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
152     gpr_log(GPR_INFO,
153             "HealthProducer %p HealthChecker %p: "
154             "creating HealthClient for \"%s\"",
155             producer_.get(), this,
156             std::string(health_check_service_name_).c_str());
157   }
158   stream_client_ = MakeOrphanable<SubchannelStreamClient>(
159       producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
160       std::make_unique<HealthStreamEventHandler>(Ref()),
161       GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace) ? "HealthClient"
162                                                               : nullptr);
163 }
164 
NotifyWatchersLocked(grpc_connectivity_state state,absl::Status status)165 void HealthProducer::HealthChecker::NotifyWatchersLocked(
166     grpc_connectivity_state state, absl::Status status) {
167   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
168     gpr_log(
169         GPR_INFO,
170         "HealthProducer %p HealthChecker %p: reporting state %s to watchers",
171         producer_.get(), this, ConnectivityStateName(state));
172   }
173   work_serializer_->Schedule(
174       [self = Ref(), state, status = std::move(status)]() {
175         MutexLock lock(&self->producer_->mu_);
176         for (HealthWatcher* watcher : self->watchers_) {
177           watcher->Notify(state, status);
178         }
179       },
180       DEBUG_LOCATION);
181   new AsyncWorkSerializerDrainer(work_serializer_);
182 }
183 
OnHealthWatchStatusChange(grpc_connectivity_state state,const absl::Status & status)184 void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
185     grpc_connectivity_state state, const absl::Status& status) {
186   if (state == GRPC_CHANNEL_SHUTDOWN) return;
187   // Prepend the subchannel's address to the status if needed.
188   absl::Status use_status;
189   if (!status.ok()) {
190     std::string address_str =
191         grpc_sockaddr_to_uri(&producer_->subchannel_->address())
192             .value_or("<unknown address type>");
193     use_status = absl::Status(
194         status.code(), absl::StrCat(address_str, ": ", status.message()));
195   }
196   work_serializer_->Schedule(
197       [self = Ref(), state, status = std::move(use_status)]() mutable {
198         MutexLock lock(&self->producer_->mu_);
199         if (self->stream_client_ != nullptr) {
200           self->state_ = state;
201           self->status_ = std::move(status);
202           for (HealthWatcher* watcher : self->watchers_) {
203             watcher->Notify(state, self->status_);
204           }
205         }
206       },
207       DEBUG_LOCATION);
208   new AsyncWorkSerializerDrainer(work_serializer_);
209 }
210 
211 //
212 // HealthProducer::HealthChecker::HealthStreamEventHandler
213 //
214 
215 class HealthProducer::HealthChecker::HealthStreamEventHandler final
216     : public SubchannelStreamClient::CallEventHandler {
217  public:
HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)218   explicit HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)
219       : health_checker_(std::move(health_checker)) {}
220 
GetPathLocked()221   Slice GetPathLocked() override {
222     return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
223   }
224 
OnCallStartLocked(SubchannelStreamClient * client)225   void OnCallStartLocked(SubchannelStreamClient* client) override {
226     SetHealthStatusLocked(client, GRPC_CHANNEL_CONNECTING,
227                           "starting health watch");
228   }
229 
OnRetryTimerStartLocked(SubchannelStreamClient * client)230   void OnRetryTimerStartLocked(SubchannelStreamClient* client) override {
231     SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
232                           "health check call failed; will retry after backoff");
233   }
234 
EncodeSendMessageLocked()235   grpc_slice EncodeSendMessageLocked() override {
236     upb::Arena arena;
237     grpc_health_v1_HealthCheckRequest* request_struct =
238         grpc_health_v1_HealthCheckRequest_new(arena.ptr());
239     grpc_health_v1_HealthCheckRequest_set_service(
240         request_struct,
241         upb_StringView_FromDataAndSize(
242             health_checker_->health_check_service_name_.data(),
243             health_checker_->health_check_service_name_.size()));
244     size_t buf_length;
245     char* buf = grpc_health_v1_HealthCheckRequest_serialize(
246         request_struct, arena.ptr(), &buf_length);
247     grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
248     memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
249     return request_slice;
250   }
251 
RecvMessageReadyLocked(SubchannelStreamClient * client,absl::string_view serialized_message)252   absl::Status RecvMessageReadyLocked(
253       SubchannelStreamClient* client,
254       absl::string_view serialized_message) override {
255     auto healthy = DecodeResponse(serialized_message);
256     if (!healthy.ok()) {
257       SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
258                             healthy.status().ToString().c_str());
259       return healthy.status();
260     }
261     if (!*healthy) {
262       SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
263                             "backend unhealthy");
264     } else {
265       SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
266     }
267     return absl::OkStatus();
268   }
269 
RecvTrailingMetadataReadyLocked(SubchannelStreamClient * client,grpc_status_code status)270   void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
271                                        grpc_status_code status) override {
272     if (status == GRPC_STATUS_UNIMPLEMENTED) {
273       static const char kErrorMessage[] =
274           "health checking Watch method returned UNIMPLEMENTED; "
275           "disabling health checks but assuming server is healthy";
276       gpr_log(GPR_ERROR, kErrorMessage);
277       auto* channelz_node =
278           health_checker_->producer_->subchannel_->channelz_node();
279       if (channelz_node != nullptr) {
280         channelz_node->AddTraceEvent(
281             channelz::ChannelTrace::Error,
282             grpc_slice_from_static_string(kErrorMessage));
283       }
284       SetHealthStatusLocked(client, GRPC_CHANNEL_READY, kErrorMessage);
285     }
286   }
287 
288  private:
289   // Returns true if healthy.
DecodeResponse(absl::string_view serialized_message)290   static absl::StatusOr<bool> DecodeResponse(
291       absl::string_view serialized_message) {
292     // Deserialize message.
293     upb::Arena arena;
294     auto* response = grpc_health_v1_HealthCheckResponse_parse(
295         serialized_message.data(), serialized_message.size(), arena.ptr());
296     if (response == nullptr) {
297       // Can't parse message; assume unhealthy.
298       return absl::InvalidArgumentError("cannot parse health check response");
299     }
300     int32_t status = grpc_health_v1_HealthCheckResponse_status(response);
301     return status == grpc_health_v1_HealthCheckResponse_SERVING;
302   }
303 
SetHealthStatusLocked(SubchannelStreamClient * client,grpc_connectivity_state state,const char * reason)304   void SetHealthStatusLocked(SubchannelStreamClient* client,
305                              grpc_connectivity_state state,
306                              const char* reason) {
307     if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
308       gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s",
309               client, ConnectivityStateName(state), reason);
310     }
311     health_checker_->OnHealthWatchStatusChange(
312         state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
313                    ? absl::UnavailableError(reason)
314                    : absl::OkStatus());
315   }
316 
317   RefCountedPtr<HealthChecker> health_checker_;
318 };
319 
320 //
321 // HealthProducer::ConnectivityWatcher
322 //
323 
324 class HealthProducer::ConnectivityWatcher final
325     : public Subchannel::ConnectivityStateWatcherInterface {
326  public:
ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)327   explicit ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)
328       : producer_(std::move(producer)) {}
329 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)330   void OnConnectivityStateChange(
331       RefCountedPtr<ConnectivityStateWatcherInterface> self,
332       grpc_connectivity_state state, const absl::Status& status) override {
333     producer_->OnConnectivityStateChange(state, status);
334     self.reset();
335   }
336 
interested_parties()337   grpc_pollset_set* interested_parties() override {
338     return producer_->interested_parties_;
339   }
340 
341  private:
342   WeakRefCountedPtr<HealthProducer> producer_;
343 };
344 
345 //
346 // HealthProducer
347 //
348 
Start(RefCountedPtr<Subchannel> subchannel)349 void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
350   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
351     gpr_log(GPR_INFO, "HealthProducer %p: starting with subchannel %p", this,
352             subchannel.get());
353   }
354   subchannel_ = std::move(subchannel);
355   {
356     MutexLock lock(&mu_);
357     connected_subchannel_ = subchannel_->connected_subchannel();
358   }
359   auto connectivity_watcher =
360       MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<HealthProducer>());
361   connectivity_watcher_ = connectivity_watcher.get();
362   subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
363 }
364 
Orphaned()365 void HealthProducer::Orphaned() {
366   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
367     gpr_log(GPR_INFO, "HealthProducer %p: shutting down", this);
368   }
369   {
370     MutexLock lock(&mu_);
371     health_checkers_.clear();
372   }
373   subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
374   subchannel_->RemoveDataProducer(this);
375 }
376 
AddWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)377 void HealthProducer::AddWatcher(
378     HealthWatcher* watcher,
379     const absl::optional<std::string>& health_check_service_name) {
380   MutexLock lock(&mu_);
381   grpc_pollset_set_add_pollset_set(interested_parties_,
382                                    watcher->interested_parties());
383   if (!health_check_service_name.has_value()) {
384     if (state_.has_value()) watcher->Notify(*state_, status_);
385     non_health_watchers_.insert(watcher);
386   } else {
387     auto it =
388         health_checkers_.emplace(*health_check_service_name, nullptr).first;
389     auto& health_checker = it->second;
390     if (health_checker == nullptr) {
391       health_checker = MakeOrphanable<HealthChecker>(
392           WeakRefAsSubclass<HealthProducer>(), it->first);
393     }
394     health_checker->AddWatcherLocked(watcher);
395   }
396 }
397 
RemoveWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)398 void HealthProducer::RemoveWatcher(
399     HealthWatcher* watcher,
400     const absl::optional<std::string>& health_check_service_name) {
401   MutexLock lock(&mu_);
402   grpc_pollset_set_del_pollset_set(interested_parties_,
403                                    watcher->interested_parties());
404   if (!health_check_service_name.has_value()) {
405     non_health_watchers_.erase(watcher);
406   } else {
407     auto it = health_checkers_.find(*health_check_service_name);
408     if (it == health_checkers_.end()) return;
409     const bool empty = it->second->RemoveWatcherLocked(watcher);
410     if (empty) health_checkers_.erase(it);
411   }
412 }
413 
OnConnectivityStateChange(grpc_connectivity_state state,const absl::Status & status)414 void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
415                                                const absl::Status& status) {
416   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
417     gpr_log(GPR_INFO,
418             "HealthProducer %p: subchannel state update: state=%s status=%s",
419             this, ConnectivityStateName(state), status.ToString().c_str());
420   }
421   MutexLock lock(&mu_);
422   state_ = state;
423   status_ = status;
424   if (state == GRPC_CHANNEL_READY) {
425     connected_subchannel_ = subchannel_->connected_subchannel();
426   } else {
427     connected_subchannel_.reset();
428   }
429   for (const auto& p : health_checkers_) {
430     p.second->OnConnectivityStateChangeLocked(state, status);
431   }
432   for (HealthWatcher* watcher : non_health_watchers_) {
433     watcher->Notify(state, status);
434   }
435 }
436 
437 //
438 // HealthWatcher
439 //
440 
~HealthWatcher()441 HealthWatcher::~HealthWatcher() {
442   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
443     gpr_log(GPR_INFO,
444             "HealthWatcher %p: unregistering from producer %p "
445             "(health_check_service_name=\"%s\")",
446             this, producer_.get(),
447             health_check_service_name_.value_or("N/A").c_str());
448   }
449   if (producer_ != nullptr) {
450     producer_->RemoveWatcher(this, health_check_service_name_);
451   }
452 }
453 
SetSubchannel(Subchannel * subchannel)454 void HealthWatcher::SetSubchannel(Subchannel* subchannel) {
455   bool created = false;
456   // Check if our producer is already registered with the subchannel.
457   // If not, create a new one.
458   subchannel->GetOrAddDataProducer(
459       HealthProducer::Type(),
460       [&](Subchannel::DataProducerInterface** producer) {
461         if (*producer != nullptr) {
462           producer_ =
463               (*producer)->RefIfNonZero().TakeAsSubclass<HealthProducer>();
464         }
465         if (producer_ == nullptr) {
466           producer_ = MakeRefCounted<HealthProducer>();
467           *producer = producer_.get();
468           created = true;
469         }
470       });
471   // If we just created the producer, start it.
472   // This needs to be done outside of the lambda passed to
473   // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
474   // subchannel lock while already holding it.
475   if (created) producer_->Start(subchannel->Ref());
476   // Register ourself with the producer.
477   producer_->AddWatcher(this, health_check_service_name_);
478   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
479     gpr_log(GPR_INFO,
480             "HealthWatcher %p: registered with producer %p (created=%d, "
481             "health_check_service_name=\"%s\")",
482             this, producer_.get(), created,
483             health_check_service_name_.value_or("N/A").c_str());
484   }
485 }
486 
Notify(grpc_connectivity_state state,absl::Status status)487 void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) {
488   work_serializer_->Schedule(
489       [watcher = watcher_, state, status = std::move(status)]() mutable {
490         watcher->OnConnectivityStateChange(state, std::move(status));
491       },
492       DEBUG_LOCATION);
493   new AsyncWorkSerializerDrainer(work_serializer_);
494 }
495 
496 //
497 // External API
498 //
499 
500 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeHealthCheckWatcher(std::shared_ptr<WorkSerializer> work_serializer,const ChannelArgs & args,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)501 MakeHealthCheckWatcher(
502     std::shared_ptr<WorkSerializer> work_serializer, const ChannelArgs& args,
503     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
504         watcher) {
505   absl::optional<std::string> health_check_service_name;
506   if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
507     health_check_service_name =
508         args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
509   }
510   if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
511     gpr_log(GPR_INFO,
512             "creating HealthWatcher -- health_check_service_name=\"%s\"",
513             health_check_service_name.value_or("N/A").c_str());
514   }
515   return std::make_unique<HealthWatcher>(std::move(work_serializer),
516                                          std::move(health_check_service_name),
517                                          std::move(watcher));
518 }
519 
520 }  // namespace grpc_core
521