1 // 2 // Copyright 2022 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H 18 #define GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <map> 23 #include <memory> 24 #include <set> 25 #include <string> 26 #include <utility> 27 28 #include "absl/base/thread_annotations.h" 29 #include "absl/status/status.h" 30 #include "absl/strings/string_view.h" 31 #include "absl/types/optional.h" 32 33 #include <grpc/impl/connectivity_state.h> 34 35 #include "src/core/client_channel/subchannel.h" 36 #include "src/core/client_channel/subchannel_interface_internal.h" 37 #include "src/core/client_channel/subchannel_stream_client.h" 38 #include "src/core/lib/gprpp/orphanable.h" 39 #include "src/core/lib/gprpp/ref_counted_ptr.h" 40 #include "src/core/lib/gprpp/sync.h" 41 #include "src/core/lib/gprpp/unique_type_name.h" 42 #include "src/core/lib/gprpp/work_serializer.h" 43 #include "src/core/lib/iomgr/iomgr_fwd.h" 44 #include "src/core/lib/iomgr/pollset_set.h" 45 #include "src/core/load_balancing/subchannel_interface.h" 46 47 namespace grpc_core { 48 49 class HealthWatcher; 50 51 // This producer is registered with a subchannel. It creates a streaming 52 // health watch call for each health check service name that is being 53 // watched and reports the resulting connectivity state to all 54 // registered watchers. 55 class HealthProducer final : public Subchannel::DataProducerInterface { 56 public: HealthProducer()57 HealthProducer() : interested_parties_(grpc_pollset_set_create()) {} ~HealthProducer()58 ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); } 59 60 void Start(RefCountedPtr<Subchannel> subchannel); 61 Type()62 static UniqueTypeName Type() { 63 static UniqueTypeName::Factory kFactory("health_check"); 64 return kFactory.Create(); 65 } 66 type()67 UniqueTypeName type() const override { return Type(); } 68 69 void AddWatcher(HealthWatcher* watcher, 70 const absl::optional<std::string>& health_check_service_name); 71 void RemoveWatcher( 72 HealthWatcher* watcher, 73 const absl::optional<std::string>& health_check_service_name); 74 75 private: 76 class ConnectivityWatcher; 77 78 // Health checker for a given health check service name. Contains the 79 // health check client and the list of watchers. 80 class HealthChecker final : public InternallyRefCounted<HealthChecker> { 81 public: 82 HealthChecker(WeakRefCountedPtr<HealthProducer> producer, 83 absl::string_view health_check_service_name); 84 85 // Disable thread-safety analysis because this method is called via 86 // OrphanablePtr<>, but there's no way to pass the lock annotation 87 // through there. 88 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; 89 90 void AddWatcherLocked(HealthWatcher* watcher) 91 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 92 93 // Returns true if this was the last watcher. 94 bool RemoveWatcherLocked(HealthWatcher* watcher) 95 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 96 97 // Called when the subchannel's connectivity state changes. 98 void OnConnectivityStateChangeLocked(grpc_connectivity_state state, 99 const absl::Status& status) 100 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 101 102 private: 103 class HealthStreamEventHandler; 104 105 // Starts a new stream if we have a connected subchannel. 106 // Called whenever the subchannel transitions to state READY or when a 107 // watcher is added. 108 void StartHealthStreamLocked() 109 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 110 111 // Notifies watchers of a new state. 112 // Called while holding the SubchannelStreamClient lock and possibly 113 // the producer lock, so must notify asynchronously, but in guaranteed 114 // order (hence the use of WorkSerializer). 115 void NotifyWatchersLocked(grpc_connectivity_state state, 116 absl::Status status) 117 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 118 119 // Called by the health check client when receiving an update. 120 void OnHealthWatchStatusChange(grpc_connectivity_state state, 121 const absl::Status& status); 122 123 WeakRefCountedPtr<HealthProducer> producer_; 124 absl::string_view health_check_service_name_; 125 std::shared_ptr<WorkSerializer> work_serializer_ = 126 std::make_shared<WorkSerializer>( 127 producer_->subchannel_->event_engine()); 128 129 absl::optional<grpc_connectivity_state> state_ 130 ABSL_GUARDED_BY(&HealthProducer::mu_); 131 absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_); 132 OrphanablePtr<SubchannelStreamClient> stream_client_ 133 ABSL_GUARDED_BY(&HealthProducer::mu_); 134 std::set<HealthWatcher*> watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_); 135 }; 136 137 // Handles a connectivity state change on the subchannel. 138 void OnConnectivityStateChange(grpc_connectivity_state state, 139 const absl::Status& status); 140 void Orphaned() override; 141 142 RefCountedPtr<Subchannel> subchannel_; 143 ConnectivityWatcher* connectivity_watcher_; 144 grpc_pollset_set* interested_parties_; 145 146 Mutex mu_; 147 absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_); 148 absl::Status status_ ABSL_GUARDED_BY(&mu_); 149 RefCountedPtr<ConnectedSubchannel> connected_subchannel_ 150 ABSL_GUARDED_BY(&mu_); 151 std::map<std::string /*health_check_service_name*/, 152 OrphanablePtr<HealthChecker>> 153 health_checkers_ ABSL_GUARDED_BY(&mu_); 154 std::set<HealthWatcher*> non_health_watchers_ ABSL_GUARDED_BY(&mu_); 155 }; 156 157 // A data watcher that handles health checking. 158 class HealthWatcher final : public InternalSubchannelDataWatcherInterface { 159 public: HealthWatcher(std::shared_ptr<WorkSerializer> work_serializer,absl::optional<std::string> health_check_service_name,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)160 HealthWatcher( 161 std::shared_ptr<WorkSerializer> work_serializer, 162 absl::optional<std::string> health_check_service_name, 163 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> 164 watcher) 165 : work_serializer_(std::move(work_serializer)), 166 health_check_service_name_(std::move(health_check_service_name)), 167 watcher_(std::move(watcher)) {} 168 ~HealthWatcher() override; 169 type()170 UniqueTypeName type() const override { return HealthProducer::Type(); } 171 172 // When the client channel sees this wrapper, it will pass it the real 173 // subchannel to use. 174 void SetSubchannel(Subchannel* subchannel) override; 175 176 // For intercepting the watcher before it gets up to the real subchannel. 177 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> TakeWatcher()178 TakeWatcher() { 179 return std::move(watcher_); 180 } SetWatcher(std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)181 void SetWatcher( 182 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> 183 watcher) { 184 watcher_ = std::move(watcher); 185 } 186 187 void Notify(grpc_connectivity_state state, absl::Status status); 188 interested_parties()189 grpc_pollset_set* interested_parties() const { 190 return watcher_->interested_parties(); 191 } 192 193 private: 194 std::shared_ptr<WorkSerializer> work_serializer_; 195 absl::optional<std::string> health_check_service_name_; 196 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> 197 watcher_; 198 RefCountedPtr<HealthProducer> producer_; 199 }; 200 201 } // namespace grpc_core 202 203 #endif // GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H 204