1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
18 #define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
19
20 #include <grpc/support/port_platform.h>
21
22 #include <inttypes.h>
23 #include <string.h>
24
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29
30 #include "absl/status/status.h"
31 #include "absl/types/optional.h"
32
33 #include <grpc/grpc.h>
34 #include <grpc/impl/connectivity_state.h>
35 #include <grpc/support/log.h>
36
37 #include "src/core/ext/filters/client_channel/client_channel_internal.h"
38 #include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/gprpp/debug_location.h"
41 #include "src/core/lib/gprpp/dual_ref_counted.h"
42 #include "src/core/lib/gprpp/manual_constructor.h"
43 #include "src/core/lib/gprpp/ref_counted_ptr.h"
44 #include "src/core/lib/gprpp/work_serializer.h"
45 #include "src/core/lib/iomgr/iomgr_fwd.h"
46 #include "src/core/lib/load_balancing/lb_policy.h"
47 #include "src/core/lib/load_balancing/subchannel_interface.h"
48 #include "src/core/lib/resolver/server_address.h"
49 #include "src/core/lib/transport/connectivity_state.h"
50
51 // Code for maintaining a list of subchannels within an LB policy.
52 //
53 // To use this, callers must create their own subclasses, like so:
54 //
55
56 // class MySubchannelList; // Forward declaration.
57
58 // class MySubchannelData
59 // : public SubchannelData<MySubchannelList, MySubchannelData> {
60 // public:
61 // void ProcessConnectivityChangeLocked(
62 // absl::optional<grpc_connectivity_state> old_state,
63 // grpc_connectivity_state new_state) override {
64 // // ...code to handle connectivity changes...
65 // }
66 // };
67
68 // class MySubchannelList
69 // : public SubchannelList<MySubchannelList, MySubchannelData> {
70 // };
71
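//
// Subclasses must also define constructors that forward to the protected
// base-class constructors, and MySubchannelList must override the pure
// virtual work_serializer() method.
//
// All methods will be called from within the client_channel work serializer.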
74
75 namespace grpc_core {
76
77 // Forward declaration.
78 template <typename SubchannelListType, typename SubchannelDataType>
79 class SubchannelList;
80
81 // Stores data for a particular subchannel in a subchannel list.
82 // Callers must create a subclass that implements the
83 // ProcessConnectivityChangeLocked() method.
84 template <typename SubchannelListType, typename SubchannelDataType>
85 class SubchannelData {
86 public:
87 // Returns a pointer to the subchannel list containing this object.
subchannel_list()88 SubchannelListType* subchannel_list() const {
89 return static_cast<SubchannelListType*>(subchannel_list_);
90 }
91
92 // Returns the index into the subchannel list of this object.
Index()93 size_t Index() const {
94 return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
95 subchannel_list_->subchannel(0));
96 }
97
98 // Returns a pointer to the subchannel.
subchannel()99 SubchannelInterface* subchannel() const { return subchannel_.get(); }
100
101 // Returns the cached connectivity state, if any.
connectivity_state()102 absl::optional<grpc_connectivity_state> connectivity_state() {
103 return connectivity_state_;
104 }
connectivity_status()105 absl::Status connectivity_status() { return connectivity_status_; }
106
107 // Resets the connection backoff.
108 void ResetBackoffLocked();
109
110 // Cancels any pending connectivity watch and unrefs the subchannel.
111 void ShutdownLocked();
112
113 protected:
114 SubchannelData(
115 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
116 const ServerAddress& address,
117 RefCountedPtr<SubchannelInterface> subchannel);
118
119 virtual ~SubchannelData();
120
121 // This method will be invoked once soon after instantiation to report
122 // the current connectivity state, and it will then be invoked again
123 // whenever the connectivity state changes.
124 virtual void ProcessConnectivityChangeLocked(
125 absl::optional<grpc_connectivity_state> old_state,
126 grpc_connectivity_state new_state) = 0;
127
128 private:
129 // For accessing StartConnectivityWatchLocked().
130 friend class SubchannelList<SubchannelListType, SubchannelDataType>;
131
132 // Watcher for subchannel connectivity state.
133 class Watcher
134 : public SubchannelInterface::ConnectivityStateWatcherInterface {
135 public:
Watcher(SubchannelData<SubchannelListType,SubchannelDataType> * subchannel_data,WeakRefCountedPtr<SubchannelListType> subchannel_list)136 Watcher(
137 SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
138 WeakRefCountedPtr<SubchannelListType> subchannel_list)
139 : subchannel_data_(subchannel_data),
140 subchannel_list_(std::move(subchannel_list)) {}
141
~Watcher()142 ~Watcher() override {
143 subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
144 }
145
146 void OnConnectivityStateChange(grpc_connectivity_state new_state,
147 absl::Status status) override;
148
interested_parties()149 grpc_pollset_set* interested_parties() override {
150 return subchannel_list_->policy()->interested_parties();
151 }
152
153 private:
154 SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
155 WeakRefCountedPtr<SubchannelListType> subchannel_list_;
156 };
157
158 // Starts watching the connectivity state of the subchannel.
159 // ProcessConnectivityChangeLocked() will be called whenever the
160 // connectivity state changes.
161 void StartConnectivityWatchLocked();
162
163 // Cancels watching the connectivity state of the subchannel.
164 void CancelConnectivityWatchLocked(const char* reason);
165
166 // Unrefs the subchannel.
167 void UnrefSubchannelLocked(const char* reason);
168
169 // Backpointer to owning subchannel list. Not owned.
170 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
171 // The subchannel.
172 RefCountedPtr<SubchannelInterface> subchannel_;
173 // Will be non-null when the subchannel's state is being watched.
174 SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
175 nullptr;
176 // Data updated by the watcher.
177 absl::optional<grpc_connectivity_state> connectivity_state_;
178 absl::Status connectivity_status_;
179 };
180
// A list of subchannels.
//
// Owns one SubchannelDataType per subchannel that was successfully created
// by the constructor.  The list is reference-counted via DualRefCounted;
// Orphan() shuts down every entry and the destructor destroys them.
template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelList : public DualRefCounted<SubchannelListType> {
 public:
  // Starts watching the connectivity state of all subchannels.
  // Must be called immediately after instantiation.
  void StartWatchingLocked();

  // The number of subchannels in the list.
  size_t num_subchannels() const { return subchannels_.size(); }

  // The data for the subchannel at a particular index.
  // No bounds checking: `index` must be less than num_subchannels().
  SubchannelDataType* subchannel(size_t index) {
    return subchannels_[index].get();
  }

  // Returns true if the subchannel list is shutting down.
  bool shutting_down() const { return shutting_down_; }

  // Accessors.
  LoadBalancingPolicy* policy() const { return policy_; }
  // Trace-log prefix; trace logging is skipped when this is null.
  const char* tracer() const { return tracer_; }

  // Resets connection backoff of all subchannels.
  void ResetBackoffLocked();

  // Returns true if all subchannels have seen their initial
  // connectivity state notifications.
  bool AllSubchannelsSeenInitialState();

  // Marks the list as shutting down and shuts down every SubchannelData.
  // Must not be called more than once.
  void Orphan() override;

 protected:
  // Creates one SubchannelData per address for which
  // helper->CreateSubchannel() returns non-null; addresses whose subchannel
  // cannot be created are skipped.
  SubchannelList(LoadBalancingPolicy* policy, const char* tracer,
                 ServerAddressList addresses,
                 LoadBalancingPolicy::ChannelControlHelper* helper,
                 const ChannelArgs& args);

  virtual ~SubchannelList();

 private:
  // Gives SubchannelData access to private members such as
  // health_check_service_name_ and work_serializer().
  friend class SubchannelData<SubchannelListType, SubchannelDataType>;

  // Supplies the work serializer handed to MakeHealthCheckWatcher() when
  // health checking is enabled.
  virtual std::shared_ptr<WorkSerializer> work_serializer() const = 0;

  // Backpointer to owning policy.
  LoadBalancingPolicy* policy_;

  const char* tracer_;

  // Set from GRPC_ARG_HEALTH_CHECK_SERVICE_NAME unless
  // GRPC_ARG_INHIBIT_HEALTH_CHECKING is true.  When set, connectivity is
  // watched through a health-check data watcher.
  absl::optional<std::string> health_check_service_name_;

  // The list of subchannels.
  // We use ManualConstructor here to support SubchannelDataType classes
  // that are not copyable.
  std::vector<ManualConstructor<SubchannelDataType>> subchannels_;

  // Is this list shutting down? This may be true due to the shutdown of the
  // policy itself or because a newer update has arrived while this one hadn't
  // finished processing.
  bool shutting_down_ = false;
};
244
245 //
// implementation -- no user-serviceable parts below
247 //
248
249 //
250 // SubchannelData::Watcher
251 //
252
253 template <typename SubchannelListType, typename SubchannelDataType>
254 void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)255 OnConnectivityStateChange(grpc_connectivity_state new_state,
256 absl::Status status) {
257 if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
258 gpr_log(
259 GPR_INFO,
260 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
261 " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
262 "status=%s, shutting_down=%d, pending_watcher=%p",
263 subchannel_list_->tracer(), subchannel_list_->policy(),
264 subchannel_list_.get(), subchannel_data_->Index(),
265 subchannel_list_->num_subchannels(),
266 subchannel_data_->subchannel_.get(),
267 (subchannel_data_->connectivity_state_.has_value()
268 ? ConnectivityStateName(*subchannel_data_->connectivity_state_)
269 : "N/A"),
270 ConnectivityStateName(new_state), status.ToString().c_str(),
271 subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_);
272 }
273 if (!subchannel_list_->shutting_down() &&
274 subchannel_data_->pending_watcher_ != nullptr) {
275 absl::optional<grpc_connectivity_state> old_state =
276 subchannel_data_->connectivity_state_;
277 subchannel_data_->connectivity_state_ = new_state;
278 subchannel_data_->connectivity_status_ = status;
279 // Call the subclass's ProcessConnectivityChangeLocked() method.
280 subchannel_data_->ProcessConnectivityChangeLocked(old_state, new_state);
281 }
282 }
283
284 //
285 // SubchannelData
286 //
287
288 template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData(SubchannelList<SubchannelListType,SubchannelDataType> * subchannel_list,const ServerAddress &,RefCountedPtr<SubchannelInterface> subchannel)289 SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
290 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
291 const ServerAddress& /*address*/,
292 RefCountedPtr<SubchannelInterface> subchannel)
293 : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {}
294
295 template <typename SubchannelListType, typename SubchannelDataType>
~SubchannelData()296 SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
297 GPR_ASSERT(subchannel_ == nullptr);
298 }
299
300 template <typename SubchannelListType, typename SubchannelDataType>
301 void SubchannelData<SubchannelListType, SubchannelDataType>::
UnrefSubchannelLocked(const char * reason)302 UnrefSubchannelLocked(const char* reason) {
303 if (subchannel_ != nullptr) {
304 if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
305 gpr_log(GPR_INFO,
306 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
307 " (subchannel %p): unreffing subchannel (%s)",
308 subchannel_list_->tracer(), subchannel_list_->policy(),
309 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
310 subchannel_.get(), reason);
311 }
312 subchannel_.reset();
313 }
314 }
315
316 template <typename SubchannelListType, typename SubchannelDataType>
317 void SubchannelData<SubchannelListType,
ResetBackoffLocked()318 SubchannelDataType>::ResetBackoffLocked() {
319 if (subchannel_ != nullptr) {
320 subchannel_->ResetBackoff();
321 }
322 }
323
324 template <typename SubchannelListType, typename SubchannelDataType>
325 void SubchannelData<SubchannelListType,
StartConnectivityWatchLocked()326 SubchannelDataType>::StartConnectivityWatchLocked() {
327 if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
328 gpr_log(
329 GPR_INFO,
330 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
331 " (subchannel %p): starting watch "
332 "(health_check_service_name=\"%s\")",
333 subchannel_list_->tracer(), subchannel_list_->policy(),
334 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
335 subchannel_.get(),
336 subchannel_list()->health_check_service_name_.value_or("N/A").c_str());
337 }
338 GPR_ASSERT(pending_watcher_ == nullptr);
339 auto watcher = std::make_unique<Watcher>(
340 this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher"));
341 pending_watcher_ = watcher.get();
342 if (subchannel_list()->health_check_service_name_.has_value()) {
343 subchannel_->AddDataWatcher(MakeHealthCheckWatcher(
344 subchannel_list_->work_serializer(),
345 *subchannel_list()->health_check_service_name_, std::move(watcher)));
346 } else {
347 subchannel_->WatchConnectivityState(std::move(watcher));
348 }
349 }
350
351 template <typename SubchannelListType, typename SubchannelDataType>
352 void SubchannelData<SubchannelListType, SubchannelDataType>::
CancelConnectivityWatchLocked(const char * reason)353 CancelConnectivityWatchLocked(const char* reason) {
354 if (pending_watcher_ != nullptr) {
355 if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
356 gpr_log(GPR_INFO,
357 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
358 " (subchannel %p): canceling connectivity watch (%s)",
359 subchannel_list_->tracer(), subchannel_list_->policy(),
360 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
361 subchannel_.get(), reason);
362 }
363 // No need to cancel if using health checking, because the data
364 // watcher will be destroyed automatically when the subchannel is.
365 if (!subchannel_list()->health_check_service_name_.has_value()) {
366 subchannel_->CancelConnectivityStateWatch(pending_watcher_);
367 }
368 pending_watcher_ = nullptr;
369 }
370 }
371
372 template <typename SubchannelListType, typename SubchannelDataType>
ShutdownLocked()373 void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
374 CancelConnectivityWatchLocked("shutdown");
375 UnrefSubchannelLocked("shutdown");
376 }
377
378 //
379 // SubchannelList
380 //
381
382 template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList(LoadBalancingPolicy * policy,const char * tracer,ServerAddressList addresses,LoadBalancingPolicy::ChannelControlHelper * helper,const ChannelArgs & args)383 SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
384 LoadBalancingPolicy* policy, const char* tracer,
385 ServerAddressList addresses,
386 LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args)
387 : DualRefCounted<SubchannelListType>(tracer),
388 policy_(policy),
389 tracer_(tracer) {
390 if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
391 health_check_service_name_ =
392 args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
393 }
394 if (GPR_UNLIKELY(tracer_ != nullptr)) {
395 gpr_log(GPR_INFO,
396 "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
397 tracer_, policy, this, addresses.size());
398 }
399 subchannels_.reserve(addresses.size());
400 // Create a subchannel for each address.
401 for (ServerAddress address : addresses) {
402 RefCountedPtr<SubchannelInterface> subchannel =
403 helper->CreateSubchannel(address, args);
404 if (subchannel == nullptr) {
405 // Subchannel could not be created.
406 if (GPR_UNLIKELY(tracer_ != nullptr)) {
407 gpr_log(GPR_INFO,
408 "[%s %p] could not create subchannel for address %s, ignoring",
409 tracer_, policy_, address.ToString().c_str());
410 }
411 continue;
412 }
413 if (GPR_UNLIKELY(tracer_ != nullptr)) {
414 gpr_log(GPR_INFO,
415 "[%s %p] subchannel list %p index %" PRIuPTR
416 ": Created subchannel %p for address %s",
417 tracer_, policy_, this, subchannels_.size(), subchannel.get(),
418 address.ToString().c_str());
419 }
420 subchannels_.emplace_back();
421 subchannels_.back().Init(this, std::move(address), std::move(subchannel));
422 }
423 }
424
425 template <typename SubchannelListType, typename SubchannelDataType>
~SubchannelList()426 SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
427 if (GPR_UNLIKELY(tracer_ != nullptr)) {
428 gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_, policy_,
429 this);
430 }
431 for (auto& sd : subchannels_) {
432 sd.Destroy();
433 }
434 }
435
436 template <typename SubchannelListType, typename SubchannelDataType>
437 void SubchannelList<SubchannelListType,
StartWatchingLocked()438 SubchannelDataType>::StartWatchingLocked() {
439 for (auto& sd : subchannels_) {
440 sd->StartConnectivityWatchLocked();
441 }
442 }
443
444 template <typename SubchannelListType, typename SubchannelDataType>
Orphan()445 void SubchannelList<SubchannelListType, SubchannelDataType>::Orphan() {
446 if (GPR_UNLIKELY(tracer_ != nullptr)) {
447 gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_,
448 policy_, this);
449 }
450 GPR_ASSERT(!shutting_down_);
451 shutting_down_ = true;
452 for (auto& sd : subchannels_) {
453 sd->ShutdownLocked();
454 }
455 }
456
457 template <typename SubchannelListType, typename SubchannelDataType>
458 void SubchannelList<SubchannelListType,
ResetBackoffLocked()459 SubchannelDataType>::ResetBackoffLocked() {
460 for (auto& sd : subchannels_) {
461 sd->ResetBackoffLocked();
462 }
463 }
464
465 template <typename SubchannelListType, typename SubchannelDataType>
466 bool SubchannelList<SubchannelListType,
AllSubchannelsSeenInitialState()467 SubchannelDataType>::AllSubchannelsSeenInitialState() {
468 for (size_t i = 0; i < num_subchannels(); ++i) {
469 if (!subchannel(i)->connectivity_state().has_value()) return false;
470 }
471 return true;
472 }
473
474 } // namespace grpc_core
475
476 #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
477