xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/child_policy_handler.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/child_policy_handler.h"
20 
21 #include <memory>
22 #include <string>
23 
24 #include "absl/status/status.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/string_view.h"
27 
28 #include <grpc/impl/connectivity_state.h>
29 #include <grpc/support/log.h>
30 
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/config/core_configuration.h"
33 #include "src/core/lib/gprpp/debug_location.h"
34 #include "src/core/lib/iomgr/pollset_set.h"
35 #include "src/core/lib/iomgr/resolved_address.h"
36 #include "src/core/load_balancing/delegating_helper.h"
37 #include "src/core/load_balancing/lb_policy_registry.h"
38 #include "src/core/load_balancing/subchannel_interface.h"
39 #include "src/core/lib/transport/connectivity_state.h"
40 
41 namespace grpc_core {
42 
43 //
44 // ChildPolicyHandler::Helper
45 //
46 
47 class ChildPolicyHandler::Helper final
48     : public LoadBalancingPolicy::ParentOwningDelegatingChannelControlHelper<
49           ChildPolicyHandler> {
50  public:
Helper(RefCountedPtr<ChildPolicyHandler> parent)51   explicit Helper(RefCountedPtr<ChildPolicyHandler> parent)
52       : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
53 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)54   RefCountedPtr<SubchannelInterface> CreateSubchannel(
55       const grpc_resolved_address& address, const ChannelArgs& per_address_args,
56       const ChannelArgs& args) override {
57     if (parent()->shutting_down_) return nullptr;
58     if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
59     return parent()->channel_control_helper()->CreateSubchannel(
60         address, per_address_args, args);
61   }
62 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)63   void UpdateState(grpc_connectivity_state state, const absl::Status& status,
64                    RefCountedPtr<SubchannelPicker> picker) override {
65     if (parent()->shutting_down_) return;
66     // If this request is from the pending child policy, ignore it until
67     // it reports something other than CONNECTING, at which point we swap it
68     // into place.
69     if (CalledByPendingChild()) {
70       if (GRPC_TRACE_FLAG_ENABLED(*(parent()->tracer_))) {
71         gpr_log(GPR_INFO,
72                 "[child_policy_handler %p] helper %p: pending child policy %p "
73                 "reports state=%s (%s)",
74                 parent(), this, child_, ConnectivityStateName(state),
75                 status.ToString().c_str());
76       }
77       if (state == GRPC_CHANNEL_CONNECTING) return;
78       grpc_pollset_set_del_pollset_set(
79           parent()->child_policy_->interested_parties(),
80           parent()->interested_parties());
81       parent()->child_policy_ = std::move(parent()->pending_child_policy_);
82     } else if (!CalledByCurrentChild()) {
83       // This request is from an outdated child, so ignore it.
84       return;
85     }
86     parent()->channel_control_helper()->UpdateState(state, status,
87                                                     std::move(picker));
88   }
89 
RequestReresolution()90   void RequestReresolution() override {
91     if (parent()->shutting_down_) return;
92     // Only forward re-resolution requests from the most recent child,
93     // since that's the one that will be receiving any update we receive
94     // from the resolver.
95     const LoadBalancingPolicy* latest_child_policy =
96         parent()->pending_child_policy_ != nullptr
97             ? parent()->pending_child_policy_.get()
98             : parent()->child_policy_.get();
99     if (child_ != latest_child_policy) return;
100     if (GRPC_TRACE_FLAG_ENABLED(*(parent()->tracer_))) {
101       gpr_log(GPR_INFO, "[child_policy_handler %p] requesting re-resolution",
102               parent());
103     }
104     parent()->channel_control_helper()->RequestReresolution();
105   }
106 
AddTraceEvent(TraceSeverity severity,absl::string_view message)107   void AddTraceEvent(TraceSeverity severity,
108                      absl::string_view message) override {
109     if (parent()->shutting_down_) return;
110     if (!CalledByPendingChild() && !CalledByCurrentChild()) return;
111     parent()->channel_control_helper()->AddTraceEvent(severity, message);
112   }
113 
set_child(LoadBalancingPolicy * child)114   void set_child(LoadBalancingPolicy* child) { child_ = child; }
115 
116  private:
CalledByPendingChild() const117   bool CalledByPendingChild() const {
118     GPR_ASSERT(child_ != nullptr);
119     return child_ == parent()->pending_child_policy_.get();
120   }
121 
CalledByCurrentChild() const122   bool CalledByCurrentChild() const {
123     GPR_ASSERT(child_ != nullptr);
124     return child_ == parent()->child_policy_.get();
125   };
126 
127   LoadBalancingPolicy* child_ = nullptr;
128 };
129 
130 //
131 // ChildPolicyHandler
132 //
133 
ShutdownLocked()134 void ChildPolicyHandler::ShutdownLocked() {
135   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
136     gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down", this);
137   }
138   shutting_down_ = true;
139   if (child_policy_ != nullptr) {
140     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
141       gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down lb_policy %p",
142               this, child_policy_.get());
143     }
144     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
145                                      interested_parties());
146     child_policy_.reset();
147   }
148   if (pending_child_policy_ != nullptr) {
149     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
150       gpr_log(GPR_INFO,
151               "[child_policy_handler %p] shutting down pending lb_policy %p",
152               this, pending_child_policy_.get());
153     }
154     grpc_pollset_set_del_pollset_set(
155         pending_child_policy_->interested_parties(), interested_parties());
156     pending_child_policy_.reset();
157   }
158 }
159 
UpdateLocked(UpdateArgs args)160 absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) {
161   // If the child policy name changes, we need to create a new child
162   // policy.  When this happens, we leave child_policy_ as-is and store
163   // the new child policy in pending_child_policy_.  Once the new child
164   // policy transitions into state READY, we swap it into child_policy_,
165   // replacing the original child policy.  So pending_child_policy_ is
166   // non-null only between when we apply an update that changes the child
167   // policy name and when the new child reports state READY.
168   //
169   // Updates can arrive at any point during this transition.  We always
170   // apply updates relative to the most recently created child policy,
171   // even if the most recent one is still in pending_child_policy_.  This
172   // is true both when applying the updates to an existing child policy
173   // and when determining whether we need to create a new policy.
174   //
175   // As a result of this, there are several cases to consider here:
176   //
177   // 1. We have no existing child policy (i.e., this is the first update
178   //    we receive after being created; in this case, both child_policy_
179   //    and pending_child_policy_ are null).  In this case, we create a
180   //    new child policy and store it in child_policy_.
181   //
182   // 2. We have an existing child policy and have no pending child policy
183   //    from a previous update (i.e., either there has not been a
184   //    previous update that changed the policy name, or we have already
185   //    finished swapping in the new policy; in this case, child_policy_
186   //    is non-null but pending_child_policy_ is null).  In this case:
187   //    a. If going from the current config to the new config does not
188   //       require a new policy, then we update the existing child policy.
189   //    b. If going from the current config to the new config does require a
190   //       new policy, we create a new policy.  The policy will be stored in
191   //       pending_child_policy_ and will later be swapped into
192   //       child_policy_ by the helper when the new child transitions
193   //       into state READY.
194   //
195   // 3. We have an existing child policy and have a pending child policy
196   //    from a previous update (i.e., a previous update set
197   //    pending_child_policy_ as per case 2b above and that policy has
198   //    not yet transitioned into state READY and been swapped into
199   //    child_policy_; in this case, both child_policy_ and
200   //    pending_child_policy_ are non-null).  In this case:
201   //    a. If going from the current config to the new config does not
202   //       require a new policy, then we update the existing pending
203   //       child policy.
204   //    b. If going from the current config to the new config does require a
205   //       new child policy, then we create a new policy.  The new
206   //       policy is stored in pending_child_policy_ (replacing the one
207   //       that was there before, which will be immediately shut down)
208   //       and will later be swapped into child_policy_ by the helper
209   //       when the new child transitions into state READY.
210   const bool create_policy =
211       // case 1
212       child_policy_ == nullptr ||
213       // cases 2b and 3b
214       ConfigChangeRequiresNewPolicyInstance(current_config_.get(),
215                                             args.config.get());
216   current_config_ = args.config;
217   LoadBalancingPolicy* policy_to_update = nullptr;
218   if (create_policy) {
219     // Cases 1, 2b, and 3b: create a new child policy.
220     // If child_policy_ is null, we set it (case 1), else we set
221     // pending_child_policy_ (cases 2b and 3b).
222     // TODO(roth): In cases 2b and 3b, we should start a timer here, so
223     // that there's an upper bound on the amount of time it takes us to
224     // switch to the new policy, even if the new policy stays in
225     // CONNECTING for a very long period of time.
226     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
227       gpr_log(GPR_INFO,
228               "[child_policy_handler %p] creating new %schild policy %s", this,
229               child_policy_ == nullptr ? "" : "pending ",
230               std::string(args.config->name()).c_str());
231     }
232     auto& lb_policy =
233         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
234     lb_policy = CreateChildPolicy(args.config->name(), args.args);
235     policy_to_update = lb_policy.get();
236   } else {
237     // Cases 2a and 3a: update an existing policy.
238     // If we have a pending child policy, send the update to the pending
239     // policy (case 3a), else send it to the current policy (case 2a).
240     policy_to_update = pending_child_policy_ != nullptr
241                            ? pending_child_policy_.get()
242                            : child_policy_.get();
243   }
244   GPR_ASSERT(policy_to_update != nullptr);
245   // Update the policy.
246   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
247     gpr_log(GPR_INFO, "[child_policy_handler %p] updating %schild policy %p",
248             this,
249             policy_to_update == pending_child_policy_.get() ? "pending " : "",
250             policy_to_update);
251   }
252   return policy_to_update->UpdateLocked(std::move(args));
253 }
254 
ExitIdleLocked()255 void ChildPolicyHandler::ExitIdleLocked() {
256   if (child_policy_ != nullptr) {
257     child_policy_->ExitIdleLocked();
258     if (pending_child_policy_ != nullptr) {
259       pending_child_policy_->ExitIdleLocked();
260     }
261   }
262 }
263 
ResetBackoffLocked()264 void ChildPolicyHandler::ResetBackoffLocked() {
265   if (child_policy_ != nullptr) {
266     child_policy_->ResetBackoffLocked();
267     if (pending_child_policy_ != nullptr) {
268       pending_child_policy_->ResetBackoffLocked();
269     }
270   }
271 }
272 
CreateChildPolicy(absl::string_view child_policy_name,const ChannelArgs & args)273 OrphanablePtr<LoadBalancingPolicy> ChildPolicyHandler::CreateChildPolicy(
274     absl::string_view child_policy_name, const ChannelArgs& args) {
275   Helper* helper =
276       new Helper(RefAsSubclass<ChildPolicyHandler>(DEBUG_LOCATION, "Helper"));
277   LoadBalancingPolicy::Args lb_policy_args;
278   lb_policy_args.work_serializer = work_serializer();
279   lb_policy_args.channel_control_helper =
280       std::unique_ptr<ChannelControlHelper>(helper);
281   lb_policy_args.args = args;
282   OrphanablePtr<LoadBalancingPolicy> lb_policy =
283       CreateLoadBalancingPolicy(child_policy_name, std::move(lb_policy_args));
284   if (GPR_UNLIKELY(lb_policy == nullptr)) {
285     gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
286             std::string(child_policy_name).c_str());
287     return nullptr;
288   }
289   helper->set_child(lb_policy.get());
290   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
291     gpr_log(GPR_INFO,
292             "[child_policy_handler %p] created new LB policy \"%s\" (%p)", this,
293             std::string(child_policy_name).c_str(), lb_policy.get());
294   }
295   channel_control_helper()->AddTraceEvent(
296       ChannelControlHelper::TRACE_INFO,
297       absl::StrCat("Created new LB policy \"", child_policy_name, "\""));
298   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
299                                    interested_parties());
300   return lb_policy;
301 }
302 
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const303 bool ChildPolicyHandler::ConfigChangeRequiresNewPolicyInstance(
304     LoadBalancingPolicy::Config* old_config,
305     LoadBalancingPolicy::Config* new_config) const {
306   return old_config->name() != new_config->name();
307 }
308 
309 OrphanablePtr<LoadBalancingPolicy>
CreateLoadBalancingPolicy(absl::string_view name,LoadBalancingPolicy::Args args) const310 ChildPolicyHandler::CreateLoadBalancingPolicy(
311     absl::string_view name, LoadBalancingPolicy::Args args) const {
312   return CoreConfiguration::Get()
313       .lb_policy_registry()
314       .CreateLoadBalancingPolicy(name, std::move(args));
315 }
316 
317 }  // namespace grpc_core
318