1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/load_balancing/xds/xds_override_host.h"
20
21 #include <stddef.h>
22
23 #include <algorithm>
24 #include <functional>
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <vector>
33
34 #include "absl/base/thread_annotations.h"
35 #include "absl/functional/function_ref.h"
36 #include "absl/status/status.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/str_cat.h"
39 #include "absl/strings/str_join.h"
40 #include "absl/strings/str_split.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/types/optional.h"
43 #include "absl/types/span.h"
44 #include "absl/types/variant.h"
45
46 #include <grpc/event_engine/event_engine.h>
47 #include <grpc/impl/connectivity_state.h>
48 #include <grpc/support/log.h>
49
50 #include "src/core/client_channel/client_channel_internal.h"
51 #include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
52 #include "src/core/ext/xds/xds_health_status.h"
53 #include "src/core/lib/address_utils/parse_address.h"
54 #include "src/core/lib/address_utils/sockaddr_utils.h"
55 #include "src/core/lib/channel/channel_args.h"
56 #include "src/core/lib/config/core_configuration.h"
57 #include "src/core/lib/debug/trace.h"
58 #include "src/core/lib/experiments/experiments.h"
59 #include "src/core/lib/gprpp/debug_location.h"
60 #include "src/core/lib/gprpp/match.h"
61 #include "src/core/lib/gprpp/orphanable.h"
62 #include "src/core/lib/gprpp/ref_counted_ptr.h"
63 #include "src/core/lib/gprpp/ref_counted_string.h"
64 #include "src/core/lib/gprpp/sync.h"
65 #include "src/core/lib/gprpp/validation_errors.h"
66 #include "src/core/lib/gprpp/work_serializer.h"
67 #include "src/core/lib/iomgr/closure.h"
68 #include "src/core/lib/iomgr/error.h"
69 #include "src/core/lib/iomgr/exec_ctx.h"
70 #include "src/core/lib/iomgr/iomgr_fwd.h"
71 #include "src/core/lib/iomgr/pollset_set.h"
72 #include "src/core/lib/iomgr/resolved_address.h"
73 #include "src/core/lib/json/json.h"
74 #include "src/core/lib/json/json_args.h"
75 #include "src/core/lib/json/json_object_loader.h"
76 #include "src/core/lib/transport/connectivity_state.h"
77 #include "src/core/load_balancing/child_policy_handler.h"
78 #include "src/core/load_balancing/delegating_helper.h"
79 #include "src/core/load_balancing/lb_policy.h"
80 #include "src/core/load_balancing/lb_policy_factory.h"
81 #include "src/core/load_balancing/lb_policy_registry.h"
82 #include "src/core/load_balancing/subchannel_interface.h"
83 #include "src/core/resolver/endpoint_addresses.h"
84 #include "src/core/resolver/xds/xds_dependency_manager.h"
85
86 namespace grpc_core {
87
88 using ::grpc_event_engine::experimental::EventEngine;
89
90 TraceFlag grpc_lb_xds_override_host_trace(false, "xds_override_host_lb");
91
92 namespace {
// Ordering for sets/maps keyed by std::unique_ptr<Value> that also supports
// heterogeneous lookup by raw `const Value*` (is_transparent), so an element
// can be found without constructing a unique_ptr for the probe key.
//
// Every overload compares through std::less<const Value*>. The built-in `<`
// on pointers to unrelated objects is unspecified, whereas std::less is
// guaranteed to be a strict total order. Routing all overloads through the
// same comparison keeps owned-vs-owned and raw-vs-owned orderings
// consistent, which a transparent comparator requires.
template <typename Value>
struct PtrLessThan {
  using is_transparent = void;

  bool operator()(const std::unique_ptr<Value>& v1,
                  const std::unique_ptr<Value>& v2) const {
    return Less(v1.get(), v2.get());
  }
  bool operator()(const Value* v1, const Value* v2) const {
    return Less(v1, v2);
  }
  bool operator()(const Value* v1, const std::unique_ptr<Value>& v2) const {
    return Less(v1, v2.get());
  }
  bool operator()(const std::unique_ptr<Value>& v1, const Value* v2) const {
    return Less(v1.get(), v2);
  }

 private:
  // Single source of truth for the pointer ordering.
  static bool Less(const Value* v1, const Value* v2) {
    return std::less<const Value*>()(v1, v2);
  }
};
109
110 //
111 // xds_override_host LB policy
112 //
113
// The xds_override_host LB policy. It delegates endpoint selection to a child
// policy and wraps the child's picker so that, when a call carries an
// XdsOverrideHostAttribute with a cookie address list, the pick can be
// routed to one of those addresses instead of the child's choice.
class XdsOverrideHostLb final : public LoadBalancingPolicy {
 public:
  explicit XdsOverrideHostLb(Args args);

  absl::string_view name() const override {
    return XdsOverrideHostLbConfig::Name();
  }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  class SubchannelEntry;

  // Wraps a subchannel handed out by this policy. Holds a ref to the policy
  // and to the SubchannelEntry for its address; address_list() and
  // set_last_used_time() forward to that entry.
  class SubchannelWrapper final : public DelegatingSubchannel {
   public:
    SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
                      RefCountedPtr<XdsOverrideHostLb> policy);

    // Called immediately after construction. We use two-phase initialization
    // to avoid doing an allocation while holding the lock.
    void set_subchannel_entry(RefCountedPtr<SubchannelEntry> subchannel_entry) {
      subchannel_entry_ = std::move(subchannel_entry);
    }

    void WatchConnectivityState(
        std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override;

    void CancelConnectivityStateWatch(
        ConnectivityStateWatcherInterface* watcher) override;

    RefCountedStringValue address_list() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      return subchannel_entry_->address_list();
    }

    void set_last_used_time()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      subchannel_entry_->set_last_used_time();
    }

    XdsOverrideHostLb* policy() const { return policy_.get(); }

    // Returns a new wrapper around the same underlying subchannel, sharing
    // this wrapper's policy and SubchannelEntry.
    RefCountedPtr<SubchannelWrapper> Clone() const {
      auto subchannel =
          MakeRefCounted<SubchannelWrapper>(wrapped_subchannel(), policy_);
      subchannel->set_subchannel_entry(subchannel_entry_);
      return subchannel;
    }

   private:
    // Forwards connectivity-state changes of the wrapped subchannel to
    // SubchannelWrapper::UpdateConnectivityState(). Holds only a weak ref
    // to the wrapper.
    class ConnectivityStateWatcher final
        : public ConnectivityStateWatcherInterface {
     public:
      explicit ConnectivityStateWatcher(
          WeakRefCountedPtr<SubchannelWrapper> subchannel)
          : subchannel_(std::move(subchannel)) {}

      void OnConnectivityStateChange(grpc_connectivity_state state,
                                     absl::Status status) override {
        subchannel_->UpdateConnectivityState(state, status);
      }

      grpc_pollset_set* interested_parties() override {
        return subchannel_->policy()->interested_parties();
      }

     private:
      WeakRefCountedPtr<SubchannelWrapper> subchannel_;
    };

    void Orphaned() override;
    void UpdateConnectivityState(grpc_connectivity_state state,
                                 absl::Status status);

    RefCountedPtr<XdsOverrideHostLb> policy_;
    RefCountedPtr<SubchannelEntry> subchannel_entry_;
    // Non-owning pointer to this wrapper's own watcher.
    // NOTE(review): no default initializer here; presumably assigned in the
    // constructor (not visible in this chunk) — confirm.
    ConnectivityStateWatcher* watcher_;
    // Owned watchers; PtrLessThan is transparent, which presumably lets
    // CancelConnectivityStateWatch() find an entry by raw pointer.
    std::set<std::unique_ptr<ConnectivityStateWatcherInterface>,
             PtrLessThan<ConnectivityStateWatcherInterface>>
        watchers_;
  };

  // An entry in the subchannel map.
  //
  // The entry may hold either an owned (RefCountedPtr<>) or unowned
  // (raw pointer) SubchannelWrapper, but not both. It will be unowned
  // in the case where the SubchannelWrapper is owned by the child policy.
  // It will be owned in the case where the child policy has not created a
  // subchannel but we have RPCs whose cookies point to that address.
  //
  // Note that when a SubchannelWrapper is orphaned, it will try to
  // acquire the lock to remove itself from the entry. This means that
  // whenever we need to remove an owned subchannel from an entry, if we
  // released our ref to the SubchannelWrapper immediately, we would
  // cause a deadlock, since our caller is already holding the lock. To
  // avoid that, any method that may result in releasing a ref to the
  // SubchannelWrapper will instead return that ref to the caller, who is
  // responsible for releasing the ref after releasing the lock.
  class SubchannelEntry final : public RefCounted<SubchannelEntry> {
   public:
    // True iff the entry currently holds a non-null owned subchannel.
    bool HasOwnedSubchannel() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      auto* sc = absl::get_if<RefCountedPtr<SubchannelWrapper>>(&subchannel_);
      return sc != nullptr && *sc != nullptr;
    }

    // Sets the unowned subchannel. If the entry previously had an
    // owned subchannel, returns the ref to it.
    RefCountedPtr<SubchannelWrapper> SetUnownedSubchannel(
        SubchannelWrapper* subchannel)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);

    // Sets the owned subchannel. Must not be called if the entry
    // already has an owned subchannel.
    void SetOwnedSubchannel(RefCountedPtr<SubchannelWrapper> subchannel)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      GPR_DEBUG_ASSERT(!HasOwnedSubchannel());
      subchannel_ = std::move(subchannel);
    }

    // Returns a pointer to the subchannel, regardless of whether it's
    // owned or not.
    SubchannelWrapper* GetSubchannel() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);

    // Returns a ref to the subchannel, regardless of whether it's owned
    // or not. Returns null if there is no subchannel or if the
    // subchannel's ref count is 0.
    RefCountedPtr<SubchannelWrapper> GetSubchannelRef() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);

    // If the entry has an owned subchannel, moves it out of the entry
    // and returns it.
    RefCountedPtr<SubchannelWrapper> TakeOwnedSubchannel()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);

    // Unsets the entry's subchannel.
    // If the entry had an owned subchannel, moves the ref into
    // owned_subchannels.
    void UnsetSubchannel(
        std::vector<RefCountedPtr<SubchannelWrapper>>* owned_subchannels)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);

    // Called when a SubchannelWrapper is orphaned. May replace the
    // unowned SubchannelWrapper with an owned one based on
    // last_used_time_ and connection_idle_timeout.
    void OnSubchannelWrapperOrphan(SubchannelWrapper* wrapper,
                                   Duration connection_idle_timeout)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_);

    grpc_connectivity_state connectivity_state() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      return connectivity_state_;
    }
    void set_connectivity_state(grpc_connectivity_state state)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      connectivity_state_ = state;
    }

    XdsHealthStatus eds_health_status() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      return eds_health_status_;
    }
    void set_eds_health_status(XdsHealthStatus eds_health_status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      eds_health_status_ = eds_health_status;
    }

    RefCountedStringValue address_list() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      return address_list_;
    }
    void set_address_list(RefCountedStringValue address_list)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      address_list_ = std::move(address_list);
    }

    Timestamp last_used_time() const
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      return last_used_time_;
    }
    // Stamps the entry with the current time (Timestamp::Now()).
    void set_last_used_time()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
      last_used_time_ = Timestamp::Now();
    }

   private:
    grpc_connectivity_state connectivity_state_
        ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) = GRPC_CHANNEL_IDLE;
    // Unowned (raw pointer) or owned (ref) wrapper; see class comment.
    absl::variant<SubchannelWrapper*, RefCountedPtr<SubchannelWrapper>>
        subchannel_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
    XdsHealthStatus eds_health_status_ ABSL_GUARDED_BY(
        &XdsOverrideHostLb::mu_) = XdsHealthStatus(XdsHealthStatus::kUnknown);
    RefCountedStringValue address_list_
        ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_);
    // InfPast until set_last_used_time() is first called.
    Timestamp last_used_time_ ABSL_GUARDED_BY(&XdsOverrideHostLb::mu_) =
        Timestamp::InfPast();
  };

  // A picker that wraps the picker from the child for cases when cookie is
  // present.
  class Picker final : public SubchannelPicker {
   public:
    Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
           RefCountedPtr<SubchannelPicker> picker,
           XdsHealthStatusSet override_host_health_status_set);

    PickResult Pick(PickArgs args) override;

   private:
    // Self-deleting helper: from the ExecCtx, schedules
    // RequestConnection() on the policy's WorkSerializer, then deletes
    // itself.
    class SubchannelConnectionRequester final {
     public:
      explicit SubchannelConnectionRequester(
          RefCountedPtr<SubchannelWrapper> subchannel)
          : subchannel_(std::move(subchannel)) {
        GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
        // Hop into ExecCtx, so that we don't get stuck running
        // arbitrary WorkSerializer callbacks while doing a pick.
        ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
      }

     private:
      static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
        auto* self = static_cast<SubchannelConnectionRequester*>(arg);
        self->subchannel_->policy()->work_serializer()->Run(
            [self]() {
              self->subchannel_->RequestConnection();
              delete self;
            },
            DEBUG_LOCATION);
      }

      RefCountedPtr<SubchannelWrapper> subchannel_;
      grpc_closure closure_;
    };

    // Self-deleting helper: from the ExecCtx, schedules
    // CreateSubchannelForAddress() on the policy's WorkSerializer, then
    // deletes itself. Keeps its own std::string copy of the address.
    class SubchannelCreationRequester final {
     public:
      SubchannelCreationRequester(RefCountedPtr<XdsOverrideHostLb> policy,
                                  absl::string_view address)
          : policy_(std::move(policy)), address_(address) {
        GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
        // Hop into ExecCtx, so that we don't get stuck running
        // arbitrary WorkSerializer callbacks while doing a pick.
        ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
      }

     private:
      static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) {
        auto* self = static_cast<SubchannelCreationRequester*>(arg);
        self->policy_->work_serializer()->Run(
            [self]() {
              self->policy_->CreateSubchannelForAddress(self->address_);
              delete self;
            },
            DEBUG_LOCATION);
      }

      RefCountedPtr<XdsOverrideHostLb> policy_;
      std::string address_;
      grpc_closure closure_;
    };

    // Attempts a pick based on the cookie address list; returns nullopt
    // when the pick should fall through to the child picker.
    absl::optional<LoadBalancingPolicy::PickResult> PickOverridenHost(
        XdsOverrideHostAttribute* override_host_attr) const;

    RefCountedPtr<XdsOverrideHostLb> policy_;
    RefCountedPtr<SubchannelPicker> picker_;
    XdsHealthStatusSet override_host_health_status_set_;
  };

  class Helper final
      : public ParentOwningDelegatingChannelControlHelper<XdsOverrideHostLb> {
   public:
    explicit Helper(RefCountedPtr<XdsOverrideHostLb> xds_override_host_policy)
        : ParentOwningDelegatingChannelControlHelper(
              std::move(xds_override_host_policy)) {}

    RefCountedPtr<SubchannelInterface> CreateSubchannel(
        const grpc_resolved_address& address,
        const ChannelArgs& per_address_args, const ChannelArgs& args) override;
    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                     RefCountedPtr<SubchannelPicker> picker) override;
  };

  // One-shot timer that, when it fires, runs CleanupSubchannels() on the
  // policy's WorkSerializer. Orphan() cancels it if still pending.
  class IdleTimer final : public InternallyRefCounted<IdleTimer> {
   public:
    IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy, Duration duration);

    void Orphan() override;

   private:
    void OnTimerLocked();

    RefCountedPtr<XdsOverrideHostLb> policy_;
    absl::optional<EventEngine::TaskHandle> timer_handle_;
  };

  ~XdsOverrideHostLb() override;

  void ShutdownLocked() override;

  void ResetState();
  void ReportTransientFailure(absl::Status status);

  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
      const ChannelArgs& args);

  void MaybeUpdatePickerLocked();

  void UpdateAddressMap(const EndpointAddressesIterator& endpoints);

  RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
      const grpc_resolved_address& address,
      RefCountedPtr<SubchannelInterface> subchannel);

  void CreateSubchannelForAddress(absl::string_view address);

  void CleanupSubchannels();

  // State from most recent resolver update.
  ChannelArgs args_;
  XdsHealthStatusSet override_host_status_set_;
  Duration connection_idle_timeout_;

  // Internal state.
  bool shutting_down_ = false;

  OrphanablePtr<LoadBalancingPolicy> child_policy_;

  // Latest state and picker reported by the child policy.
  grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
  absl::Status status_;
  RefCountedPtr<SubchannelPicker> picker_;
  // Guards subchannel_map_ and SubchannelEntry state. Also acquired by the
  // Picker (PickOverridenHost() and Pick()).
  Mutex mu_;
  // Keyed by address string; std::less<> allows lookup by string_view.
  std::map<std::string, RefCountedPtr<SubchannelEntry>, std::less<>>
      subchannel_map_ ABSL_GUARDED_BY(mu_);

  // Timer handle for periodic subchannel sweep.
  OrphanablePtr<IdleTimer> idle_timer_;
};
457
458 //
459 // XdsOverrideHostLb::Picker
460 //
461
Picker(RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,RefCountedPtr<SubchannelPicker> picker,XdsHealthStatusSet override_host_health_status_set)462 XdsOverrideHostLb::Picker::Picker(
463 RefCountedPtr<XdsOverrideHostLb> xds_override_host_lb,
464 RefCountedPtr<SubchannelPicker> picker,
465 XdsHealthStatusSet override_host_health_status_set)
466 : policy_(std::move(xds_override_host_lb)),
467 picker_(std::move(picker)),
468 override_host_health_status_set_(override_host_health_status_set) {
469 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
470 gpr_log(GPR_INFO, "[xds_override_host_lb %p] constructed new picker %p",
471 policy_.get(), this);
472 }
473 }
474
475 absl::optional<LoadBalancingPolicy::PickResult>
PickOverridenHost(XdsOverrideHostAttribute * override_host_attr) const476 XdsOverrideHostLb::Picker::PickOverridenHost(
477 XdsOverrideHostAttribute* override_host_attr) const {
478 GPR_ASSERT(override_host_attr != nullptr);
479 auto cookie_address_list = override_host_attr->cookie_address_list();
480 if (cookie_address_list.empty()) return absl::nullopt;
481 // The cookie has an address list, so look through the addresses in order.
482 absl::string_view address_with_no_subchannel;
483 RefCountedPtr<SubchannelWrapper> idle_subchannel;
484 bool found_connecting = false;
485 {
486 MutexLock lock(&policy_->mu_);
487 for (absl::string_view address : absl::StrSplit(cookie_address_list, ',')) {
488 auto it = policy_->subchannel_map_.find(address);
489 if (it == policy_->subchannel_map_.end()) continue;
490 if (!override_host_health_status_set_.Contains(
491 it->second->eds_health_status())) {
492 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
493 gpr_log(GPR_INFO,
494 "Subchannel %s health status is not overridden (%s)",
495 std::string(address).c_str(),
496 it->second->eds_health_status().ToString());
497 }
498 continue;
499 }
500 auto subchannel = it->second->GetSubchannelRef();
501 if (subchannel == nullptr) {
502 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
503 gpr_log(GPR_INFO, "No subchannel for %s",
504 std::string(address).c_str());
505 }
506 if (address_with_no_subchannel.empty()) {
507 address_with_no_subchannel = it->first;
508 }
509 continue;
510 }
511 auto connectivity_state = it->second->connectivity_state();
512 if (connectivity_state == GRPC_CHANNEL_READY) {
513 // Found a READY subchannel. Pass back the actual address list
514 // and return the subchannel.
515 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
516 gpr_log(GPR_INFO, "Picker override found READY subchannel %s",
517 std::string(address).c_str());
518 }
519 it->second->set_last_used_time();
520 override_host_attr->set_actual_address_list(it->second->address_list());
521 return PickResult::Complete(subchannel->wrapped_subchannel());
522 } else if (connectivity_state == GRPC_CHANNEL_IDLE) {
523 if (idle_subchannel == nullptr) idle_subchannel = std::move(subchannel);
524 } else if (connectivity_state == GRPC_CHANNEL_CONNECTING) {
525 found_connecting = true;
526 }
527 }
528 }
529 // No READY subchannel found. If we found an IDLE subchannel, trigger
530 // a connection attempt and queue the pick until that attempt completes.
531 if (idle_subchannel != nullptr) {
532 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
533 gpr_log(GPR_INFO, "Picker override found IDLE subchannel");
534 }
535 // Deletes itself after the connection is requested.
536 new SubchannelConnectionRequester(std::move(idle_subchannel));
537 return PickResult::Queue();
538 }
539 // No READY or IDLE subchannels. If we found a CONNECTING subchannel,
540 // queue the pick and wait for the connection attempt to complete.
541 if (found_connecting) {
542 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
543 gpr_log(GPR_INFO, "Picker override found CONNECTING subchannel");
544 }
545 return PickResult::Queue();
546 }
547 // No READY, IDLE, or CONNECTING subchannels found. If we found an
548 // entry that has no subchannel, then queue the pick and trigger
549 // creation of a subchannel for that entry.
550 if (!address_with_no_subchannel.empty()) {
551 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
552 gpr_log(GPR_INFO, "Picker override found entry with no subchannel");
553 }
554 if (!IsWorkSerializerDispatchEnabled()) {
555 new SubchannelCreationRequester(policy_, address_with_no_subchannel);
556 } else {
557 policy_->work_serializer()->Run(
558 [policy = policy_,
559 address = std::string(address_with_no_subchannel)]() {
560 policy->CreateSubchannelForAddress(address);
561 },
562 DEBUG_LOCATION);
563 }
564 return PickResult::Queue();
565 }
566 // No entry found that was not in TRANSIENT_FAILURE.
567 return absl::nullopt;
568 }
569
Pick(PickArgs args)570 LoadBalancingPolicy::PickResult XdsOverrideHostLb::Picker::Pick(PickArgs args) {
571 auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
572 auto* override_host_attr = static_cast<XdsOverrideHostAttribute*>(
573 call_state->GetCallAttribute(XdsOverrideHostAttribute::TypeName()));
574 if (override_host_attr != nullptr) {
575 auto overridden_host_pick = PickOverridenHost(override_host_attr);
576 if (overridden_host_pick.has_value()) {
577 return std::move(*overridden_host_pick);
578 }
579 }
580 // No usable override. Delegate to child picker.
581 if (picker_ == nullptr) { // Should never happen.
582 return PickResult::Fail(absl::InternalError(
583 "xds_override_host picker not given any child picker"));
584 }
585 auto result = picker_->Pick(args);
586 auto complete_pick = absl::get_if<PickResult::Complete>(&result.result);
587 if (complete_pick != nullptr) {
588 auto* wrapper =
589 static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
590 // Populate the address list in the override host attribute so that
591 // the StatefulSession filter can set the cookie.
592 if (override_host_attr != nullptr) {
593 MutexLock lock(&wrapper->policy()->mu_);
594 wrapper->set_last_used_time();
595 override_host_attr->set_actual_address_list(wrapper->address_list());
596 }
597 // Unwrap the subchannel.
598 complete_pick->subchannel = wrapper->wrapped_subchannel();
599 }
600 return result;
601 }
602
603 //
604 // XdsOverrideHostLb::IdleTimer
605 //
606
IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy,Duration duration)607 XdsOverrideHostLb::IdleTimer::IdleTimer(RefCountedPtr<XdsOverrideHostLb> policy,
608 Duration duration)
609 : policy_(std::move(policy)) {
610 // Min time between timer runs is 5s so that we don't kill ourselves
611 // with lock contention and CPU usage due to sweeps over the map.
612 duration = std::max(duration, Duration::Seconds(5));
613 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
614 gpr_log(GPR_INFO,
615 "[xds_override_host_lb %p] idle timer %p: subchannel cleanup "
616 "pass will run in %s",
617 policy_.get(), this, duration.ToString().c_str());
618 }
619 timer_handle_ = policy_->channel_control_helper()->GetEventEngine()->RunAfter(
620 duration, [self = RefAsSubclass<IdleTimer>()]() mutable {
621 ApplicationCallbackExecCtx callback_exec_ctx;
622 ExecCtx exec_ctx;
623 auto self_ptr = self.get();
624 self_ptr->policy_->work_serializer()->Run(
625 [self = std::move(self)]() { self->OnTimerLocked(); },
626 DEBUG_LOCATION);
627 });
628 }
629
Orphan()630 void XdsOverrideHostLb::IdleTimer::Orphan() {
631 if (timer_handle_.has_value()) {
632 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
633 gpr_log(GPR_INFO, "[xds_override_host_lb %p] idle timer %p: cancelling",
634 policy_.get(), this);
635 }
636 policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
637 timer_handle_.reset();
638 }
639 Unref();
640 }
641
OnTimerLocked()642 void XdsOverrideHostLb::IdleTimer::OnTimerLocked() {
643 if (timer_handle_.has_value()) {
644 timer_handle_.reset();
645 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
646 gpr_log(GPR_INFO, "[xds_override_host_lb %p] idle timer %p: timer fired",
647 policy_.get(), this);
648 }
649 policy_->CleanupSubchannels();
650 }
651 }
652
653 //
654 // XdsOverrideHostLb
655 //
656
XdsOverrideHostLb(Args args)657 XdsOverrideHostLb::XdsOverrideHostLb(Args args)
658 : LoadBalancingPolicy(std::move(args)) {
659 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
660 gpr_log(GPR_INFO, "[xds_override_host_lb %p] created", this);
661 }
662 }
663
~XdsOverrideHostLb()664 XdsOverrideHostLb::~XdsOverrideHostLb() {
665 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
666 gpr_log(GPR_INFO,
667 "[xds_override_host_lb %p] destroying xds_override_host LB policy",
668 this);
669 }
670 }
671
ShutdownLocked()672 void XdsOverrideHostLb::ShutdownLocked() {
673 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
674 gpr_log(GPR_INFO, "[xds_override_host_lb %p] shutting down", this);
675 }
676 shutting_down_ = true;
677 ResetState();
678 }
679
// Clears all state: the subchannel map, the idle timer, the child policy,
// and the child's picker.
void XdsOverrideHostLb::ResetState() {
  {
    // Drop subchannel refs after releasing the lock to avoid deadlock.
    // subchannel_refs_to_drop is declared before `lock`, so it is destroyed
    // (releasing the owned refs) only after `lock` has released mu_.
    std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
    MutexLock lock(&mu_);
    subchannel_refs_to_drop.reserve(subchannel_map_.size());
    for (auto& p : subchannel_map_) {
      // Moves any owned subchannel ref into subchannel_refs_to_drop.
      p.second->UnsetSubchannel(&subchannel_refs_to_drop);
    }
    subchannel_map_.clear();
  }
  // Cancel timer, if any.
  idle_timer_.reset();
  // Remove the child policy's interested_parties pollset_set from the
  // xDS policy.
  if (child_policy_ != nullptr) {
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     interested_parties());
    child_policy_.reset();
  }
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_.reset();
}
704
ReportTransientFailure(absl::Status status)705 void XdsOverrideHostLb::ReportTransientFailure(absl::Status status) {
706 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
707 gpr_log(GPR_INFO,
708 "[xds_override_host_lb %p] reporting TRANSIENT_FAILURE: %s", this,
709 status.ToString().c_str());
710 }
711 ResetState();
712 channel_control_helper()->UpdateState(
713 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
714 MakeRefCounted<TransientFailurePicker>(status));
715 }
716
ExitIdleLocked()717 void XdsOverrideHostLb::ExitIdleLocked() {
718 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
719 }
720
ResetBackoffLocked()721 void XdsOverrideHostLb::ResetBackoffLocked() {
722 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
723 }
724
GetEndpointHealthStatus(const EndpointAddresses & endpoint)725 XdsHealthStatus GetEndpointHealthStatus(const EndpointAddresses& endpoint) {
726 return XdsHealthStatus(static_cast<XdsHealthStatus::HealthStatus>(
727 endpoint.args()
728 .GetInt(GRPC_ARG_XDS_HEALTH_STATUS)
729 .value_or(XdsHealthStatus::HealthStatus::kUnknown)));
730 }
731
732 // Wraps the endpoint iterator and filters out endpoints in state DRAINING.
733 class ChildEndpointIterator final : public EndpointAddressesIterator {
734 public:
ChildEndpointIterator(std::shared_ptr<EndpointAddressesIterator> parent_it)735 explicit ChildEndpointIterator(
736 std::shared_ptr<EndpointAddressesIterator> parent_it)
737 : parent_it_(std::move(parent_it)) {}
738
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const739 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
740 const override {
741 parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
742 XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
743 if (status.status() != XdsHealthStatus::kDraining) {
744 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
745 gpr_log(GPR_INFO,
746 "[xds_override_host_lb %p] endpoint %s: not draining, "
747 "passing to child",
748 this, endpoint.ToString().c_str());
749 }
750 callback(endpoint);
751 }
752 });
753 }
754
755 private:
756 std::shared_ptr<EndpointAddressesIterator> parent_it_;
757 };
758
UpdateLocked(UpdateArgs args)759 absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
760 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
761 gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this);
762 }
763 // Grab new LB policy config.
764 if (args.config == nullptr) {
765 return absl::InvalidArgumentError("Missing policy config");
766 }
767 auto new_config = args.config.TakeAsSubclass<XdsOverrideHostLbConfig>();
768 // Get xDS config.
769 auto new_xds_config =
770 args.args.GetObjectRef<XdsDependencyManager::XdsConfig>();
771 if (new_xds_config == nullptr) {
772 // Should never happen.
773 absl::Status status = absl::InternalError(
774 "xDS config not passed to xds_cluster_impl LB policy");
775 ReportTransientFailure(status);
776 return status;
777 }
778 auto it = new_xds_config->clusters.find(new_config->cluster_name());
779 if (it == new_xds_config->clusters.end() || !it->second.ok() ||
780 it->second->cluster == nullptr) {
781 // Should never happen.
782 absl::Status status = absl::InternalError(absl::StrCat(
783 "xDS config has no entry for cluster ", new_config->cluster_name()));
784 ReportTransientFailure(status);
785 return status;
786 }
787 args_ = std::move(args.args);
788 override_host_status_set_ = it->second->cluster->override_host_statuses;
789 connection_idle_timeout_ = it->second->cluster->connection_idle_timeout;
790 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
791 gpr_log(GPR_INFO,
792 "[xds_override_host_lb %p] override host status set: %s "
793 "connection idle timeout: %s",
794 this, override_host_status_set_.ToString().c_str(),
795 connection_idle_timeout_.ToString().c_str());
796 }
797 // Update address map and wrap endpoint iterator for child policy.
798 if (args.addresses.ok()) {
799 UpdateAddressMap(**args.addresses);
800 args.addresses =
801 std::make_shared<ChildEndpointIterator>(std::move(*args.addresses));
802 } else {
803 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
804 gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
805 args.addresses.status().ToString().c_str());
806 }
807 }
808 // Create child policy if needed.
809 if (child_policy_ == nullptr) {
810 child_policy_ = CreateChildPolicyLocked(args.args);
811 }
812 // Update child policy.
813 UpdateArgs update_args;
814 update_args.addresses = std::move(args.addresses);
815 update_args.resolution_note = std::move(args.resolution_note);
816 update_args.config = new_config->child_config();
817 update_args.args = args_;
818 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
819 gpr_log(GPR_INFO,
820 "[xds_override_host_lb %p] Updating child policy handler %p", this,
821 child_policy_.get());
822 }
823 return child_policy_->UpdateLocked(std::move(update_args));
824 }
825
MaybeUpdatePickerLocked()826 void XdsOverrideHostLb::MaybeUpdatePickerLocked() {
827 if (picker_ != nullptr) {
828 auto xds_override_host_picker = MakeRefCounted<Picker>(
829 RefAsSubclass<XdsOverrideHostLb>(), picker_, override_host_status_set_);
830 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
831 gpr_log(GPR_INFO,
832 "[xds_override_host_lb %p] updating connectivity: state=%s "
833 "status=(%s) picker=%p",
834 this, ConnectivityStateName(state_), status_.ToString().c_str(),
835 xds_override_host_picker.get());
836 }
837 channel_control_helper()->UpdateState(state_, status_,
838 std::move(xds_override_host_picker));
839 }
840 }
841
CreateChildPolicyLocked(const ChannelArgs & args)842 OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
843 const ChannelArgs& args) {
844 LoadBalancingPolicy::Args lb_policy_args;
845 lb_policy_args.work_serializer = work_serializer();
846 lb_policy_args.args = args;
847 lb_policy_args.channel_control_helper = std::make_unique<Helper>(
848 RefAsSubclass<XdsOverrideHostLb>(DEBUG_LOCATION, "Helper"));
849 OrphanablePtr<LoadBalancingPolicy> lb_policy =
850 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
851 &grpc_lb_xds_override_host_trace);
852 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
853 gpr_log(GPR_INFO,
854 "[xds_override_host_lb %p] Created new child policy handler %p",
855 this, lb_policy.get());
856 }
857 // Add our interested_parties pollset_set to that of the newly created
858 // child policy. This will make the child policy progress upon activity on
859 // this policy, which in turn is tied to the application's call.
860 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
861 interested_parties());
862 return lb_policy;
863 }
864
UpdateAddressMap(const EndpointAddressesIterator & endpoints)865 void XdsOverrideHostLb::UpdateAddressMap(
866 const EndpointAddressesIterator& endpoints) {
867 // Construct a map of address info from which to update subchannel_map_.
868 struct AddressInfo {
869 XdsHealthStatus eds_health_status;
870 RefCountedStringValue address_list;
871 AddressInfo(XdsHealthStatus status, RefCountedStringValue addresses)
872 : eds_health_status(status), address_list(std::move(addresses)) {}
873 };
874 std::map<const std::string, AddressInfo> addresses_for_map;
875 endpoints.ForEach([&](const EndpointAddresses& endpoint) {
876 XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
877 // Skip draining hosts if not in the override status set.
878 if (status.status() == XdsHealthStatus::kDraining &&
879 !override_host_status_set_.Contains(status)) {
880 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
881 gpr_log(GPR_INFO,
882 "[xds_override_host_lb %p] endpoint %s: draining but not in "
883 "override_host_status set -- ignoring",
884 this, endpoint.ToString().c_str());
885 }
886 return;
887 }
888 std::vector<std::string> addresses;
889 addresses.reserve(endpoint.addresses().size());
890 for (const auto& address : endpoint.addresses()) {
891 auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
892 if (!key.ok()) {
893 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
894 gpr_log(GPR_INFO,
895 "[xds_override_host_lb %p] no key for endpoint address; "
896 "not adding to map",
897 this);
898 }
899 } else {
900 addresses.push_back(*std::move(key));
901 }
902 }
903 absl::Span<const std::string> addresses_span = addresses;
904 for (size_t i = 0; i < addresses.size(); ++i) {
905 std::string start = absl::StrJoin(addresses_span.subspan(0, i), ",");
906 std::string end = absl::StrJoin(addresses_span.subspan(i + 1), ",");
907 RefCountedStringValue address_list(
908 absl::StrCat(addresses[i], (start.empty() ? "" : ","), start,
909 (end.empty() ? "" : ","), end));
910 addresses_for_map.emplace(
911 std::piecewise_construct, std::forward_as_tuple(addresses[i]),
912 std::forward_as_tuple(status, std::move(address_list)));
913 }
914 });
915 // Now grab the lock and update subchannel_map_ from addresses_for_map.
916 const Timestamp now = Timestamp::Now();
917 const Timestamp idle_threshold = now - connection_idle_timeout_;
918 Duration next_time = connection_idle_timeout_;
919 {
920 // Drop subchannel refs after releasing the lock to avoid deadlock.
921 std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
922 MutexLock lock(&mu_);
923 for (auto it = subchannel_map_.begin(); it != subchannel_map_.end();) {
924 if (addresses_for_map.find(it->first) == addresses_for_map.end()) {
925 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
926 gpr_log(GPR_INFO, "[xds_override_host_lb %p] removing map key %s",
927 this, it->first.c_str());
928 }
929 it->second->UnsetSubchannel(&subchannel_refs_to_drop);
930 it = subchannel_map_.erase(it);
931 } else {
932 ++it;
933 }
934 }
935 for (auto& p : addresses_for_map) {
936 const auto& address = p.first;
937 auto& address_info = p.second;
938 auto it = subchannel_map_.find(address);
939 if (it == subchannel_map_.end()) {
940 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
941 gpr_log(GPR_INFO, "[xds_override_host_lb %p] adding map key %s", this,
942 address.c_str());
943 }
944 it = subchannel_map_.emplace(address, MakeRefCounted<SubchannelEntry>())
945 .first;
946 }
947 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
948 gpr_log(GPR_INFO,
949 "[xds_override_host_lb %p] map key %s: setting "
950 "eds_health_status=%s address_list=%s",
951 this, address.c_str(),
952 address_info.eds_health_status.ToString(),
953 address_info.address_list.c_str());
954 }
955 it->second->set_eds_health_status(address_info.eds_health_status);
956 it->second->set_address_list(std::move(address_info.address_list));
957 // Check the entry's last_used_time to determine the next time at
958 // which the timer needs to run.
959 if (it->second->last_used_time() > idle_threshold) {
960 const Duration next_time_for_entry =
961 it->second->last_used_time() + connection_idle_timeout_ - now;
962 next_time = std::min(next_time, next_time_for_entry);
963 }
964 }
965 }
966 idle_timer_ =
967 MakeOrphanable<IdleTimer>(RefAsSubclass<XdsOverrideHostLb>(), next_time);
968 }
969
970 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
AdoptSubchannel(const grpc_resolved_address & address,RefCountedPtr<SubchannelInterface> subchannel)971 XdsOverrideHostLb::AdoptSubchannel(
972 const grpc_resolved_address& address,
973 RefCountedPtr<SubchannelInterface> subchannel) {
974 auto wrapper = MakeRefCounted<SubchannelWrapper>(
975 std::move(subchannel), RefAsSubclass<XdsOverrideHostLb>());
976 auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
977 if (key.ok()) {
978 // Drop ref to previously owned subchannel (if any) after releasing
979 // the lock.
980 RefCountedPtr<SubchannelWrapper> subchannel_ref_to_drop;
981 MutexLock lock(&mu_);
982 auto it = subchannel_map_.find(*key);
983 if (it != subchannel_map_.end()) {
984 wrapper->set_subchannel_entry(it->second);
985 subchannel_ref_to_drop = it->second->SetUnownedSubchannel(wrapper.get());
986 }
987 }
988 return wrapper;
989 }
990
CreateSubchannelForAddress(absl::string_view address)991 void XdsOverrideHostLb::CreateSubchannelForAddress(absl::string_view address) {
992 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
993 gpr_log(GPR_INFO,
994 "[xds_override_host_lb %p] creating owned subchannel for %s", this,
995 std::string(address).c_str());
996 }
997 auto addr = StringToSockaddr(address);
998 GPR_ASSERT(addr.ok());
999 // Note: We don't currently have any cases where per_address_args need to
1000 // be passed through. If we encounter any such cases in the future, we
1001 // will need to change this to store those attributes from the resolver
1002 // update in the map entry.
1003 auto subchannel = channel_control_helper()->CreateSubchannel(
1004 *addr, /*per_address_args=*/ChannelArgs(), args_);
1005 auto wrapper = MakeRefCounted<SubchannelWrapper>(
1006 std::move(subchannel), RefAsSubclass<XdsOverrideHostLb>());
1007 {
1008 MutexLock lock(&mu_);
1009 auto it = subchannel_map_.find(address);
1010 // This can happen if the map entry was removed between the time that
1011 // the picker requested the subchannel creation and the time that we got
1012 // here. In that case, we can just make it a no-op, since the update
1013 // that removed the entry will have generated a new picker already.
1014 if (it == subchannel_map_.end()) return;
1015 // This can happen if the picker requests subchannel creation for
1016 // the same address multiple times.
1017 if (it->second->HasOwnedSubchannel()) return;
1018 wrapper->set_subchannel_entry(it->second);
1019 it->second->SetOwnedSubchannel(std::move(wrapper));
1020 }
1021 MaybeUpdatePickerLocked();
1022 }
1023
CleanupSubchannels()1024 void XdsOverrideHostLb::CleanupSubchannels() {
1025 const Timestamp now = Timestamp::Now();
1026 const Timestamp idle_threshold = now - connection_idle_timeout_;
1027 Duration next_time = connection_idle_timeout_;
1028 std::vector<RefCountedPtr<SubchannelWrapper>> subchannel_refs_to_drop;
1029 {
1030 MutexLock lock(&mu_);
1031 if (subchannel_map_.empty()) return;
1032 for (const auto& p : subchannel_map_) {
1033 if (p.second->last_used_time() <= idle_threshold) {
1034 auto subchannel = p.second->TakeOwnedSubchannel();
1035 if (subchannel != nullptr) {
1036 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1037 gpr_log(GPR_INFO,
1038 "[xds_override_host_lb %p] dropping subchannel for %s",
1039 this, p.first.c_str());
1040 }
1041 subchannel_refs_to_drop.push_back(std::move(subchannel));
1042 }
1043 } else {
1044 // Not dropping the subchannel. Check the entry's last_used_time to
1045 // determine the next time at which the timer needs to run.
1046 const Duration next_time_for_entry =
1047 p.second->last_used_time() + connection_idle_timeout_ - now;
1048 next_time = std::min(next_time, next_time_for_entry);
1049 }
1050 }
1051 }
1052 idle_timer_ =
1053 MakeOrphanable<IdleTimer>(RefAsSubclass<XdsOverrideHostLb>(), next_time);
1054 }
1055
1056 //
1057 // XdsOverrideHostLb::Helper
1058 //
1059
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)1060 RefCountedPtr<SubchannelInterface> XdsOverrideHostLb::Helper::CreateSubchannel(
1061 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
1062 const ChannelArgs& args) {
1063 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1064 auto key = grpc_sockaddr_to_string(&address, /*normalize=*/false);
1065 gpr_log(GPR_INFO,
1066 "[xds_override_host_lb %p] creating subchannel for %s, "
1067 "per_address_args=%s, args=%s",
1068 this, key.value_or("<unknown>").c_str(),
1069 per_address_args.ToString().c_str(), args.ToString().c_str());
1070 }
1071 auto subchannel = parent()->channel_control_helper()->CreateSubchannel(
1072 address, per_address_args, args);
1073 return parent()->AdoptSubchannel(address, std::move(subchannel));
1074 }
1075
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)1076 void XdsOverrideHostLb::Helper::UpdateState(
1077 grpc_connectivity_state state, const absl::Status& status,
1078 RefCountedPtr<SubchannelPicker> picker) {
1079 if (parent()->shutting_down_) return;
1080 // Save the state and picker.
1081 parent()->state_ = state;
1082 parent()->status_ = status;
1083 parent()->picker_ = std::move(picker);
1084 // Wrap the picker and return it to the channel.
1085 parent()->MaybeUpdatePickerLocked();
1086 }
1087
1088 //
1089 // XdsOverrideHostLb::SubchannelWrapper
1090 //
1091
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,RefCountedPtr<XdsOverrideHostLb> policy)1092 XdsOverrideHostLb::SubchannelWrapper::SubchannelWrapper(
1093 RefCountedPtr<SubchannelInterface> subchannel,
1094 RefCountedPtr<XdsOverrideHostLb> policy)
1095 : DelegatingSubchannel(std::move(subchannel)), policy_(std::move(policy)) {
1096 auto watcher = std::make_unique<ConnectivityStateWatcher>(
1097 WeakRefAsSubclass<SubchannelWrapper>());
1098 watcher_ = watcher.get();
1099 wrapped_subchannel()->WatchConnectivityState(std::move(watcher));
1100 }
1101
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> watcher)1102 void XdsOverrideHostLb::SubchannelWrapper::WatchConnectivityState(
1103 std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
1104 watchers_.insert(std::move(watcher));
1105 }
1106
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)1107 void XdsOverrideHostLb::SubchannelWrapper::CancelConnectivityStateWatch(
1108 ConnectivityStateWatcherInterface* watcher) {
1109 auto it = watchers_.find(watcher);
1110 if (it != watchers_.end()) {
1111 watchers_.erase(it);
1112 }
1113 }
1114
Orphaned()1115 void XdsOverrideHostLb::SubchannelWrapper::Orphaned() {
1116 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1117 gpr_log(GPR_INFO,
1118 "[xds_override_host_lb %p] subchannel wrapper %p orphaned",
1119 policy_.get(), this);
1120 }
1121 if (!IsWorkSerializerDispatchEnabled()) {
1122 wrapped_subchannel()->CancelConnectivityStateWatch(watcher_);
1123 if (subchannel_entry_ != nullptr) {
1124 MutexLock lock(&policy()->mu_);
1125 subchannel_entry_->OnSubchannelWrapperOrphan(
1126 this, policy()->connection_idle_timeout_);
1127 }
1128 return;
1129 }
1130 policy()->work_serializer()->Run(
1131 [self = WeakRefAsSubclass<SubchannelWrapper>()]() {
1132 self->wrapped_subchannel()->CancelConnectivityStateWatch(
1133 self->watcher_);
1134 if (self->subchannel_entry_ != nullptr) {
1135 MutexLock lock(&self->policy()->mu_);
1136 self->subchannel_entry_->OnSubchannelWrapperOrphan(
1137 self.get(), self->policy()->connection_idle_timeout_);
1138 }
1139 },
1140 DEBUG_LOCATION);
1141 }
1142
UpdateConnectivityState(grpc_connectivity_state state,absl::Status status)1143 void XdsOverrideHostLb::SubchannelWrapper::UpdateConnectivityState(
1144 grpc_connectivity_state state, absl::Status status) {
1145 bool update_picker = false;
1146 if (subchannel_entry_ != nullptr) {
1147 MutexLock lock(&policy()->mu_);
1148 if (subchannel_entry_->connectivity_state() != state) {
1149 subchannel_entry_->set_connectivity_state(state);
1150 update_picker = subchannel_entry_->HasOwnedSubchannel() &&
1151 subchannel_entry_->GetSubchannel() == this;
1152 }
1153 }
1154 // Sending connectivity state notifications to the watchers may cause the set
1155 // of watchers to change, so we can't be iterating over the set of watchers
1156 // while we send the notifications
1157 std::vector<ConnectivityStateWatcherInterface*> watchers;
1158 watchers.reserve(watchers_.size());
1159 for (const auto& watcher : watchers_) {
1160 watchers.push_back(watcher.get());
1161 }
1162 for (const auto& watcher : watchers) {
1163 if (watchers_.find(watcher) != watchers_.end()) {
1164 watcher->OnConnectivityStateChange(state, status);
1165 }
1166 }
1167 if (update_picker) policy()->MaybeUpdatePickerLocked();
1168 }
1169
1170 //
1171 // XdsOverrideHostLb::SubchannelEntry
1172 //
1173
1174 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
SetUnownedSubchannel(SubchannelWrapper * subchannel)1175 XdsOverrideHostLb::SubchannelEntry::SetUnownedSubchannel(
1176 SubchannelWrapper* subchannel)
1177 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1178 auto owned_subchannel = TakeOwnedSubchannel();
1179 subchannel_ = subchannel;
1180 return owned_subchannel;
1181 }
1182
1183 XdsOverrideHostLb::SubchannelWrapper*
GetSubchannel() const1184 XdsOverrideHostLb::SubchannelEntry::GetSubchannel() const
1185 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1186 return Match(
1187 subchannel_, [](SubchannelWrapper* subchannel) { return subchannel; },
1188 [](const RefCountedPtr<SubchannelWrapper>& subchannel) {
1189 return subchannel.get();
1190 });
1191 }
1192
1193 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
GetSubchannelRef() const1194 XdsOverrideHostLb::SubchannelEntry::GetSubchannelRef() const
1195 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1196 auto* sc = GetSubchannel();
1197 if (sc == nullptr) return nullptr;
1198 return sc->RefIfNonZero().TakeAsSubclass<SubchannelWrapper>();
1199 }
1200
1201 RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>
TakeOwnedSubchannel()1202 XdsOverrideHostLb::SubchannelEntry::TakeOwnedSubchannel()
1203 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1204 return MatchMutable(
1205 &subchannel_,
1206 [](SubchannelWrapper**) -> RefCountedPtr<SubchannelWrapper> {
1207 return nullptr;
1208 },
1209 [](RefCountedPtr<SubchannelWrapper>* subchannel) {
1210 return std::move(*subchannel);
1211 });
1212 }
1213
UnsetSubchannel(std::vector<RefCountedPtr<SubchannelWrapper>> * owned_subchannels)1214 void XdsOverrideHostLb::SubchannelEntry::UnsetSubchannel(
1215 std::vector<RefCountedPtr<SubchannelWrapper>>* owned_subchannels)
1216 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1217 auto subchannel = TakeOwnedSubchannel();
1218 if (subchannel != nullptr) {
1219 owned_subchannels->push_back(std::move(subchannel));
1220 }
1221 subchannel_ = nullptr;
1222 }
1223
OnSubchannelWrapperOrphan(SubchannelWrapper * wrapper,Duration connection_idle_timeout)1224 void XdsOverrideHostLb::SubchannelEntry::OnSubchannelWrapperOrphan(
1225 SubchannelWrapper* wrapper, Duration connection_idle_timeout)
1226 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsOverrideHostLb::mu_) {
1227 auto* subchannel = GetSubchannel();
1228 if (subchannel != wrapper) return;
1229 if (last_used_time_ < (Timestamp::Now() - connection_idle_timeout)) {
1230 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1231 gpr_log(GPR_INFO,
1232 "[xds_override_host_lb] removing unowned subchannel wrapper %p",
1233 subchannel);
1234 }
1235 subchannel_ = nullptr;
1236 } else {
1237 // The subchannel is being released by the child policy, but it
1238 // is still within its idle timeout, so we make a new copy of
1239 // the wrapper with the same underlying subchannel, and we hold
1240 // our own ref to it.
1241 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
1242 gpr_log(GPR_INFO,
1243 "[xds_override_host_lb] subchannel wrapper %p: cloning "
1244 "to gain ownership",
1245 subchannel);
1246 }
1247 subchannel_ = wrapper->Clone();
1248 }
1249 }
1250
1251 //
1252 // factory
1253 //
1254
1255 class XdsOverrideHostLbFactory final : public LoadBalancingPolicyFactory {
1256 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1257 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1258 LoadBalancingPolicy::Args args) const override {
1259 return MakeOrphanable<XdsOverrideHostLb>(std::move(args));
1260 }
1261
name() const1262 absl::string_view name() const override {
1263 return XdsOverrideHostLbConfig::Name();
1264 }
1265
1266 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const1267 ParseLoadBalancingConfig(const Json& json) const override {
1268 return LoadFromJson<RefCountedPtr<XdsOverrideHostLbConfig>>(
1269 json, JsonArgs(),
1270 "errors validating xds_override_host LB policy config");
1271 }
1272 };
1273
1274 } // namespace
1275
RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder * builder)1276 void RegisterXdsOverrideHostLbPolicy(CoreConfiguration::Builder* builder) {
1277 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
1278 std::make_unique<XdsOverrideHostLbFactory>());
1279 }
1280
1281 //
1282 // XdsOverrideHostLbConfig
1283 //
1284
JsonLoader(const JsonArgs &)1285 const JsonLoaderInterface* XdsOverrideHostLbConfig::JsonLoader(
1286 const JsonArgs&) {
1287 static const auto kJsonLoader =
1288 JsonObjectLoader<XdsOverrideHostLbConfig>()
1289 // Child policy config is parsed in JsonPostLoad
1290 .Field("clusterName", &XdsOverrideHostLbConfig::cluster_name_)
1291 .Finish();
1292 return kJsonLoader;
1293 }
1294
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)1295 void XdsOverrideHostLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
1296 ValidationErrors* errors) {
1297 ValidationErrors::ScopedField field(errors, ".childPolicy");
1298 auto it = json.object().find("childPolicy");
1299 if (it == json.object().end()) {
1300 errors->AddError("field not present");
1301 } else {
1302 auto child_policy_config =
1303 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
1304 it->second);
1305 if (!child_policy_config.ok()) {
1306 errors->AddError(child_policy_config.status().message());
1307 } else {
1308 child_config_ = std::move(*child_policy_config);
1309 }
1310 }
1311 }
1312
1313 } // namespace grpc_core
1314