xref: /aosp_15_r20/external/grpc-grpc/src/core/load_balancing/xds/xds_override_host.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/load_balancing/xds/xds_override_host.h"
20 
21 #include <stddef.h>
22 
23 #include <algorithm>
24 #include <functional>
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <vector>
33 
34 #include "absl/base/thread_annotations.h"
35 #include "absl/functional/function_ref.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/str_cat.h"
39 #include "absl/strings/str_join.h"
40 #include "absl/strings/str_split.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/types/optional.h"
43 #include "absl/types/span.h"
44 #include "absl/types/variant.h"
45 
46 #include <grpc/event_engine/event_engine.h>
47 #include <grpc/impl/connectivity_state.h>
48 #include <grpc/support/log.h>
49 
50 #include "src/core/client_channel/client_channel_internal.h"
51 #include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
52 #include "src/core/ext/xds/xds_health_status.h"
53 #include "src/core/lib/address_utils/parse_address.h"
54 #include "src/core/lib/address_utils/sockaddr_utils.h"
55 #include "src/core/lib/channel/channel_args.h"
56 #include "src/core/lib/config/core_configuration.h"
57 #include "src/core/lib/debug/trace.h"
58 #include "src/core/lib/experiments/experiments.h"
59 #include "src/core/lib/gprpp/debug_location.h"
60 #include "src/core/lib/gprpp/match.h"
61 #include "src/core/lib/gprpp/orphanable.h"
62 #include "src/core/lib/gprpp/ref_counted_ptr.h"
63 #include "src/core/lib/gprpp/ref_counted_string.h"
64 #include "src/core/lib/gprpp/sync.h"
65 #include "src/core/lib/gprpp/validation_errors.h"
66 #include "src/core/lib/gprpp/work_serializer.h"
67 #include "src/core/lib/iomgr/closure.h"
68 #include "src/core/lib/iomgr/error.h"
69 #include "src/core/lib/iomgr/exec_ctx.h"
70 #include "src/core/lib/iomgr/iomgr_fwd.h"
71 #include "src/core/lib/iomgr/pollset_set.h"
72 #include "src/core/lib/iomgr/resolved_address.h"
73 #include "src/core/lib/json/json.h"
74 #include "src/core/lib/json/json_args.h"
75 #include "src/core/lib/json/json_object_loader.h"
76 #include "src/core/lib/transport/connectivity_state.h"
77 #include "src/core/load_balancing/child_policy_handler.h"
78 #include "src/core/load_balancing/delegating_helper.h"
79 #include "src/core/load_balancing/lb_policy.h"
80 #include "src/core/load_balancing/lb_policy_factory.h"
81 #include "src/core/load_balancing/lb_policy_registry.h"
82 #include "src/core/load_balancing/subchannel_interface.h"
83 #include "src/core/resolver/endpoint_addresses.h"
84 #include "src/core/resolver/xds/xds_dependency_manager.h"
85 
86 namespace grpc_core {
87 
88 using ::grpc_event_engine::experimental::EventEngine;
89 
90 TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb");
91 
92 namespace {
// Transparent comparator ordering std::unique_ptr<Value> and raw Value* by
// the address they hold.  Because it is transparent (is_transparent), a
// std::set<std::unique_ptr<Value>, PtrLessThan<Value>> can be searched with
// a raw pointer without constructing a unique_ptr.
//
// Every overload compares via std::less<const Value*>.  Unlike the built-in
// `<` on unrelated pointers (whose result is unspecified), std::less is
// guaranteed to yield a strict total order.  It is also consistent with
// std::unique_ptr's operator<, which is specified in terms of std::less.
// Mixed raw/unique_ptr lookups therefore see one coherent ordering.
template <typename Value>
struct PtrLessThan {
  using is_transparent = void;

  bool operator()(const std::unique_ptr<Value>& v1,
                  const std::unique_ptr<Value>& v2) const {
    return Less(v1.get(), v2.get());
  }
  bool operator()(const Value* v1, const Value* v2) const {
    return Less(v1, v2);
  }
  bool operator()(const Value* v1, const std::unique_ptr<Value>& v2) const {
    return Less(v1, v2.get());
  }
  bool operator()(const std::unique_ptr<Value>& v1, const Value* v2) const {
    return Less(v1.get(), v2);
  }

 private:
  // Single point of comparison so all overloads agree on the ordering.
  static bool Less(const Value* a, const Value* b) {
    return std::less<const Value*>()(a, b);
  }
};
109 
110 //
111 // xds_override_host LB policy
112 //
113 
114 class XdsOverrideHostLb final : public LoadBalancingPolicy {
115  public:
116   explicit XdsOverrideHostLb(Args args);
117 
name() const118   absl::string_view name() const override {
119     return XdsOverrideHostLbConfig::Name();
120   }
121 
122   absl::Status UpdateLocked(UpdateArgs args) override;
123   void ExitIdleLocked() override;
124   void ResetBackoffLocked() override;
125 
126  private:
127   class SubchannelEntry;
128 
129   class SubchannelWrapper final : public DelegatingSubchannel {
130    public:
131     SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
132                       RefCountedPtr<XdsOverrideHostLb> policy);
133 
134     // Called immediately after construction.  We use two-phase initialization
135     // to avoid doing an allocation while holding the lock.
set_subchannel_entry(RefCountedPtr<SubchannelEntry> subchannel_entry)136     void set_subchannel_entry(RefCountedPtr<SubchannelEntry> subchannel_entry) {
137       subchannel_entry_ = std::move(subchannel_entry);
138     }
139 
140     void WatchConnectivityState(
141         std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override;
142 
143     void CancelConnectivityStateWatch(
144         ConnectivityStateWatcherInterface* watcher) override;
145 
address_list() const146     RefCountedStringValue address_list() const
147         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
148       return subchannel_entry_->address_list();
149     }
150 
set_last_used_time()151     void set_last_used_time()
152         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
153       subchannel_entry_->set_last_used_time();
154     }
155 
policy() const156     XdsOverrideHostLb* policy() const { return policy_.get(); }
157 
Clone() const158     RefCountedPtr<SubchannelWrapper> Clone() const {
159       auto subchannel =
160           MakeRefCounted<SubchannelWrapper>(wrapped_subchannel(), policy_);
161       subchannel->set_subchannel_entry(subchannel_entry_);
162       return subchannel;
163     }
164 
165    private:
166     class ConnectivityStateWatcher final
167         : public ConnectivityStateWatcherInterface {
168      public:
ConnectivityStateWatcher(WeakRefCountedPtr<SubchannelWrapper> subchannel)169       explicit ConnectivityStateWatcher(
170           WeakRefCountedPtr<SubchannelWrapper> subchannel)
171           : subchannel_(std::move(subchannel)) {}
172 
OnConnectivityStateChange(grpc_connectivity_state state,absl::Status status)173       void OnConnectivityStateChange(grpc_connectivity_state state,
174                                      absl::Status status) override {
175         subchannel_->UpdateConnectivityState(state, status);
176       }
177 
interested_parties()178       grpc_pollset_set* interested_parties() override {
179         return subchannel_->policy()->interested_parties();
180       }
181 
182      private:
183       WeakRefCountedPtr<SubchannelWrapper> subchannel_;
184     };
185 
186     void Orphaned() override;
187     void UpdateConnectivityState(grpc_connectivity_state state,
188                                  absl::Status status);
189 
190     RefCountedPtr<XdsOverrideHostLb> policy_;
191     RefCountedPtr<SubchannelEntry> subchannel_entry_;
192     ConnectivityStateWatcher* watcher_;
193     std::set<std::unique_ptr<ConnectivityStateWatcherInterface>,
194              PtrLessThan<ConnectivityStateWatcherInterface>>
195         watchers_;
196   };
197 
198   // An entry in the subchannel map.
199   //
200   // The entry may hold either an owned (RefCountedPtr<>) or unowned
201   // (raw pointer) SubchannelWrapper, but not both.  It will be unowned
202   // in the case where the SubchannelWrapper is owned by the child policy.
203   // It will be owned in the case where the child policy has not created a
204   // subchannel but we have RPCs whose cookies point to that address.
205   //
206   // Note that when a SubchannelWrapper is orphaned, it will try to
207   // acquire the lock to remove itself from the entry.  This means that
208   // whenever we need to remove an owned subchannel from an entry, if we
209   // released our ref to the SubchannelWrapper immediately, we would
210   // cause a deadlock, since our caller is already holding the lock.  To
211   // avoid that, any method that may result in releasing a ref to the
212   // SubchannelWrapper will instead return that ref to the caller, who is
213   // responsible for releasing the ref after releasing the lock.
214   class SubchannelEntry final : public RefCounted<SubchannelEntry> {
215    public:
HasOwnedSubchannel() const216     bool HasOwnedSubchannel() const
217         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
218       auto* sc = absl::get_if<RefCountedPtr<SubchannelWrapper>>(&subchannel_);
219       return sc != nullptr && *sc != nullptr;
220     }
221 
222     // Sets the unowned subchannel.  If the entry previously had an
223     // owned subchannel, returns the ref to it.
224     RefCountedPtr<SubchannelWrapper> SetUnownedSubchannel(
225         SubchannelWrapper* subchannel)
226         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
227 
228     // Sets the owned subchannel.  Must not be called if the entry
229     // already has an owned subchannel.
SetOwnedSubchannel(RefCountedPtr<SubchannelWrapper> subchannel)230     void SetOwnedSubchannel(RefCountedPtr<SubchannelWrapper> subchannel)
231         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
232       GPR_DEBUG_ASSERT(!HasOwnedSubchannel());
233       subchannel_ = std::move(subchannel);
234     }
235 
236     // Returns a pointer to the subchannel, regardless of whether it's
237     // owned or not.
238     SubchannelWrapper* GetSubchannel() const
239         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
240 
241     // Returns a ref to the subchannel, regardless of whether it's owned
242     // or not.  Returns null if there is no subchannel or if the
243     // subchannel's ref count is 0.
244     RefCountedPtr<SubchannelWrapper> GetSubchannelRef() const
245         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
246 
247     // If the entry has an owned subchannel, moves it out of the entry
248     // and returns it.
249     RefCountedPtr<SubchannelWrapper> TakeOwnedSubchannel()
250         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
251 
252     // Unsets the entry's subchannel.
253     // If the entry had an owned subchannel, moves the ref into
254     // owned_subchannels.
255     void UnsetSubchannel(
256         std::vector<RefCountedPtr<SubchannelWrapper>>* owned_subchannels)
257         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
258 
259     // Called when a SubchannelWrapper is orphaned.  May replace the
260     // unowned SubchannelWrapper with an owned one based on
261     // last_used_time_ and connection_idle_timeout.
262     void OnSubchannelWrapperOrphan(SubchannelWrapper* wrapper,
263                                    Duration connection_idle_timeout)
264         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);
265 
connectivity_state() const266     grpc_connectivity_state connectivity_state() const
267         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
268       return connectivity_state_;
269     }
set_connectivity_state(grpc_connectivity_state state)270     void set_connectivity_state(grpc_connectivity_state state)
271         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
272       connectivity_state_ = state;
273     }
274 
eds_health_status() const275     XdsHealthStatus eds_health_status() const
276         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
277       return eds_health_status_;
278     }
set_eds_health_status(XdsHealthStatus eds_health_status)279     void set_eds_health_status(XdsHealthStatus eds_health_status)
280         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
281       eds_health_status_ = eds_health_status;
282     }
283 
address_list() const284     RefCountedStringValue address_list() const
285         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
286       return address_list_;
287     }
set_address_list(RefCountedStringValue address_list)288     void set_address_list(RefCountedStringValue address_list)
289         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
290       address_list_ = std::move(address_list);
291     }
292 
last_used_time() const293     Timestamp last_used_time() const
294         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
295       return last_used_time_;
296     }
set_last_used_time()297     void set_last_used_time()
298         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
299       last_used_time_ = Timestamp::Now();
300     }
301 
302    private:
303     grpc_connectivity_state connectivity_state_
304         ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) = GRPC_CHANNEL_IDLE;
305     absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>
306         subchannel_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
307     XdsHealthStatus eds_health_status_ ABSL_GUARDED_BY(
308         &XdsOverrideHostLb::mu_) = XdsHealthStatus(XdsHealthStatus::kUnknown);
309     RefCountedStringValue address_list_
310         ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
311     Timestamp last_used_time_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) =
312         Timestamp::InfPast();
313   };
314 
315   // A picker that wraps the picker from the child for cases when cookie is
316   // present.
317   class Picker final : public SubchannelPicker {
318    public:
319     Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
320            RefCountedPtr<SubchannelPicker> picker,
321            XdsHealthStatusSet override_host_health_status_set);
322 
323     PickResult Pick(PickArgs args) override;
324 
325    private:
326     class SubchannelConnectionRequester final {
327      public:
SubchannelConnectionRequester(RefCountedPtr<SubchannelWrapper> subchannel)328       explicit SubchannelConnectionRequester(
329           RefCountedPtr<SubchannelWrapper> subchannel)
330           : subchannel_(std::move(subchannel)) {
331         GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
332         // Hop into ExecCtx, so that we don't get stuck running
333         // arbitrary WorkSerializer callbacks while doing a pick.
334         ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
335       }
336 
337      private:
RunInExecCtx(void * arg,grpc_error_handle)338       static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
339         auto* self = static_cast<SubchannelConnectionRequester*>(arg);
340         self->subchannel_->policy()->work_serializer()->Run(
341             [self]() {
342               self->subchannel_->RequestConnection();
343               delete self;
344             },
345             DEBUG_LOCATION);
346       }
347 
348       RefCountedPtr<SubchannelWrapper> subchannel_;
349       grpc_closure closure_;
350     };
351 
352     class SubchannelCreationRequester final {
353      public:
SubchannelCreationRequester(RefCountedPtr<XdsOverrideHostLb> policy,absl::string_view address)354       SubchannelCreationRequester(RefCountedPtr<XdsOverrideHostLb> policy,
355                                   absl::string_view address)
356           : policy_(std::move(policy)), address_(address) {
357         GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
358         // Hop into ExecCtx, so that we don't get stuck running
359         // arbitrary WorkSerializer callbacks while doing a pick.
360         ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
361       }
362 
363      private:
RunInExecCtx(void * arg,grpc_error_handle)364       static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
365         auto* self = static_cast<SubchannelCreationRequester*>(arg);
366         self->policy_->work_serializer()->Run(
367             [self]() {
368               self->policy_->CreateSubchannelForAddress(self->address_);
369               delete self;
370             },
371             DEBUG_LOCATION);
372       }
373 
374       RefCountedPtr<XdsOverrideHostLb> policy_;
375       std::string address_;
376       grpc_closure closure_;
377     };
378 
379     absl::optional<LoadBalancingPolicy::PickResult> PickOverridenHost(
380         XdsOverrideHostAttribute* override_host_attr) const;
381 
382     RefCountedPtr<XdsOverrideHostLb> policy_;
383     RefCountedPtr<SubchannelPicker> picker_;
384     XdsHealthStatusSet override_host_health_status_set_;
385   };
386 
387   class Helper final
388       : public ParentOwningDelegatingChannelControlHelper<XdsOverrideHostLb> {
389    public:
Helper(RefCountedPtr<XdsOverrideHostLb> xds_override_host_policy)390     explicit Helper(RefCountedPtr<XdsOverrideHostLb> xds_override_host_policy)
391         : ParentOwningDelegatingChannelControlHelper(
392               std::move(xds_override_host_policy)) {}
393 
394     RefCountedPtr<SubchannelInterface> CreateSubchannel(
395         const grpc_resolved_address& address,
396         const ChannelArgs& per_address_args, const ChannelArgs& args) override;
397     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
398                      RefCountedPtr<SubchannelPicker> picker) override;
399   };
400 
401   class IdleTimer final : public InternallyRefCounted<IdleTimer> {
402    public:
403     IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy, Duration duration);
404 
405     void Orphan() override;
406 
407    private:
408     void OnTimerLocked();
409 
410     RefCountedPtr<XdsOverrideHostLb> policy_;
411     absl::optional<EventEngine::TaskHandle> timer_handle_;
412   };
413 
414   ~XdsOverrideHostLb() override;
415 
416   void ShutdownLocked() override;
417 
418   void ResetState();
419   void ReportTransientFailure(absl::Status status);
420 
421   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
422       const ChannelArgs& args);
423 
424   void MaybeUpdatePickerLocked();
425 
426   void UpdateAddressMap(const EndpointAddressesIterator& endpoints);
427 
428   RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
429       const grpc_resolved_address& address,
430       RefCountedPtr<SubchannelInterface> subchannel);
431 
432   void CreateSubchannelForAddress(absl::string_view address);
433 
434   void CleanupSubchannels();
435 
436   // State from most recent resolver update.
437   ChannelArgs args_;
438   XdsHealthStatusSet override_host_status_set_;
439   Duration connection_idle_timeout_;
440 
441   // Internal state.
442   bool shutting_down_ = false;
443 
444   OrphanablePtr<LoadBalancingPolicy> child_policy_;
445 
446   // Latest state and picker reported by the child policy.
447   grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
448   absl::Status status_;
449   RefCountedPtr<SubchannelPicker> picker_;
450   Mutex mu_;
451   std::map<std::string, RefCountedPtr<SubchannelEntry>, std::less<>>
452       subchannel_map_ ABSL_GUARDED_BY(mu_);
453 
454   // Timer handle for periodic subchannel sweep.
455   OrphanablePtr<IdleTimer> idle_timer_;
456 };
457 
458 //
459 // XdsOverrideHostLb::Picker
460 //
461 
Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,RefCountedPtr<SubchannelPicker> picker,XdsHealthStatusSet override_host_health_status_set)462 XdsOverrideHostLb::Picker::Picker(
463     RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
464     RefCountedPtr<SubchannelPicker> picker,
465     XdsHealthStatusSet override_host_health_status_set)
466     : policy_(std::move(xds_override_host_lb)),
467       picker_(std::move(picker)),
468       override_host_health_status_set_(override_host_health_status_set) {
469   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
470     gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p",
471             policy_.get(), this);
472   }
473 }
474 
475 absl::optional<LoadBalancingPolicy::PickResult>
PickOverridenHost(XdsOverrideHostAttribute * override_host_attr) const476 XdsOverrideHostLb::Picker::PickOverridenHost(
477     XdsOverrideHostAttribute* override_host_attr) const {
478   GPR_ASSERT(override_host_attr != nullptr);
479   auto cookie_address_list = override_host_attr->cookie_address_list();
480   if (cookie_address_list.empty()) return absl::nullopt;
481   // The cookie has an address list, so look through the addresses in order.
482   absl::string_view address_with_no_subchannel;
483   RefCountedPtr<SubchannelWrapper> idle_subchannel;
484   bool found_connecting = false;
485   {
486     MutexLock lock(&policy_->mu_);
487     for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) {
488       auto it = policy_->subchannel_map_.find(address);
489       if (it == policy_->subchannel_map_.end()) continue;
490       if (!override_host_health_status_set_.Contains(
491               it->second->eds_health_status())) {
492         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
493           gpr_log(GPR_INFO,
494                   "Subchannel %s health status is not overridden (%s)",
495                   std::string(address).c_str(),
496                   it->second->eds_health_status().ToString());
497         }
498         continue;
499       }
500       auto subchannel = it->second->GetSubchannelRef();
501       if (subchannel == nullptr) {
502         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
503           gpr_log(GPR_INFO, "No subchannel for %s",
504                   std::string(address).c_str());
505         }
506         if (address_with_no_subchannel.empty()) {
507           address_with_no_subchannel = it->first;
508         }
509         continue;
510       }
511       auto connectivity_state = it->second->connectivity_state();
512       if (connectivity_state == GRPC_CHANNEL_READY) {
513         // Found a READY subchannel.  Pass back the actual address list
514         // and return the subchannel.
515         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
516           gpr_log(GPR_INFO, "Picker override found READY subchannel %s",
517                   std::string(address).c_str());
518         }
519         it->second->set_last_used_time();
520         override_host_attr->set_actual_address_list(it->second->address_list());
521         return PickResult::Complete(subchannel->wrapped_subchannel());
522       } else if (connectivity_state == GRPC_CHANNEL_IDLE) {
523         if (idle_subchannel == nullptr) idle_subchannel = std::move(subchannel);
524       } else if (connectivity_state == GRPC_CHANNEL_CONNECTING) {
525         found_connecting = true;
526       }
527     }
528   }
529   // No READY subchannel found.  If we found an IDLE subchannel, trigger
530   // a connection attempt and queue the pick until that attempt completes.
531   if (idle_subchannel != nullptr) {
532     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
533       gpr_log(GPR_INFO, "Picker override found IDLE subchannel");
534     }
535     // Deletes itself after the connection is requested.
536     new SubchannelConnectionRequester(std::move(idle_subchannel));
537     return PickResult::Queue();
538   }
539   // No READY or IDLE subchannels.  If we found a CONNECTING subchannel,
540   // queue the pick and wait for the connection attempt to complete.
541   if (found_connecting) {
542     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
543       gpr_log(GPR_INFO, "Picker override found CONNECTING subchannel");
544     }
545     return PickResult::Queue();
546   }
547   // No READY, IDLE, or CONNECTING subchannels found.  If we found an
548   // entry that has no subchannel, then queue the pick and trigger
549   // creation of a subchannel for that entry.
550   if (!address_with_no_subchannel.empty()) {
551     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
552       gpr_log(GPR_INFO, "Picker override found entry with no subchannel");
553     }
554     if (!IsWorkSerializerDispatchEnabled()) {
555       new SubchannelCreationRequester(policy_, address_with_no_subchannel);
556     } else {
557       policy_->work_serializer()->Run(
558           [policy = policy_,
559            address = std::string(address_with_no_subchannel)]() {
560             policy->CreateSubchannelForAddress(address);
561           },
562           DEBUG_LOCATION);
563     }
564     return PickResult::Queue();
565   }
566   // No entry found that was not in TRANSIENT_FAILURE.
567   return absl::nullopt;
568 }
569 
Pick(PickArgs args)570 LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) {
571   auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
572   auto* override_host_attr = static_cast<XdsOverrideHostAttribute*>(
573       call_state->GetCallAttribute(XdsOverrideHostAttribute::TypeName()));
574   if (override_host_attr != nullptr) {
575     auto overridden_host_pick = PickOverridenHost(override_host_attr);
576     if (overridden_host_pick.has_value()) {
577       return std::move(*overridden_host_pick);
578     }
579   }
580   // No usable override.  Delegate to child picker.
581   if (picker_ == nullptr) {  // Should never happen.
582     return PickResult::Fail(absl::InternalError(
583         "xds_override_host picker not given any child picker"));
584   }
585   auto result = picker_->Pick(args);
586   auto complete_pick = absl::get_if<PickResult::Complete>(&result.result);
587   if (complete_pick != nullptr) {
588     auto* wrapper =
589         static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
590     // Populate the address list in the override host attribute so that
591     // the StatefulSession filter can set the cookie.
592     if (override_host_attr != nullptr) {
593       MutexLock lock(&wrapper->policy()->mu_);
594       wrapper->set_last_used_time();
595       override_host_attr->set_actual_address_list(wrapper->address_list());
596     }
597     // Unwrap the subchannel.
598     complete_pick->subchannel = wrapper->wrapped_subchannel();
599   }
600   return result;
601 }
602 
603 //
604 // XdsOverrideHostLb::IdleTimer
605 //
606 
IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy,Duration duration)607 XdsOverrideHostLb::IdleTimer::IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy,
608                                         Duration duration)
609     : policy_(std::move(policy)) {
610   // Min time between timer runs is 5s so that we don't kill ourselves
611   // with lock contention and CPU usage due to sweeps over the map.
612   duration = std::max(duration, Duration::Seconds(5));
613   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
614     gpr_log(GPR_INFO,
615             "[xds_override_host_lb %p] idle timer %p: subchannel cleanup "
616             "pass will run in %s",
617             policy_.get(), this, duration.ToString().c_str());
618   }
619   timer_handle_ = policy_->channel_control_helper()->GetEventEngine()->RunAfter(
620       duration, [self = RefAsSubclass<IdleTimer>()]() mutable {
621         ApplicationCallbackExecCtx callback_exec_ctx;
622         ExecCtx exec_ctx;
623         auto self_ptr = self.get();
624         self_ptr->policy_->work_serializer()->Run(
625             [self = std::move(self)]() { self->OnTimerLocked(); },
626             DEBUG_LOCATION);
627       });
628 }
629 
Orphan()630 void XdsOverrideHostLb::IdleTimer::Orphan() {
631   if (timer_handle_.has_value()) {
632     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
633       gpr_log(GPR_INFO, "[xds_override_host_lb %p] idle timer %p: cancelling",
634               policy_.get(), this);
635     }
636     policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
637     timer_handle_.reset();
638   }
639   Unref();
640 }
641 
OnTimerLocked()642 void XdsOverrideHostLb::IdleTimer::OnTimerLocked() {
643   if (timer_handle_.has_value()) {
644     timer_handle_.reset();
645     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
646       gpr_log(GPR_INFO, "[xds_override_host_lb %p] idle timer %p: timer fired",
647               policy_.get(), this);
648     }
649     policy_->CleanupSubchannels();
650   }
651 }
652 
653 //
654 // XdsOverrideHostLb
655 //
656 
XdsOverrideHostLb(Args args)657 XdsOverrideHostLb::XdsOverrideHostLb(Args args)
658     : LoadBalancingPolicy(std::move(args)) {
659   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
660     gpr_log(GPR_INFO, "[xds_override_host_lb %p] created", this);
661   }
662 }
663 
~XdsOverrideHostLb()664 XdsOverrideHostLb::~XdsOverrideHostLb() {
665   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
666     gpr_log(GPR_INFO,
667             "[xds_override_host_lb %p] destroying xds_override_host LB policy",
668             this);
669   }
670 }
671 
ShutdownLocked()672 void XdsOverrideHostLb::ShutdownLocked() {
673   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
674     gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this);
675   }
676   shutting_down_ = true;
677   ResetState();
678 }
679 
ResetState()680 void XdsOverrideHostLb::ResetState() {
681   {
682     // Drop subchannel refs after releasing the lock to avoid deadlock.
683     std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
684     MutexLock lock(&mu_);
685     subchannel_refs_to_drop.reserve(subchannel_map_.size());
686     for (auto& p : subchannel_map_) {
687       p.second->UnsetSubchannel(&subchannel_refs_to_drop);
688     }
689     subchannel_map_.clear();
690   }
691   // Cancel timer, if any.
692   idle_timer_.reset();
693   // Remove the child policy's interested_parties pollset_set from the
694   // xDS policy.
695   if (child_policy_ != nullptr) {
696     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
697                                      interested_parties());
698     child_policy_.reset();
699   }
700   // Drop our ref to the child's picker, in case it's holding a ref to
701   // the child.
702   picker_.reset();
703 }
704 
ReportTransientFailure(absl::Status status)705 void XdsOverrideHostLb::ReportTransientFailure(absl::Status status) {
706   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
707     gpr_log(GPR_INFO,
708             "[xds_override_host_lb %p] reporting TRANSIENT_FAILURE: %s", this,
709             status.ToString().c_str());
710   }
711   ResetState();
712   channel_control_helper()->UpdateState(
713       GRPC_CHANNEL_TRANSIENT_FAILURE, status,
714       MakeRefCounted<TransientFailurePicker>(status));
715 }
716 
ExitIdleLocked()717 void XdsOverrideHostLb::ExitIdleLocked() {
718   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
719 }
720 
ResetBackoffLocked()721 void XdsOverrideHostLb::ResetBackoffLocked() {
722   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
723 }
724 
GetEndpointHealthStatus(const EndpointAddresses & endpoint)725 XdsHealthStatus GetEndpointHealthStatus(const EndpointAddresses& endpoint) {
726   return XdsHealthStatus(static_cast<XdsHealthStatus::HealthStatus>(
727       endpoint.args()
728           .GetInt(GRPC_ARG_XDS_HEALTH_STATUS)
729           .value_or(XdsHealthStatus::HealthStatus::kUnknown)));
730 }
731 
732 // Wraps the endpoint iterator and filters out endpoints in state DRAINING.
733 class ChildEndpointIterator final : public EndpointAddressesIterator {
734  public:
ChildEndpointIterator(std::shared_ptr<EndpointAddressesIterator> parent_it)735   explicit ChildEndpointIterator(
736       std::shared_ptr<EndpointAddressesIterator> parent_it)
737       : parent_it_(std::move(parent_it)) {}
738 
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const739   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
740       const override {
741     parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
742       XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
743       if (status.status() != XdsHealthStatus::kDraining) {
744         if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
745           gpr_log(GPR_INFO,
746                   "[xds_override_host_lb %p] endpoint %s: not draining, "
747                   "passing to child",
748                   this, endpoint.ToString().c_str());
749         }
750         callback(endpoint);
751       }
752     });
753   }
754 
755  private:
756   std::shared_ptr<EndpointAddressesIterator> parent_it_;
757 };
758 
UpdateLocked(UpdateArgs args)759 absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
760   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
761     gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this);
762   }
763   // Grab new LB policy config.
764   if (args.config == nullptr) {
765     return absl::InvalidArgumentError("Missing policy config");
766   }
767   auto new_config = args.config.TakeAsSubclass<XdsOverrideHostLbConfig>();
768   // Get xDS config.
769   auto new_xds_config =
770       args.args.GetObjectRef<XdsDependencyManager::XdsConfig>();
771   if (new_xds_config == nullptr) {
772     // Should never happen.
773     absl::Status status = absl::InternalError(
774         "xDS config not passed to xds_cluster_impl LB policy");
775     ReportTransientFailure(status);
776     return status;
777   }
778   auto it = new_xds_config->clusters.find(new_config->cluster_name());
779   if (it == new_xds_config->clusters.end() || !it->second.ok() ||
780       it->second->cluster == nullptr) {
781     // Should never happen.
782     absl::Status status = absl::InternalError(absl::StrCat(
783         "xDS config has no entry for cluster ", new_config->cluster_name()));
784     ReportTransientFailure(status);
785     return status;
786   }
787   args_ = std::move(args.args);
788   override_host_status_set_ = it->second->cluster->override_host_statuses;
789   connection_idle_timeout_ = it->second->cluster->connection_idle_timeout;
790   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
791     gpr_log(GPR_INFO,
792             "[xds_override_host_lb %p] override host status set: %s "
793             "connection idle timeout: %s",
794             this, override_host_status_set_.ToString().c_str(),
795             connection_idle_timeout_.ToString().c_str());
796   }
797   // Update address map and wrap endpoint iterator for child policy.
798   if (args.addresses.ok()) {
799     UpdateAddressMap(**args.addresses);
800     args.addresses =
801         std::make_shared<ChildEndpointIterator>(std::move(*args.addresses));
802   } else {
803     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
804       gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
805               args.addresses.status().ToString().c_str());
806     }
807   }
808   // Create child policy if needed.
809   if (child_policy_ == nullptr) {
810     child_policy_ = CreateChildPolicyLocked(args.args);
811   }
812   // Update child policy.
813   UpdateArgs update_args;
814   update_args.addresses = std::move(args.addresses);
815   update_args.resolution_note = std::move(args.resolution_note);
816   update_args.config = new_config->child_config();
817   update_args.args = args_;
818   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
819     gpr_log(GPR_INFO,
820             "[xds_override_host_lb %p] Updating child policy handler %p", this,
821             child_policy_.get());
822   }
823   return child_policy_->UpdateLocked(std::move(update_args));
824 }
825 
MaybeUpdatePickerLocked()826 void XdsOverrideHostLb::MaybeUpdatePickerLocked() {
827   if (picker_ != nullptr) {
828     auto xds_override_host_picker = MakeRefCounted<Picker>(
829         RefAsSubclass<XdsOverrideHostLb>(), picker_, override_host_status_set_);
830     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
831       gpr_log(GPR_INFO,
832               "[xds_override_host_lb %p] updating connectivity: state=%s "
833               "status=(%s) picker=%p",
834               this, ConnectivityStateName(state_), status_.ToString().c_str(),
835               xds_override_host_picker.get());
836     }
837     channel_control_helper()->UpdateState(state_, status_,
838                                           std::move(xds_override_host_picker));
839   }
840 }
841 
CreateChildPolicyLocked(const ChannelArgs & args)842 OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
843     const ChannelArgs& args) {
844   LoadBalancingPolicy::Args lb_policy_args;
845   lb_policy_args.work_serializer = work_serializer();
846   lb_policy_args.args = args;
847   lb_policy_args.channel_control_helper = std::make_unique<Helper>(
848       RefAsSubclass<XdsOverrideHostLb>(DEBUG_LOCATION, "Helper"));
849   OrphanablePtr<LoadBalancingPolicy> lb_policy =
850       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
851                                          &grpc_lb_xds_override_host_trace);
852   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
853     gpr_log(GPR_INFO,
854             "[xds_override_host_lb %p] Created new child policy handler %p",
855             this, lb_policy.get());
856   }
857   // Add our interested_parties pollset_set to that of the newly created
858   // child policy. This will make the child policy progress upon activity on
859   // this policy, which in turn is tied to the application's call.
860   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
861                                    interested_parties());
862   return lb_policy;
863 }
864 
// Rebuilds subchannel_map_ from the endpoints of a resolver update.
//
// What is recorded:
//   - Every address of every endpoint gets an entry holding the endpoint's
//     xDS health status and an "address list" string for that address.
//   - DRAINING endpoints are skipped unless DRAINING is in
//     override_host_status_set_.
//   - Addresses that cannot be converted to a string are skipped.
//
// How the map is updated:
//   - Entries for addresses no longer present are removed. Their owned
//     subchannel refs are released only after mu_ is unlocked.
//   - Entries for new addresses are added.
//   - The idle timer is restarted for the earliest time at which a
//     recently-used entry would become idle, or after connection_idle_timeout_
//     if there is none.
void XdsOverrideHostLb::UpdateAddressMap(
    const EndpointAddressesIterator& endpoints) {
  // Construct a map of address info from which to update subchannel_map_.
  struct AddressInfo {
    XdsHealthStatus eds_health_status;
    RefCountedStringValue address_list;
    AddressInfo(XdsHealthStatus status, RefCountedStringValue addresses)
        : eds_health_status(status), address_list(std::move(addresses)) {}
  };
  // Keyed by the non-normalized string form of each address. If the same
  // address appears in more than one endpoint, emplace() below keeps the
  // first occurrence.
  std::map<const std::string, AddressInfo> addresses_for_map;
  endpoints.ForEach([&](const EndpointAddresses& endpoint) {
    XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
    // Skip draining hosts if not in the override status set.
    if (status.status() == XdsHealthStatus::kDraining &&
        !override_host_status_set_.Contains(status)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
        gpr_log(GPR_INFO,
                "[xds_override_host_lb %p] endpoint %s: draining but not in "
                "override_host_status set -- ignoring",
                this, endpoint.ToString().c_str());
      }
      return;
    }
    // String keys for this endpoint's addresses, in their original order.
    // Addresses that fail conversion are dropped (only traced).
    std::vector<std::string> addresses;
    addresses.reserve(endpoint.addresses().size());
    for (const auto& address : endpoint.addresses()) {
      auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
      if (!key.ok()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
          gpr_log(GPR_INFO,
                  "[xds_override_host_lb %p] no key for endpoint address; "
                  "not adding to map",
                  this);
        }
      } else {
        addresses.push_back(*std::move(key));
      }
    }
    // For each address, build a comma-separated list that starts with that
    // address, followed by the endpoint's other addresses in order. For
    // example, with addresses [a, b, c] the entry for b gets "b,a,c".
    absl::Span<const std::string> addresses_span = addresses;
    for (size_t i = 0; i < addresses.size(); ++i) {
      std::string start = absl::StrJoin(addresses_span.subspan(0, i), ",");
      std::string end = absl::StrJoin(addresses_span.subspan(i + 1), ",");
      RefCountedStringValue address_list(
          absl::StrCat(addresses[i], (start.empty() ? "" : ","), start,
                       (end.empty() ? "" : ","), end));
      addresses_for_map.emplace(
          std::piecewise_construct, std::forward_as_tuple(addresses[i]),
          std::forward_as_tuple(status, std::move(address_list)));
    }
  });
  // Now grab the lock and update subchannel_map_ from addresses_for_map.
  const Timestamp now = Timestamp::Now();
  const Timestamp idle_threshold = now - connection_idle_timeout_;
  // Delay until the next idle-timer run. It starts at a full timeout and is
  // shortened below for entries used more recently than idle_threshold.
  Duration next_time = connection_idle_timeout_;
  {
    // Drop subchannel refs after releasing the lock to avoid deadlock.
    // The vector is declared before `lock`, so it is destroyed after the
    // lock is released.
    std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
    MutexLock lock(&mu_);
    // Pass 1: remove entries whose address is no longer in the update.
    for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
      if (addresses_for_map.find(it->first) == addresses_for_map.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
          gpr_log(GPR_INFO, "[xds_override_host_lb %p] removing map key %s",
                  this, it->first.c_str());
        }
        it->second->UnsetSubchannel(&subchannel_refs_to_drop);
        it = subchannel_map_.erase(it);
      } else {
        ++it;
      }
    }
    // Pass 2: create entries for new addresses, then refresh the health
    // status and address list of every entry present in the update.
    for (auto& p : addresses_for_map) {
      const auto& address = p.first;
      auto& address_info = p.second;
      auto it = subchannel_map_.find(address);
      if (it == subchannel_map_.end()) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
          gpr_log(GPR_INFO, "[xds_override_host_lb %p] adding map key %s", this,
                  address.c_str());
        }
        it = subchannel_map_.emplace(address, MakeRefCounted<SubchannelEntry>())
                 .first;
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
        gpr_log(GPR_INFO,
                "[xds_override_host_lb %p] map key %s: setting "
                "eds_health_status=%s address_list=%s",
                this, address.c_str(),
                address_info.eds_health_status.ToString(),
                address_info.address_list.c_str());
      }
      it->second->set_eds_health_status(address_info.eds_health_status);
      it->second->set_address_list(std::move(address_info.address_list));
      // Check the entry's last_used_time to determine the next time at
      // which the timer needs to run.
      if (it->second->last_used_time() > idle_threshold) {
        const Duration next_time_for_entry =
            it->second->last_used_time() + connection_idle_timeout_ - now;
        next_time = std::min(next_time, next_time_for_entry);
      }
    }
  }
  // (Re)start the idle timer with the computed delay. Presumably the previous
  // timer is cancelled when its OrphanablePtr is replaced -- verify against
  // IdleTimer.
  idle_timer_ =
      MakeOrphanable<IdleTimer>(RefAsSubclass<XdsOverrideHostLb>(), next_time);
}
969 
970 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
AdoptSubchannel(const grpc_resolved_address & address,RefCountedPtr<SubchannelInterface> subchannel)971 XdsOverrideHostLb::AdoptSubchannel(
972     const grpc_resolved_address& address,
973     RefCountedPtr<SubchannelInterface> subchannel) {
974   auto wrapper = MakeRefCounted<SubchannelWrapper>(
975       std::move(subchannel), RefAsSubclass<XdsOverrideHostLb>());
976   auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
977   if (key.ok()) {
978     // Drop ref to previously owned subchannel (if any) after releasing
979     // the lock.
980     RefCountedPtr<SubchannelWrapper> subchannel_ref_to_drop;
981     MutexLock lock(&mu_);
982     auto it = subchannel_map_.find(*key);
983     if (it != subchannel_map_.end()) {
984       wrapper->set_subchannel_entry(it->second);
985       subchannel_ref_to_drop = it->second->SetUnownedSubchannel(wrapper.get());
986     }
987   }
988   return wrapper;
989 }
990 
CreateSubchannelForAddress(absl::string_view address)991 void XdsOverrideHostLb::CreateSubchannelForAddress(absl::string_view address) {
992   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
993     gpr_log(GPR_INFO,
994             "[xds_override_host_lb %p] creating owned subchannel for %s", this,
995             std::string(address).c_str());
996   }
997   auto addr = StringToSockaddr(address);
998   GPR_ASSERT(addr.ok());
999   // Note: We don't currently have any cases where per_address_args need to
1000   // be passed through.  If we encounter any such cases in the future, we
1001   // will need to change this to store those attributes from the resolver
1002   // update in the map entry.
1003   auto subchannel = channel_control_helper()->CreateSubchannel(
1004       *addr, /*per_address_args=*/ChannelArgs(), args_);
1005   auto wrapper = MakeRefCounted<SubchannelWrapper>(
1006       std::move(subchannel), RefAsSubclass<XdsOverrideHostLb>());
1007   {
1008     MutexLock lock(&mu_);
1009     auto it = subchannel_map_.find(address);
1010     // This can happen if the map entry was removed between the time that
1011     // the picker requested the subchannel creation and the time that we got
1012     // here.  In that case, we can just make it a no-op, since the update
1013     // that removed the entry will have generated a new picker already.
1014     if (it == subchannel_map_.end()) return;
1015     // This can happen if the picker requests subchannel creation for
1016     // the same address multiple times.
1017     if (it->second->HasOwnedSubchannel()) return;
1018     wrapper->set_subchannel_entry(it->second);
1019     it->second->SetOwnedSubchannel(std::move(wrapper));
1020   }
1021   MaybeUpdatePickerLocked();
1022 }
1023 
CleanupSubchannels()1024 void XdsOverrideHostLb::CleanupSubchannels() {
1025   const Timestamp now = Timestamp::Now();
1026   const Timestamp idle_threshold = now - connection_idle_timeout_;
1027   Duration next_time = connection_idle_timeout_;
1028   std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
1029   {
1030     MutexLock lock(&mu_);
1031     if (subchannel_map_.empty()) return;
1032     for (const auto& p : subchannel_map_) {
1033       if (p.second->last_used_time() <= idle_threshold) {
1034         auto subchannel = p.second->TakeOwnedSubchannel();
1035         if (subchannel != nullptr) {
1036           if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1037             gpr_log(GPR_INFO,
1038                     "[xds_override_host_lb %p] dropping subchannel for %s",
1039                     this, p.first.c_str());
1040           }
1041           subchannel_refs_to_drop.push_back(std::move(subchannel));
1042         }
1043       } else {
1044         // Not dropping the subchannel.  Check the entry's last_used_time to
1045         // determine the next time at which the timer needs to run.
1046         const Duration next_time_for_entry =
1047             p.second->last_used_time() + connection_idle_timeout_ - now;
1048         next_time = std::min(next_time, next_time_for_entry);
1049       }
1050     }
1051   }
1052   idle_timer_ =
1053       MakeOrphanable<IdleTimer>(RefAsSubclass<XdsOverrideHostLb>(), next_time);
1054 }
1055 
1056 //
1057 // XdsOverrideHostLb::Helper
1058 //
1059 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)1060 RefCountedPtr<SubchannelInterface> XdsOverrideHostLb::Helper::CreateSubchannel(
1061     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
1062     const ChannelArgs& args) {
1063   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1064     auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
1065     gpr_log(GPR_INFO,
1066             "[xds_override_host_lb %p] creating subchannel for %s, "
1067             "per_address_args=%s, args=%s",
1068             this, key.value_or("<unknown>").c_str(),
1069             per_address_args.ToString().c_str(), args.ToString().c_str());
1070   }
1071   auto subchannel = parent()->channel_control_helper()->CreateSubchannel(
1072       address, per_address_args, args);
1073   return parent()->AdoptSubchannel(address, std::move(subchannel));
1074 }
1075 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)1076 void XdsOverrideHostLb::Helper::UpdateState(
1077     grpc_connectivity_state state, const absl::Status& status,
1078     RefCountedPtr<SubchannelPicker> picker) {
1079   if (parent()->shutting_down_) return;
1080   // Save the state and picker.
1081   parent()->state_ = state;
1082   parent()->status_ = status;
1083   parent()->picker_ = std::move(picker);
1084   // Wrap the picker and return it to the channel.
1085   parent()->MaybeUpdatePickerLocked();
1086 }
1087 
1088 //
1089 // XdsOverrideHostLb::SubchannelWrapper
1090 //
1091 
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,RefCountedPtr<XdsOverrideHostLb> policy)1092 XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper(
1093     RefCountedPtr<SubchannelInterface> subchannel,
1094     RefCountedPtr<XdsOverrideHostLb> policy)
1095     : DelegatingSubchannel(std::move(subchannel)), policy_(std::move(policy)) {
1096   auto watcher = std::make_unique<ConnectivityStateWatcher>(
1097       WeakRefAsSubclass<SubchannelWrapper>());
1098   watcher_ = watcher.get();
1099   wrapped_subchannel()->WatchConnectivityState(std::move(watcher));
1100 }
1101 
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> watcher)1102 void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState(
1103     std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
1104   watchers_.insert(std::move(watcher));
1105 }
1106 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)1107 void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch(
1108     ConnectivityStateWatcherInterface* watcher) {
1109   auto it = watchers_.find(watcher);
1110   if (it != watchers_.end()) {
1111     watchers_.erase(it);
1112   }
1113 }
1114 
// Cleanup when this wrapper is orphaned. This is presumably invoked when the
// last strong ref is dropped; the deferred path below keeps the object alive
// through a weak ref.
// - Cancels the wrapper's own watch on the underlying subchannel.
// - If the wrapper is attached to a map entry, calls
//   OnSubchannelWrapperOrphan() on that entry under the policy's mu_.
void XdsOverrideHostLb::SubchannelWrapper::Orphaned() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
    gpr_log(GPR_INFO,
            "[xds_override_host_lb %p] subchannel wrapper %p orphaned",
            policy_.get(), this);
  }
  // Without work-serializer dispatch, do the cleanup inline.
  if (!IsWorkSerializerDispatchEnabled()) {
    wrapped_subchannel()->CancelConnectivityStateWatch(watcher_);
    if (subchannel_entry_ != nullptr) {
      MutexLock lock(&policy()->mu_);
      subchannel_entry_->OnSubchannelWrapperOrphan(
          this, policy()->connection_idle_timeout_);
    }
    return;
  }
  // With work-serializer dispatch, run the same cleanup on the policy's work
  // serializer. The captured weak ref keeps this wrapper alive until then.
  policy()->work_serializer()->Run(
      [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
        self->wrapped_subchannel()->CancelConnectivityStateWatch(
            self->watcher_);
        if (self->subchannel_entry_ != nullptr) {
          MutexLock lock(&self->policy()->mu_);
          self->subchannel_entry_->OnSubchannelWrapperOrphan(
              self.get(), self->policy()->connection_idle_timeout_);
        }
      },
      DEBUG_LOCATION);
}
1142 
UpdateConnectivityState(grpc_connectivity_state state,absl::Status status)1143 void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
1144     grpc_connectivity_state state, absl::Status status) {
1145   bool update_picker = false;
1146   if (subchannel_entry_ != nullptr) {
1147     MutexLock lock(&policy()->mu_);
1148     if (subchannel_entry_->connectivity_state() != state) {
1149       subchannel_entry_->set_connectivity_state(state);
1150       update_picker = subchannel_entry_->HasOwnedSubchannel() &&
1151                       subchannel_entry_->GetSubchannel() == this;
1152     }
1153   }
1154   // Sending connectivity state notifications to the watchers may cause the set
1155   // of watchers to change, so we can't be iterating over the set of watchers
1156   // while we send the notifications
1157   std::vector<ConnectivityStateWatcherInterface*> watchers;
1158   watchers.reserve(watchers_.size());
1159   for (const auto& watcher : watchers_) {
1160     watchers.push_back(watcher.get());
1161   }
1162   for (const auto& watcher : watchers) {
1163     if (watchers_.find(watcher) != watchers_.end()) {
1164       watcher->OnConnectivityStateChange(state, status);
1165     }
1166   }
1167   if (update_picker) policy()->MaybeUpdatePickerLocked();
1168 }
1169 
1170 //
1171 // XdsOverrideHostLb::SubchannelEntry
1172 //
1173 
1174 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
SetUnownedSubchannel(SubchannelWrapper * subchannel)1175 XdsOverrideHostLb::SubchannelEntry::SetUnownedSubchannel(
1176     SubchannelWrapper* subchannel)
1177     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1178   auto owned_subchannel = TakeOwnedSubchannel();
1179   subchannel_ = subchannel;
1180   return owned_subchannel;
1181 }
1182 
1183 XdsOverrideHostLb::SubchannelWrapper*
GetSubchannel() const1184 XdsOverrideHostLb::SubchannelEntry::GetSubchannel() const
1185     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1186   return Match(
1187       subchannel_, [](SubchannelWrapper* subchannel) { return subchannel; },
1188       [](const RefCountedPtr<SubchannelWrapper>& subchannel) {
1189         return subchannel.get();
1190       });
1191 }
1192 
1193 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
GetSubchannelRef() const1194 XdsOverrideHostLb::SubchannelEntry::GetSubchannelRef() const
1195     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1196   auto* sc = GetSubchannel();
1197   if (sc == nullptr) return nullptr;
1198   return sc->RefIfNonZero().TakeAsSubclass<SubchannelWrapper>();
1199 }
1200 
1201 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
TakeOwnedSubchannel()1202 XdsOverrideHostLb::SubchannelEntry::TakeOwnedSubchannel()
1203     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1204   return MatchMutable(
1205       &subchannel_,
1206       [](SubchannelWrapper**) -> RefCountedPtr<SubchannelWrapper> {
1207         return nullptr;
1208       },
1209       [](RefCountedPtr<SubchannelWrapper>* subchannel) {
1210         return std::move(*subchannel);
1211       });
1212 }
1213 
UnsetSubchannel(std::vector<RefCountedPtr<SubchannelWrapper>> * owned_subchannels)1214 void XdsOverrideHostLb::SubchannelEntry::UnsetSubchannel(
1215     std::vector<RefCountedPtr<SubchannelWrapper>>* owned_subchannels)
1216     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1217   auto subchannel = TakeOwnedSubchannel();
1218   if (subchannel != nullptr) {
1219     owned_subchannels->push_back(std::move(subchannel));
1220   }
1221   subchannel_ = nullptr;
1222 }
1223 
OnSubchannelWrapperOrphan(SubchannelWrapper * wrapper,Duration connection_idle_timeout)1224 void XdsOverrideHostLb::SubchannelEntry::OnSubchannelWrapperOrphan(
1225     SubchannelWrapper* wrapper, Duration connection_idle_timeout)
1226     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1227   auto* subchannel = GetSubchannel();
1228   if (subchannel != wrapper) return;
1229   if (last_used_time_ < (Timestamp::Now() - connection_idle_timeout)) {
1230     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1231       gpr_log(GPR_INFO,
1232               "[xds_override_host_lb] removing unowned subchannel wrapper %p",
1233               subchannel);
1234     }
1235     subchannel_ = nullptr;
1236   } else {
1237     // The subchannel is being released by the child policy, but it
1238     // is still within its idle timeout, so we make a new copy of
1239     // the wrapper with the same underlying subchannel, and we hold
1240     // our own ref to it.
1241     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1242       gpr_log(GPR_INFO,
1243               "[xds_override_host_lb] subchannel wrapper %p: cloning "
1244               "to gain ownership",
1245               subchannel);
1246     }
1247     subchannel_ = wrapper->Clone();
1248   }
1249 }
1250 
1251 //
1252 // factory
1253 //
1254 
1255 class XdsOverrideHostLbFactory final : public LoadBalancingPolicyFactory {
1256  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1257   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1258       LoadBalancingPolicy::Args args) const override {
1259     return MakeOrphanable<XdsOverrideHostLb>(std::move(args));
1260   }
1261 
name() const1262   absl::string_view name() const override {
1263     return XdsOverrideHostLbConfig::Name();
1264   }
1265 
1266   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1267   ParseLoadBalancingConfig(const Json& json) const override {
1268     return LoadFromJson<RefCountedPtr<XdsOverrideHostLbConfig>>(
1269         json, JsonArgs(),
1270         "errors validating xds_override_host LB policy config");
1271   }
1272 };
1273 
1274 }  // namespace
1275 
RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder * builder)1276 void RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder* builder) {
1277   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1278       std::make_unique<XdsOverrideHostLbFactory>());
1279 }
1280 
1281 //
1282 // XdsOverrideHostLbConfig
1283 //
1284 
JsonLoader(const JsonArgs &)1285 const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader(
1286     const JsonArgs&) {
1287   static const auto kJsonLoader =
1288       JsonObjectLoader<XdsOverrideHostLbConfig>()
1289           // Child policy config is parsed in JsonPostLoad
1290           .Field("clusterName", &XdsOverrideHostLbConfig::cluster_name_)
1291           .Finish();
1292   return kJsonLoader;
1293 }
1294 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)1295 void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
1296                                            ValidationErrors* errors) {
1297   ValidationErrors::ScopedField field(errors, ".childPolicy");
1298   auto it = json.object().find("childPolicy");
1299   if (it == json.object().end()) {
1300     errors->AddError("field not present");
1301   } else {
1302     auto child_policy_config =
1303         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
1304             it->second);
1305     if (!child_policy_config.ok()) {
1306       errors->AddError(child_policy_config.status().message());
1307     } else {
1308       child_config_ = std::move(*child_policy_config);
1309     }
1310   }
1311 }
1312 
1313 }  // namespace grpc_core
1314