1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <stdint.h>
20 #include <string.h>
21
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 #include "upb/base/string_view.h"
35 #include "upb/mem/arena.hpp"
36
37 #include <grpc/impl/channel_arg_names.h>
38 #include <grpc/impl/connectivity_state.h>
39 #include <grpc/slice.h>
40 #include <grpc/status.h>
41 #include <grpc/support/log.h>
42
43 #include "src/core/client_channel/client_channel_channelz.h"
44 #include "src/core/client_channel/client_channel_internal.h"
45 #include "src/core/client_channel/subchannel.h"
46 #include "src/core/client_channel/subchannel_stream_client.h"
47 #include "src/core/lib/address_utils/sockaddr_utils.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/channel_trace.h"
50 #include "src/core/lib/debug/trace.h"
51 #include "src/core/lib/gprpp/debug_location.h"
52 #include "src/core/lib/gprpp/orphanable.h"
53 #include "src/core/lib/gprpp/ref_counted_ptr.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/gprpp/work_serializer.h"
56 #include "src/core/lib/iomgr/closure.h"
57 #include "src/core/lib/iomgr/error.h"
58 #include "src/core/lib/iomgr/exec_ctx.h"
59 #include "src/core/lib/iomgr/iomgr_fwd.h"
60 #include "src/core/lib/iomgr/pollset_set.h"
61 #include "src/core/lib/slice/slice.h"
62 #include "src/core/lib/transport/connectivity_state.h"
63 #include "src/core/load_balancing/health_check_client_internal.h"
64 #include "src/core/load_balancing/subchannel_interface.h"
65 #include "src/proto/grpc/health/v1/health.upb.h"
66
67 namespace grpc_core {
68
// Trace flag consulted (via GRPC_TRACE_FLAG_ENABLED) before each of the
// verbose gpr_log() diagnostics in this file.  It is constructed with the name
// "health_check_client" and an initial argument of `false` (presumably
// "disabled by default" -- confirm against TraceFlag's constructor).
TraceFlag grpc_health_check_client_trace(false, "health_check_client");
70
71 namespace {
72
73 // A fire-and-forget class to asynchronously drain a WorkSerializer queue.
74 class AsyncWorkSerializerDrainer final {
75 public:
AsyncWorkSerializerDrainer(std::shared_ptr<WorkSerializer> work_serializer)76 explicit AsyncWorkSerializerDrainer(
77 std::shared_ptr<WorkSerializer> work_serializer)
78 : work_serializer_(std::move(work_serializer)) {
79 GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
80 ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
81 }
82
83 private:
RunInExecCtx(void * arg,grpc_error_handle)84 static void RunInExecCtx(void* arg, grpc_error_handle) {
85 auto* self = static_cast<AsyncWorkSerializerDrainer*>(arg);
86 self->work_serializer_->DrainQueue();
87 delete self;
88 }
89
90 std::shared_ptr<WorkSerializer> work_serializer_;
91 grpc_closure closure_;
92 };
93
94 } // namespace
95
96 //
97 // HealthProducer::HealthChecker
98 //
99
HealthChecker(WeakRefCountedPtr<HealthProducer> producer,absl::string_view health_check_service_name)100 HealthProducer::HealthChecker::HealthChecker(
101 WeakRefCountedPtr<HealthProducer> producer,
102 absl::string_view health_check_service_name)
103 : producer_(std::move(producer)),
104 health_check_service_name_(health_check_service_name),
105 state_(producer_->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
106 : producer_->state_),
107 status_(producer_->status_) {
108 // If the subchannel is already connected, start health checking.
109 if (producer_->state_ == GRPC_CHANNEL_READY) StartHealthStreamLocked();
110 }
111
Orphan()112 void HealthProducer::HealthChecker::Orphan() {
113 stream_client_.reset();
114 Unref();
115 }
116
AddWatcherLocked(HealthWatcher * watcher)117 void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) {
118 watchers_.insert(watcher);
119 if (state_.has_value()) watcher->Notify(*state_, status_);
120 }
121
RemoveWatcherLocked(HealthWatcher * watcher)122 bool HealthProducer::HealthChecker::RemoveWatcherLocked(
123 HealthWatcher* watcher) {
124 watchers_.erase(watcher);
125 return watchers_.empty();
126 }
127
OnConnectivityStateChangeLocked(grpc_connectivity_state state,const absl::Status & status)128 void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
129 grpc_connectivity_state state, const absl::Status& status) {
130 if (state == GRPC_CHANNEL_READY) {
131 // We should already be in CONNECTING, and we don't want to change
132 // that until we see the initial response on the stream.
133 if (!state_.has_value()) {
134 state_ = GRPC_CHANNEL_CONNECTING;
135 status_ = absl::OkStatus();
136 } else {
137 GPR_ASSERT(state_ == GRPC_CHANNEL_CONNECTING);
138 }
139 // Start the health watch stream.
140 StartHealthStreamLocked();
141 } else {
142 state_ = state;
143 status_ = status;
144 NotifyWatchersLocked(*state_, status_);
145 // We're not connected, so stop health checking.
146 stream_client_.reset();
147 }
148 }
149
StartHealthStreamLocked()150 void HealthProducer::HealthChecker::StartHealthStreamLocked() {
151 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
152 gpr_log(GPR_INFO,
153 "HealthProducer %p HealthChecker %p: "
154 "creating HealthClient for \"%s\"",
155 producer_.get(), this,
156 std::string(health_check_service_name_).c_str());
157 }
158 stream_client_ = MakeOrphanable<SubchannelStreamClient>(
159 producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
160 std::make_unique<HealthStreamEventHandler>(Ref()),
161 GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace) ? "HealthClient"
162 : nullptr);
163 }
164
NotifyWatchersLocked(grpc_connectivity_state state,absl::Status status)165 void HealthProducer::HealthChecker::NotifyWatchersLocked(
166 grpc_connectivity_state state, absl::Status status) {
167 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
168 gpr_log(
169 GPR_INFO,
170 "HealthProducer %p HealthChecker %p: reporting state %s to watchers",
171 producer_.get(), this, ConnectivityStateName(state));
172 }
173 work_serializer_->Schedule(
174 [self = Ref(), state, status = std::move(status)]() {
175 MutexLock lock(&self->producer_->mu_);
176 for (HealthWatcher* watcher : self->watchers_) {
177 watcher->Notify(state, status);
178 }
179 },
180 DEBUG_LOCATION);
181 new AsyncWorkSerializerDrainer(work_serializer_);
182 }
183
OnHealthWatchStatusChange(grpc_connectivity_state state,const absl::Status & status)184 void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
185 grpc_connectivity_state state, const absl::Status& status) {
186 if (state == GRPC_CHANNEL_SHUTDOWN) return;
187 // Prepend the subchannel's address to the status if needed.
188 absl::Status use_status;
189 if (!status.ok()) {
190 std::string address_str =
191 grpc_sockaddr_to_uri(&producer_->subchannel_->address())
192 .value_or("<unknown address type>");
193 use_status = absl::Status(
194 status.code(), absl::StrCat(address_str, ": ", status.message()));
195 }
196 work_serializer_->Schedule(
197 [self = Ref(), state, status = std::move(use_status)]() mutable {
198 MutexLock lock(&self->producer_->mu_);
199 if (self->stream_client_ != nullptr) {
200 self->state_ = state;
201 self->status_ = std::move(status);
202 for (HealthWatcher* watcher : self->watchers_) {
203 watcher->Notify(state, self->status_);
204 }
205 }
206 },
207 DEBUG_LOCATION);
208 new AsyncWorkSerializerDrainer(work_serializer_);
209 }
210
211 //
212 // HealthProducer::HealthChecker::HealthStreamEventHandler
213 //
214
215 class HealthProducer::HealthChecker::HealthStreamEventHandler final
216 : public SubchannelStreamClient::CallEventHandler {
217 public:
HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)218 explicit HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)
219 : health_checker_(std::move(health_checker)) {}
220
GetPathLocked()221 Slice GetPathLocked() override {
222 return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
223 }
224
OnCallStartLocked(SubchannelStreamClient * client)225 void OnCallStartLocked(SubchannelStreamClient* client) override {
226 SetHealthStatusLocked(client, GRPC_CHANNEL_CONNECTING,
227 "starting health watch");
228 }
229
OnRetryTimerStartLocked(SubchannelStreamClient * client)230 void OnRetryTimerStartLocked(SubchannelStreamClient* client) override {
231 SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
232 "health check call failed; will retry after backoff");
233 }
234
EncodeSendMessageLocked()235 grpc_slice EncodeSendMessageLocked() override {
236 upb::Arena arena;
237 grpc_health_v1_HealthCheckRequest* request_struct =
238 grpc_health_v1_HealthCheckRequest_new(arena.ptr());
239 grpc_health_v1_HealthCheckRequest_set_service(
240 request_struct,
241 upb_StringView_FromDataAndSize(
242 health_checker_->health_check_service_name_.data(),
243 health_checker_->health_check_service_name_.size()));
244 size_t buf_length;
245 char* buf = grpc_health_v1_HealthCheckRequest_serialize(
246 request_struct, arena.ptr(), &buf_length);
247 grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
248 memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
249 return request_slice;
250 }
251
RecvMessageReadyLocked(SubchannelStreamClient * client,absl::string_view serialized_message)252 absl::Status RecvMessageReadyLocked(
253 SubchannelStreamClient* client,
254 absl::string_view serialized_message) override {
255 auto healthy = DecodeResponse(serialized_message);
256 if (!healthy.ok()) {
257 SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
258 healthy.status().ToString().c_str());
259 return healthy.status();
260 }
261 if (!*healthy) {
262 SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
263 "backend unhealthy");
264 } else {
265 SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
266 }
267 return absl::OkStatus();
268 }
269
RecvTrailingMetadataReadyLocked(SubchannelStreamClient * client,grpc_status_code status)270 void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
271 grpc_status_code status) override {
272 if (status == GRPC_STATUS_UNIMPLEMENTED) {
273 static const char kErrorMessage[] =
274 "health checking Watch method returned UNIMPLEMENTED; "
275 "disabling health checks but assuming server is healthy";
276 gpr_log(GPR_ERROR, kErrorMessage);
277 auto* channelz_node =
278 health_checker_->producer_->subchannel_->channelz_node();
279 if (channelz_node != nullptr) {
280 channelz_node->AddTraceEvent(
281 channelz::ChannelTrace::Error,
282 grpc_slice_from_static_string(kErrorMessage));
283 }
284 SetHealthStatusLocked(client, GRPC_CHANNEL_READY, kErrorMessage);
285 }
286 }
287
288 private:
289 // Returns true if healthy.
DecodeResponse(absl::string_view serialized_message)290 static absl::StatusOr<bool> DecodeResponse(
291 absl::string_view serialized_message) {
292 // Deserialize message.
293 upb::Arena arena;
294 auto* response = grpc_health_v1_HealthCheckResponse_parse(
295 serialized_message.data(), serialized_message.size(), arena.ptr());
296 if (response == nullptr) {
297 // Can't parse message; assume unhealthy.
298 return absl::InvalidArgumentError("cannot parse health check response");
299 }
300 int32_t status = grpc_health_v1_HealthCheckResponse_status(response);
301 return status == grpc_health_v1_HealthCheckResponse_SERVING;
302 }
303
SetHealthStatusLocked(SubchannelStreamClient * client,grpc_connectivity_state state,const char * reason)304 void SetHealthStatusLocked(SubchannelStreamClient* client,
305 grpc_connectivity_state state,
306 const char* reason) {
307 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
308 gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s",
309 client, ConnectivityStateName(state), reason);
310 }
311 health_checker_->OnHealthWatchStatusChange(
312 state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
313 ? absl::UnavailableError(reason)
314 : absl::OkStatus());
315 }
316
317 RefCountedPtr<HealthChecker> health_checker_;
318 };
319
320 //
321 // HealthProducer::ConnectivityWatcher
322 //
323
324 class HealthProducer::ConnectivityWatcher final
325 : public Subchannel::ConnectivityStateWatcherInterface {
326 public:
ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)327 explicit ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)
328 : producer_(std::move(producer)) {}
329
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)330 void OnConnectivityStateChange(
331 RefCountedPtr<ConnectivityStateWatcherInterface> self,
332 grpc_connectivity_state state, const absl::Status& status) override {
333 producer_->OnConnectivityStateChange(state, status);
334 self.reset();
335 }
336
interested_parties()337 grpc_pollset_set* interested_parties() override {
338 return producer_->interested_parties_;
339 }
340
341 private:
342 WeakRefCountedPtr<HealthProducer> producer_;
343 };
344
345 //
346 // HealthProducer
347 //
348
Start(RefCountedPtr<Subchannel> subchannel)349 void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
350 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
351 gpr_log(GPR_INFO, "HealthProducer %p: starting with subchannel %p", this,
352 subchannel.get());
353 }
354 subchannel_ = std::move(subchannel);
355 {
356 MutexLock lock(&mu_);
357 connected_subchannel_ = subchannel_->connected_subchannel();
358 }
359 auto connectivity_watcher =
360 MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<HealthProducer>());
361 connectivity_watcher_ = connectivity_watcher.get();
362 subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
363 }
364
Orphaned()365 void HealthProducer::Orphaned() {
366 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
367 gpr_log(GPR_INFO, "HealthProducer %p: shutting down", this);
368 }
369 {
370 MutexLock lock(&mu_);
371 health_checkers_.clear();
372 }
373 subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
374 subchannel_->RemoveDataProducer(this);
375 }
376
AddWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)377 void HealthProducer::AddWatcher(
378 HealthWatcher* watcher,
379 const absl::optional<std::string>& health_check_service_name) {
380 MutexLock lock(&mu_);
381 grpc_pollset_set_add_pollset_set(interested_parties_,
382 watcher->interested_parties());
383 if (!health_check_service_name.has_value()) {
384 if (state_.has_value()) watcher->Notify(*state_, status_);
385 non_health_watchers_.insert(watcher);
386 } else {
387 auto it =
388 health_checkers_.emplace(*health_check_service_name, nullptr).first;
389 auto& health_checker = it->second;
390 if (health_checker == nullptr) {
391 health_checker = MakeOrphanable<HealthChecker>(
392 WeakRefAsSubclass<HealthProducer>(), it->first);
393 }
394 health_checker->AddWatcherLocked(watcher);
395 }
396 }
397
RemoveWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)398 void HealthProducer::RemoveWatcher(
399 HealthWatcher* watcher,
400 const absl::optional<std::string>& health_check_service_name) {
401 MutexLock lock(&mu_);
402 grpc_pollset_set_del_pollset_set(interested_parties_,
403 watcher->interested_parties());
404 if (!health_check_service_name.has_value()) {
405 non_health_watchers_.erase(watcher);
406 } else {
407 auto it = health_checkers_.find(*health_check_service_name);
408 if (it == health_checkers_.end()) return;
409 const bool empty = it->second->RemoveWatcherLocked(watcher);
410 if (empty) health_checkers_.erase(it);
411 }
412 }
413
OnConnectivityStateChange(grpc_connectivity_state state,const absl::Status & status)414 void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
415 const absl::Status& status) {
416 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
417 gpr_log(GPR_INFO,
418 "HealthProducer %p: subchannel state update: state=%s status=%s",
419 this, ConnectivityStateName(state), status.ToString().c_str());
420 }
421 MutexLock lock(&mu_);
422 state_ = state;
423 status_ = status;
424 if (state == GRPC_CHANNEL_READY) {
425 connected_subchannel_ = subchannel_->connected_subchannel();
426 } else {
427 connected_subchannel_.reset();
428 }
429 for (const auto& p : health_checkers_) {
430 p.second->OnConnectivityStateChangeLocked(state, status);
431 }
432 for (HealthWatcher* watcher : non_health_watchers_) {
433 watcher->Notify(state, status);
434 }
435 }
436
437 //
438 // HealthWatcher
439 //
440
~HealthWatcher()441 HealthWatcher::~HealthWatcher() {
442 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
443 gpr_log(GPR_INFO,
444 "HealthWatcher %p: unregistering from producer %p "
445 "(health_check_service_name=\"%s\")",
446 this, producer_.get(),
447 health_check_service_name_.value_or("N/A").c_str());
448 }
449 if (producer_ != nullptr) {
450 producer_->RemoveWatcher(this, health_check_service_name_);
451 }
452 }
453
SetSubchannel(Subchannel * subchannel)454 void HealthWatcher::SetSubchannel(Subchannel* subchannel) {
455 bool created = false;
456 // Check if our producer is already registered with the subchannel.
457 // If not, create a new one.
458 subchannel->GetOrAddDataProducer(
459 HealthProducer::Type(),
460 [&](Subchannel::DataProducerInterface** producer) {
461 if (*producer != nullptr) {
462 producer_ =
463 (*producer)->RefIfNonZero().TakeAsSubclass<HealthProducer>();
464 }
465 if (producer_ == nullptr) {
466 producer_ = MakeRefCounted<HealthProducer>();
467 *producer = producer_.get();
468 created = true;
469 }
470 });
471 // If we just created the producer, start it.
472 // This needs to be done outside of the lambda passed to
473 // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
474 // subchannel lock while already holding it.
475 if (created) producer_->Start(subchannel->Ref());
476 // Register ourself with the producer.
477 producer_->AddWatcher(this, health_check_service_name_);
478 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
479 gpr_log(GPR_INFO,
480 "HealthWatcher %p: registered with producer %p (created=%d, "
481 "health_check_service_name=\"%s\")",
482 this, producer_.get(), created,
483 health_check_service_name_.value_or("N/A").c_str());
484 }
485 }
486
Notify(grpc_connectivity_state state,absl::Status status)487 void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) {
488 work_serializer_->Schedule(
489 [watcher = watcher_, state, status = std::move(status)]() mutable {
490 watcher->OnConnectivityStateChange(state, std::move(status));
491 },
492 DEBUG_LOCATION);
493 new AsyncWorkSerializerDrainer(work_serializer_);
494 }
495
496 //
497 // External API
498 //
499
500 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeHealthCheckWatcher(std::shared_ptr<WorkSerializer> work_serializer,const ChannelArgs & args,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)501 MakeHealthCheckWatcher(
502 std::shared_ptr<WorkSerializer> work_serializer, const ChannelArgs& args,
503 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
504 watcher) {
505 absl::optional<std::string> health_check_service_name;
506 if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
507 health_check_service_name =
508 args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
509 }
510 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
511 gpr_log(GPR_INFO,
512 "creating HealthWatcher -- health_check_service_name=\"%s\"",
513 health_check_service_name.value_or("N/A").c_str());
514 }
515 return std::make_unique<HealthWatcher>(std::move(work_serializer),
516 std::move(health_check_service_name),
517 std::move(watcher));
518 }
519
520 } // namespace grpc_core
521