xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/health_check_client_internal.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
18 #define GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <utility>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/string_view.h"
31 #include "absl/types/optional.h"
32 
33 #include <grpc/impl/connectivity_state.h>
34 
35 #include "src/core/client_channel/subchannel.h"
36 #include "src/core/client_channel/subchannel_interface_internal.h"
37 #include "src/core/client_channel/subchannel_stream_client.h"
38 #include "src/core/lib/gprpp/orphanable.h"
39 #include "src/core/lib/gprpp/ref_counted_ptr.h"
40 #include "src/core/lib/gprpp/sync.h"
41 #include "src/core/lib/gprpp/unique_type_name.h"
42 #include "src/core/lib/gprpp/work_serializer.h"
43 #include "src/core/lib/iomgr/iomgr_fwd.h"
44 #include "src/core/lib/iomgr/pollset_set.h"
45 #include "src/core/load_balancing/subchannel_interface.h"
46 
47 namespace grpc_core {
48 
49 class HealthWatcher;
50 
51 // This producer is registered with a subchannel.  It creates a streaming
52 // health watch call for each health check service name that is being
53 // watched and reports the resulting connectivity state to all
54 // registered watchers.
55 class HealthProducer final : public Subchannel::DataProducerInterface {
56  public:
HealthProducer()57   HealthProducer() : interested_parties_(grpc_pollset_set_create()) {}
~HealthProducer()58   ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); }
59 
60   void Start(RefCountedPtr<Subchannel> subchannel);
61 
Type()62   static UniqueTypeName Type() {
63     static UniqueTypeName::Factory kFactory("health_check");
64     return kFactory.Create();
65   }
66 
type()67   UniqueTypeName type() const override { return Type(); }
68 
69   void AddWatcher(HealthWatcher* watcher,
70                   const absl::optional<std::string>& health_check_service_name);
71   void RemoveWatcher(
72       HealthWatcher* watcher,
73       const absl::optional<std::string>& health_check_service_name);
74 
75  private:
76   class ConnectivityWatcher;
77 
78   // Health checker for a given health check service name.  Contains the
79   // health check client and the list of watchers.
80   class HealthChecker final : public InternallyRefCounted<HealthChecker> {
81    public:
82     HealthChecker(WeakRefCountedPtr<HealthProducer> producer,
83                   absl::string_view health_check_service_name);
84 
85     // Disable thread-safety analysis because this method is called via
86     // OrphanablePtr<>, but there's no way to pass the lock annotation
87     // through there.
88     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
89 
90     void AddWatcherLocked(HealthWatcher* watcher)
91         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
92 
93     // Returns true if this was the last watcher.
94     bool RemoveWatcherLocked(HealthWatcher* watcher)
95         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
96 
97     // Called when the subchannel's connectivity state changes.
98     void OnConnectivityStateChangeLocked(grpc_connectivity_state state,
99                                          const absl::Status& status)
100         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
101 
102    private:
103     class HealthStreamEventHandler;
104 
105     // Starts a new stream if we have a connected subchannel.
106     // Called whenever the subchannel transitions to state READY or when a
107     // watcher is added.
108     void StartHealthStreamLocked()
109         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
110 
111     // Notifies watchers of a new state.
112     // Called while holding the SubchannelStreamClient lock and possibly
113     // the producer lock, so must notify asynchronously, but in guaranteed
114     // order (hence the use of WorkSerializer).
115     void NotifyWatchersLocked(grpc_connectivity_state state,
116                               absl::Status status)
117         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
118 
119     // Called by the health check client when receiving an update.
120     void OnHealthWatchStatusChange(grpc_connectivity_state state,
121                                    const absl::Status& status);
122 
123     WeakRefCountedPtr<HealthProducer> producer_;
124     absl::string_view health_check_service_name_;
125     std::shared_ptr<WorkSerializer> work_serializer_ =
126         std::make_shared<WorkSerializer>(
127             producer_->subchannel_->event_engine());
128 
129     absl::optional<grpc_connectivity_state> state_
130         ABSL_GUARDED_BY(&HealthProducer::mu_);
131     absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_);
132     OrphanablePtr<SubchannelStreamClient> stream_client_
133         ABSL_GUARDED_BY(&HealthProducer::mu_);
134     std::set<HealthWatcher*> watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_);
135   };
136 
137   // Handles a connectivity state change on the subchannel.
138   void OnConnectivityStateChange(grpc_connectivity_state state,
139                                  const absl::Status& status);
140   void Orphaned() override;
141 
142   RefCountedPtr<Subchannel> subchannel_;
143   ConnectivityWatcher* connectivity_watcher_;
144   grpc_pollset_set* interested_parties_;
145 
146   Mutex mu_;
147   absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_);
148   absl::Status status_ ABSL_GUARDED_BY(&mu_);
149   RefCountedPtr<ConnectedSubchannel> connected_subchannel_
150       ABSL_GUARDED_BY(&mu_);
151   std::map<std::string /*health_check_service_name*/,
152            OrphanablePtr<HealthChecker>>
153       health_checkers_ ABSL_GUARDED_BY(&mu_);
154   std::set<HealthWatcher*> non_health_watchers_ ABSL_GUARDED_BY(&mu_);
155 };
156 
157 // A data watcher that handles health checking.
158 class HealthWatcher final : public InternalSubchannelDataWatcherInterface {
159  public:
HealthWatcher(std::shared_ptr<WorkSerializer> work_serializer,absl::optional<std::string> health_check_service_name,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)160   HealthWatcher(
161       std::shared_ptr<WorkSerializer> work_serializer,
162       absl::optional<std::string> health_check_service_name,
163       std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
164           watcher)
165       : work_serializer_(std::move(work_serializer)),
166         health_check_service_name_(std::move(health_check_service_name)),
167         watcher_(std::move(watcher)) {}
168   ~HealthWatcher() override;
169 
type()170   UniqueTypeName type() const override { return HealthProducer::Type(); }
171 
172   // When the client channel sees this wrapper, it will pass it the real
173   // subchannel to use.
174   void SetSubchannel(Subchannel* subchannel) override;
175 
176   // For intercepting the watcher before it gets up to the real subchannel.
177   std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
TakeWatcher()178   TakeWatcher() {
179     return std::move(watcher_);
180   }
SetWatcher(std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)181   void SetWatcher(
182       std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
183           watcher) {
184     watcher_ = std::move(watcher);
185   }
186 
187   void Notify(grpc_connectivity_state state, absl::Status status);
188 
interested_parties()189   grpc_pollset_set* interested_parties() const {
190     return watcher_->interested_parties();
191   }
192 
193  private:
194   std::shared_ptr<WorkSerializer> work_serializer_;
195   absl::optional<std::string> health_check_service_name_;
196   std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
197       watcher_;
198   RefCountedPtr<HealthProducer> producer_;
199 };
200 
201 }  // namespace grpc_core
202 
203 #endif  // GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
204