1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/load_balancing/endpoint_list.h"
20
21 #include <stdlib.h>
22
23 #include <memory>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/types/optional.h"
30
31 #include <grpc/impl/connectivity_state.h>
32 #include <grpc/support/json.h>
33 #include <grpc/support/log.h>
34
35 #include "src/core/load_balancing/pick_first/pick_first.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/config/core_configuration.h"
38 #include "src/core/lib/gprpp/debug_location.h"
39 #include "src/core/lib/gprpp/orphanable.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/pollset_set.h"
42 #include "src/core/lib/json/json.h"
43 #include "src/core/load_balancing/delegating_helper.h"
44 #include "src/core/load_balancing/lb_policy.h"
45 #include "src/core/load_balancing/lb_policy_registry.h"
46 #include "src/core/resolver/endpoint_addresses.h"
47
48 namespace grpc_core {
49
50 //
51 // EndpointList::Endpoint::Helper
52 //
53
54 class EndpointList::Endpoint::Helper final
55 : public LoadBalancingPolicy::DelegatingChannelControlHelper {
56 public:
Helper(RefCountedPtr<Endpoint> endpoint)57 explicit Helper(RefCountedPtr<Endpoint> endpoint)
58 : endpoint_(std::move(endpoint)) {}
59
~Helper()60 ~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); }
61
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)62 RefCountedPtr<SubchannelInterface> CreateSubchannel(
63 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
64 const ChannelArgs& args) override {
65 return endpoint_->CreateSubchannel(address, per_address_args, args);
66 }
67
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)68 void UpdateState(
69 grpc_connectivity_state state, const absl::Status& status,
70 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
71 auto old_state = std::exchange(endpoint_->connectivity_state_, state);
72 if (!old_state.has_value()) {
73 ++endpoint_->endpoint_list_->num_endpoints_seen_initial_state_;
74 }
75 endpoint_->picker_ = std::move(picker);
76 endpoint_->OnStateUpdate(old_state, state, status);
77 }
78
79 private:
parent_helper() const80 LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override {
81 return endpoint_->endpoint_list_->channel_control_helper();
82 }
83
84 RefCountedPtr<Endpoint> endpoint_;
85 };
86
87 //
88 // EndpointList::Endpoint
89 //
90
Init(const EndpointAddresses & addresses,const ChannelArgs & args,std::shared_ptr<WorkSerializer> work_serializer)91 void EndpointList::Endpoint::Init(
92 const EndpointAddresses& addresses, const ChannelArgs& args,
93 std::shared_ptr<WorkSerializer> work_serializer) {
94 ChannelArgs child_args =
95 args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
96 .Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true);
97 LoadBalancingPolicy::Args lb_policy_args;
98 lb_policy_args.work_serializer = std::move(work_serializer);
99 lb_policy_args.args = child_args;
100 lb_policy_args.channel_control_helper =
101 std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
102 child_policy_ =
103 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
104 "pick_first", std::move(lb_policy_args));
105 if (GPR_UNLIKELY(endpoint_list_->tracer_ != nullptr)) {
106 gpr_log(GPR_INFO, "[%s %p] endpoint %p: created child policy %p",
107 endpoint_list_->tracer_, endpoint_list_->policy_.get(), this,
108 child_policy_.get());
109 }
110 // Add our interested_parties pollset_set to that of the newly created
111 // child policy. This will make the child policy progress upon activity on
112 // this policy, which in turn is tied to the application's call.
113 grpc_pollset_set_add_pollset_set(
114 child_policy_->interested_parties(),
115 endpoint_list_->policy_->interested_parties());
116 // Construct pick_first config.
117 auto config =
118 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
119 Json::FromArray(
120 {Json::FromObject({{"pick_first", Json::FromObject({})}})}));
121 GPR_ASSERT(config.ok());
122 // Update child policy.
123 LoadBalancingPolicy::UpdateArgs update_args;
124 update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
125 update_args.args = child_args;
126 update_args.config = std::move(*config);
127 // TODO(roth): If the child reports a non-OK status with the update,
128 // we need to propagate that back to the resolver somehow.
129 (void)child_policy_->UpdateLocked(std::move(update_args));
130 }
131
Orphan()132 void EndpointList::Endpoint::Orphan() {
133 // Remove pollset_set linkage.
134 grpc_pollset_set_del_pollset_set(
135 child_policy_->interested_parties(),
136 endpoint_list_->policy_->interested_parties());
137 child_policy_.reset();
138 picker_.reset();
139 Unref();
140 }
141
ResetBackoffLocked()142 void EndpointList::Endpoint::ResetBackoffLocked() {
143 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
144 }
145
ExitIdleLocked()146 void EndpointList::Endpoint::ExitIdleLocked() {
147 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
148 }
149
Index() const150 size_t EndpointList::Endpoint::Index() const {
151 for (size_t i = 0; i < endpoint_list_->endpoints_.size(); ++i) {
152 if (endpoint_list_->endpoints_[i].get() == this) return i;
153 }
154 return -1;
155 }
156
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)157 RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
158 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
159 const ChannelArgs& args) {
160 return endpoint_list_->channel_control_helper()->CreateSubchannel(
161 address, per_address_args, args);
162 }
163
164 //
165 // EndpointList
166 //
167
Init(EndpointAddressesIterator * endpoints,const ChannelArgs & args,absl::FunctionRef<OrphanablePtr<Endpoint> (RefCountedPtr<EndpointList>,const EndpointAddresses &,const ChannelArgs &)> create_endpoint)168 void EndpointList::Init(
169 EndpointAddressesIterator* endpoints, const ChannelArgs& args,
170 absl::FunctionRef<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
171 const EndpointAddresses&,
172 const ChannelArgs&)>
173 create_endpoint) {
174 if (endpoints == nullptr) return;
175 endpoints->ForEach([&](const EndpointAddresses& endpoint) {
176 endpoints_.push_back(
177 create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args));
178 });
179 }
180
ResetBackoffLocked()181 void EndpointList::ResetBackoffLocked() {
182 for (const auto& endpoint : endpoints_) {
183 endpoint->ResetBackoffLocked();
184 }
185 }
186
187 } // namespace grpc_core
188