1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
18 #define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <inttypes.h>
23 #include <string.h>
24 
25 #include <memory>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/status/status.h"
31 #include "absl/types/optional.h"
32 
33 #include <grpc/grpc.h>
34 #include <grpc/impl/connectivity_state.h>
35 #include <grpc/support/log.h>
36 
37 #include "src/core/ext/filters/client_channel/client_channel_internal.h"
38 #include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/gprpp/debug_location.h"
41 #include "src/core/lib/gprpp/dual_ref_counted.h"
42 #include "src/core/lib/gprpp/manual_constructor.h"
43 #include "src/core/lib/gprpp/ref_counted_ptr.h"
44 #include "src/core/lib/gprpp/work_serializer.h"
45 #include "src/core/lib/iomgr/iomgr_fwd.h"
46 #include "src/core/lib/load_balancing/lb_policy.h"
47 #include "src/core/lib/load_balancing/subchannel_interface.h"
48 #include "src/core/lib/resolver/server_address.h"
49 #include "src/core/lib/transport/connectivity_state.h"
50 
51 // Code for maintaining a list of subchannels within an LB policy.
52 //
53 // To use this, callers must create their own subclasses, like so:
54 //
55 
56 // class MySubchannelList;  // Forward declaration.
57 
58 // class MySubchannelData
59 //   : public SubchannelData<MySubchannelList, MySubchannelData> {
60 // public:
61 // void ProcessConnectivityChangeLocked(
62 //     absl::optional<grpc_connectivity_state> old_state,
63 //     grpc_connectivity_state new_state) override {
64 //   // ...code to handle connectivity changes...
65 // }
66 // };
67 
68 // class MySubchannelList
69 //   : public SubchannelList<MySubchannelList, MySubchannelData> {
70 // };
71 
72 //
73 // All methods will be called from within the client_channel work serializer.
74 
75 namespace grpc_core {
76 
// Forward declaration: SubchannelData (defined below) stores a back-pointer
// to, and befriends, the list type, so the template must be declared first.
template <class SubchannelListType, class SubchannelDataType>
class SubchannelList;
80 
81 // Stores data for a particular subchannel in a subchannel list.
82 // Callers must create a subclass that implements the
83 // ProcessConnectivityChangeLocked() method.
84 template <typename SubchannelListType, typename SubchannelDataType>
85 class SubchannelData {
86  public:
87   // Returns a pointer to the subchannel list containing this object.
subchannel_list()88   SubchannelListType* subchannel_list() const {
89     return static_cast<SubchannelListType*>(subchannel_list_);
90   }
91 
92   // Returns the index into the subchannel list of this object.
Index()93   size_t Index() const {
94     return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
95                                subchannel_list_->subchannel(0));
96   }
97 
98   // Returns a pointer to the subchannel.
subchannel()99   SubchannelInterface* subchannel() const { return subchannel_.get(); }
100 
101   // Returns the cached connectivity state, if any.
connectivity_state()102   absl::optional<grpc_connectivity_state> connectivity_state() {
103     return connectivity_state_;
104   }
connectivity_status()105   absl::Status connectivity_status() { return connectivity_status_; }
106 
107   // Resets the connection backoff.
108   void ResetBackoffLocked();
109 
110   // Cancels any pending connectivity watch and unrefs the subchannel.
111   void ShutdownLocked();
112 
113  protected:
114   SubchannelData(
115       SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
116       const ServerAddress& address,
117       RefCountedPtr<SubchannelInterface> subchannel);
118 
119   virtual ~SubchannelData();
120 
121   // This method will be invoked once soon after instantiation to report
122   // the current connectivity state, and it will then be invoked again
123   // whenever the connectivity state changes.
124   virtual void ProcessConnectivityChangeLocked(
125       absl::optional<grpc_connectivity_state> old_state,
126       grpc_connectivity_state new_state) = 0;
127 
128  private:
129   // For accessing StartConnectivityWatchLocked().
130   friend class SubchannelList<SubchannelListType, SubchannelDataType>;
131 
132   // Watcher for subchannel connectivity state.
133   class Watcher
134       : public SubchannelInterface::ConnectivityStateWatcherInterface {
135    public:
Watcher(SubchannelData<SubchannelListType,SubchannelDataType> * subchannel_data,WeakRefCountedPtr<SubchannelListType> subchannel_list)136     Watcher(
137         SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
138         WeakRefCountedPtr<SubchannelListType> subchannel_list)
139         : subchannel_data_(subchannel_data),
140           subchannel_list_(std::move(subchannel_list)) {}
141 
~Watcher()142     ~Watcher() override {
143       subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
144     }
145 
146     void OnConnectivityStateChange(grpc_connectivity_state new_state,
147                                    absl::Status status) override;
148 
interested_parties()149     grpc_pollset_set* interested_parties() override {
150       return subchannel_list_->policy()->interested_parties();
151     }
152 
153    private:
154     SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
155     WeakRefCountedPtr<SubchannelListType> subchannel_list_;
156   };
157 
158   // Starts watching the connectivity state of the subchannel.
159   // ProcessConnectivityChangeLocked() will be called whenever the
160   // connectivity state changes.
161   void StartConnectivityWatchLocked();
162 
163   // Cancels watching the connectivity state of the subchannel.
164   void CancelConnectivityWatchLocked(const char* reason);
165 
166   // Unrefs the subchannel.
167   void UnrefSubchannelLocked(const char* reason);
168 
169   // Backpointer to owning subchannel list.  Not owned.
170   SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
171   // The subchannel.
172   RefCountedPtr<SubchannelInterface> subchannel_;
173   // Will be non-null when the subchannel's state is being watched.
174   SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
175       nullptr;
176   // Data updated by the watcher.
177   absl::optional<grpc_connectivity_state> connectivity_state_;
178   absl::Status connectivity_status_;
179 };
180 
181 // A list of subchannels.
182 template <typename SubchannelListType, typename SubchannelDataType>
183 class SubchannelList : public DualRefCounted<SubchannelListType> {
184  public:
185   // Starts watching the connectivity state of all subchannels.
186   // Must be called immediately after instantiation.
187   void StartWatchingLocked();
188 
189   // The number of subchannels in the list.
num_subchannels()190   size_t num_subchannels() const { return subchannels_.size(); }
191 
192   // The data for the subchannel at a particular index.
subchannel(size_t index)193   SubchannelDataType* subchannel(size_t index) {
194     return subchannels_[index].get();
195   }
196 
197   // Returns true if the subchannel list is shutting down.
shutting_down()198   bool shutting_down() const { return shutting_down_; }
199 
200   // Accessors.
policy()201   LoadBalancingPolicy* policy() const { return policy_; }
tracer()202   const char* tracer() const { return tracer_; }
203 
204   // Resets connection backoff of all subchannels.
205   void ResetBackoffLocked();
206 
207   // Returns true if all subchannels have seen their initial
208   // connectivity state notifications.
209   bool AllSubchannelsSeenInitialState();
210 
211   void Orphan() override;
212 
213  protected:
214   SubchannelList(LoadBalancingPolicy* policy, const char* tracer,
215                  ServerAddressList addresses,
216                  LoadBalancingPolicy::ChannelControlHelper* helper,
217                  const ChannelArgs& args);
218 
219   virtual ~SubchannelList();
220 
221  private:
222   // For accessing Ref() and Unref().
223   friend class SubchannelData<SubchannelListType, SubchannelDataType>;
224 
225   virtual std::shared_ptr<WorkSerializer> work_serializer() const = 0;
226 
227   // Backpointer to owning policy.
228   LoadBalancingPolicy* policy_;
229 
230   const char* tracer_;
231 
232   absl::optional<std::string> health_check_service_name_;
233 
234   // The list of subchannels.
235   // We use ManualConstructor here to support SubchannelDataType classes
236   // that are not copyable.
237   std::vector<ManualConstructor<SubchannelDataType>> subchannels_;
238 
239   // Is this list shutting down? This may be true due to the shutdown of the
240   // policy itself or because a newer update has arrived while this one hadn't
241   // finished processing.
242   bool shutting_down_ = false;
243 };
244 
245 //
// implementation -- no user-serviceable parts below
247 //
248 
249 //
250 // SubchannelData::Watcher
251 //
252 
253 template <typename SubchannelListType, typename SubchannelDataType>
254 void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(grpc_connectivity_state new_state,absl::Status status)255     OnConnectivityStateChange(grpc_connectivity_state new_state,
256                               absl::Status status) {
257   if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
258     gpr_log(
259         GPR_INFO,
260         "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
261         " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, "
262         "status=%s, shutting_down=%d, pending_watcher=%p",
263         subchannel_list_->tracer(), subchannel_list_->policy(),
264         subchannel_list_.get(), subchannel_data_->Index(),
265         subchannel_list_->num_subchannels(),
266         subchannel_data_->subchannel_.get(),
267         (subchannel_data_->connectivity_state_.has_value()
268              ? ConnectivityStateName(*subchannel_data_->connectivity_state_)
269              : "N/A"),
270         ConnectivityStateName(new_state), status.ToString().c_str(),
271         subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_);
272   }
273   if (!subchannel_list_->shutting_down() &&
274       subchannel_data_->pending_watcher_ != nullptr) {
275     absl::optional<grpc_connectivity_state> old_state =
276         subchannel_data_->connectivity_state_;
277     subchannel_data_->connectivity_state_ = new_state;
278     subchannel_data_->connectivity_status_ = status;
279     // Call the subclass's ProcessConnectivityChangeLocked() method.
280     subchannel_data_->ProcessConnectivityChangeLocked(old_state, new_state);
281   }
282 }
283 
284 //
285 // SubchannelData
286 //
287 
288 template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData(SubchannelList<SubchannelListType,SubchannelDataType> * subchannel_list,const ServerAddress &,RefCountedPtr<SubchannelInterface> subchannel)289 SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
290     SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
291     const ServerAddress& /*address*/,
292     RefCountedPtr<SubchannelInterface> subchannel)
293     : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {}
294 
295 template <typename SubchannelListType, typename SubchannelDataType>
~SubchannelData()296 SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
297   GPR_ASSERT(subchannel_ == nullptr);
298 }
299 
300 template <typename SubchannelListType, typename SubchannelDataType>
301 void SubchannelData<SubchannelListType, SubchannelDataType>::
UnrefSubchannelLocked(const char * reason)302     UnrefSubchannelLocked(const char* reason) {
303   if (subchannel_ != nullptr) {
304     if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
305       gpr_log(GPR_INFO,
306               "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
307               " (subchannel %p): unreffing subchannel (%s)",
308               subchannel_list_->tracer(), subchannel_list_->policy(),
309               subchannel_list_, Index(), subchannel_list_->num_subchannels(),
310               subchannel_.get(), reason);
311     }
312     subchannel_.reset();
313   }
314 }
315 
316 template <typename SubchannelListType, typename SubchannelDataType>
317 void SubchannelData<SubchannelListType,
ResetBackoffLocked()318                     SubchannelDataType>::ResetBackoffLocked() {
319   if (subchannel_ != nullptr) {
320     subchannel_->ResetBackoff();
321   }
322 }
323 
324 template <typename SubchannelListType, typename SubchannelDataType>
325 void SubchannelData<SubchannelListType,
StartConnectivityWatchLocked()326                     SubchannelDataType>::StartConnectivityWatchLocked() {
327   if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
328     gpr_log(
329         GPR_INFO,
330         "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
331         " (subchannel %p): starting watch "
332         "(health_check_service_name=\"%s\")",
333         subchannel_list_->tracer(), subchannel_list_->policy(),
334         subchannel_list_, Index(), subchannel_list_->num_subchannels(),
335         subchannel_.get(),
336         subchannel_list()->health_check_service_name_.value_or("N/A").c_str());
337   }
338   GPR_ASSERT(pending_watcher_ == nullptr);
339   auto watcher = std::make_unique<Watcher>(
340       this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher"));
341   pending_watcher_ = watcher.get();
342   if (subchannel_list()->health_check_service_name_.has_value()) {
343     subchannel_->AddDataWatcher(MakeHealthCheckWatcher(
344         subchannel_list_->work_serializer(),
345         *subchannel_list()->health_check_service_name_, std::move(watcher)));
346   } else {
347     subchannel_->WatchConnectivityState(std::move(watcher));
348   }
349 }
350 
351 template <typename SubchannelListType, typename SubchannelDataType>
352 void SubchannelData<SubchannelListType, SubchannelDataType>::
CancelConnectivityWatchLocked(const char * reason)353     CancelConnectivityWatchLocked(const char* reason) {
354   if (pending_watcher_ != nullptr) {
355     if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) {
356       gpr_log(GPR_INFO,
357               "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
358               " (subchannel %p): canceling connectivity watch (%s)",
359               subchannel_list_->tracer(), subchannel_list_->policy(),
360               subchannel_list_, Index(), subchannel_list_->num_subchannels(),
361               subchannel_.get(), reason);
362     }
363     // No need to cancel if using health checking, because the data
364     // watcher will be destroyed automatically when the subchannel is.
365     if (!subchannel_list()->health_check_service_name_.has_value()) {
366       subchannel_->CancelConnectivityStateWatch(pending_watcher_);
367     }
368     pending_watcher_ = nullptr;
369   }
370 }
371 
372 template <typename SubchannelListType, typename SubchannelDataType>
ShutdownLocked()373 void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
374   CancelConnectivityWatchLocked("shutdown");
375   UnrefSubchannelLocked("shutdown");
376 }
377 
378 //
379 // SubchannelList
380 //
381 
382 template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList(LoadBalancingPolicy * policy,const char * tracer,ServerAddressList addresses,LoadBalancingPolicy::ChannelControlHelper * helper,const ChannelArgs & args)383 SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
384     LoadBalancingPolicy* policy, const char* tracer,
385     ServerAddressList addresses,
386     LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args)
387     : DualRefCounted<SubchannelListType>(tracer),
388       policy_(policy),
389       tracer_(tracer) {
390   if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
391     health_check_service_name_ =
392         args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
393   }
394   if (GPR_UNLIKELY(tracer_ != nullptr)) {
395     gpr_log(GPR_INFO,
396             "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
397             tracer_, policy, this, addresses.size());
398   }
399   subchannels_.reserve(addresses.size());
400   // Create a subchannel for each address.
401   for (ServerAddress address : addresses) {
402     RefCountedPtr<SubchannelInterface> subchannel =
403         helper->CreateSubchannel(address, args);
404     if (subchannel == nullptr) {
405       // Subchannel could not be created.
406       if (GPR_UNLIKELY(tracer_ != nullptr)) {
407         gpr_log(GPR_INFO,
408                 "[%s %p] could not create subchannel for address %s, ignoring",
409                 tracer_, policy_, address.ToString().c_str());
410       }
411       continue;
412     }
413     if (GPR_UNLIKELY(tracer_ != nullptr)) {
414       gpr_log(GPR_INFO,
415               "[%s %p] subchannel list %p index %" PRIuPTR
416               ": Created subchannel %p for address %s",
417               tracer_, policy_, this, subchannels_.size(), subchannel.get(),
418               address.ToString().c_str());
419     }
420     subchannels_.emplace_back();
421     subchannels_.back().Init(this, std::move(address), std::move(subchannel));
422   }
423 }
424 
425 template <typename SubchannelListType, typename SubchannelDataType>
~SubchannelList()426 SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
427   if (GPR_UNLIKELY(tracer_ != nullptr)) {
428     gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_, policy_,
429             this);
430   }
431   for (auto& sd : subchannels_) {
432     sd.Destroy();
433   }
434 }
435 
436 template <typename SubchannelListType, typename SubchannelDataType>
437 void SubchannelList<SubchannelListType,
StartWatchingLocked()438                     SubchannelDataType>::StartWatchingLocked() {
439   for (auto& sd : subchannels_) {
440     sd->StartConnectivityWatchLocked();
441   }
442 }
443 
444 template <typename SubchannelListType, typename SubchannelDataType>
Orphan()445 void SubchannelList<SubchannelListType, SubchannelDataType>::Orphan() {
446   if (GPR_UNLIKELY(tracer_ != nullptr)) {
447     gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_,
448             policy_, this);
449   }
450   GPR_ASSERT(!shutting_down_);
451   shutting_down_ = true;
452   for (auto& sd : subchannels_) {
453     sd->ShutdownLocked();
454   }
455 }
456 
457 template <typename SubchannelListType, typename SubchannelDataType>
458 void SubchannelList<SubchannelListType,
ResetBackoffLocked()459                     SubchannelDataType>::ResetBackoffLocked() {
460   for (auto& sd : subchannels_) {
461     sd->ResetBackoffLocked();
462   }
463 }
464 
465 template <typename SubchannelListType, typename SubchannelDataType>
466 bool SubchannelList<SubchannelListType,
AllSubchannelsSeenInitialState()467                     SubchannelDataType>::AllSubchannelsSeenInitialState() {
468   for (size_t i = 0; i < num_subchannels(); ++i) {
469     if (!subchannel(i)->connectivity_state().has_value()) return false;
470   }
471   return true;
472 }
473 
474 }  // namespace grpc_core
475 
476 #endif  // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
477