xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
/*
 * Copyright 2020 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.grpc.ConnectivityState.CONNECTING;
21 import static io.grpc.ConnectivityState.IDLE;
22 import static io.grpc.ConnectivityState.READY;
23 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24 import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
25 
26 import io.grpc.ConnectivityState;
27 import io.grpc.InternalLogId;
28 import io.grpc.LoadBalancer;
29 import io.grpc.LoadBalancerProvider;
30 import io.grpc.Status;
31 import io.grpc.SynchronizationContext;
32 import io.grpc.SynchronizationContext.ScheduledHandle;
33 import io.grpc.internal.ServiceConfigUtil.PolicySelection;
34 import io.grpc.util.ForwardingLoadBalancerHelper;
35 import io.grpc.util.GracefulSwitchLoadBalancer;
36 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
37 import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
38 import io.grpc.xds.XdsLogger.XdsLogLevel;
39 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Objects;
47 import java.util.Set;
48 import java.util.concurrent.ScheduledExecutorService;
49 import java.util.concurrent.TimeUnit;
50 import javax.annotation.Nullable;
51 
52 /**
53  * Load balancer for priority policy. A <em>priority</em> represents a logical entity within a
54  * cluster for load balancing purposes.
55  */
56 final class PriorityLoadBalancer extends LoadBalancer {
57   private final Helper helper;
58   private final SynchronizationContext syncContext;
59   private final ScheduledExecutorService executor;
60   private final XdsLogger logger;
61 
62   // Includes all active and deactivated children. Mutable. New entries are only added from priority
63   // 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation.
64   // Note that calling into a child can cause the child to call back into the LB policy and modify
65   // the map.  Therefore copy values before looping over them.
66   private final Map<String, ChildLbState> children = new HashMap<>();
67 
68   // Following fields are only null initially.
69   private ResolvedAddresses resolvedAddresses;
70   // List of priority names in order.
71   private List<String> priorityNames;
72   // Config for each priority.
73   private Map<String, PriorityChildConfig> priorityConfigs;
74   @Nullable private String currentPriority;
75   private ConnectivityState currentConnectivityState;
76   private SubchannelPicker currentPicker;
77   // Set to true if currently in the process of handling resolved addresses.
78   private boolean handlingResolvedAddresses;
79 
PriorityLoadBalancer(Helper helper)80   PriorityLoadBalancer(Helper helper) {
81     this.helper = checkNotNull(helper, "helper");
82     syncContext = helper.getSynchronizationContext();
83     executor = helper.getScheduledExecutorService();
84     InternalLogId logId = InternalLogId.allocate("priority-lb", helper.getAuthority());
85     logger = XdsLogger.withLogId(logId);
86     logger.log(XdsLogLevel.INFO, "Created");
87   }
88 
89   @Override
acceptResolvedAddresses(ResolvedAddresses resolvedAddresses)90   public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
91     logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
92     this.resolvedAddresses = resolvedAddresses;
93     PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
94     checkNotNull(config, "missing priority lb config");
95     priorityNames = config.priorities;
96     priorityConfigs = config.childConfigs;
97     Set<String> prioritySet = new HashSet<>(config.priorities);
98     ArrayList<String> childKeys = new ArrayList<>(children.keySet());
99     for (String priority : childKeys) {
100       if (!prioritySet.contains(priority)) {
101         ChildLbState childLbState = children.get(priority);
102         if (childLbState != null) {
103           childLbState.deactivate();
104         }
105       }
106     }
107     handlingResolvedAddresses = true;
108     for (String priority : priorityNames) {
109       ChildLbState childLbState = children.get(priority);
110       if (childLbState != null) {
111         childLbState.updateResolvedAddresses();
112       }
113     }
114     handlingResolvedAddresses = false;
115     tryNextPriority();
116     return true;
117   }
118 
119   @Override
handleNameResolutionError(Status error)120   public void handleNameResolutionError(Status error) {
121     logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
122     boolean gotoTransientFailure = true;
123     Collection<ChildLbState> childValues = new ArrayList<>(children.values());
124     for (ChildLbState child : childValues) {
125       if (priorityNames.contains(child.priority)) {
126         child.lb.handleNameResolutionError(error);
127         gotoTransientFailure = false;
128       }
129     }
130     if (gotoTransientFailure) {
131       updateOverallState(null, TRANSIENT_FAILURE, new ErrorPicker(error));
132     }
133   }
134 
135   @Override
shutdown()136   public void shutdown() {
137     logger.log(XdsLogLevel.INFO, "Shutdown");
138     Collection<ChildLbState> childValues = new ArrayList<>(children.values());
139     for (ChildLbState child : childValues) {
140       child.tearDown();
141     }
142     children.clear();
143   }
144 
tryNextPriority()145   private void tryNextPriority() {
146     for (int i = 0; i < priorityNames.size(); i++) {
147       String priority = priorityNames.get(i);
148       if (!children.containsKey(priority)) {
149         ChildLbState child =
150             new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
151         children.put(priority, child);
152         updateOverallState(priority, CONNECTING, BUFFER_PICKER);
153         // Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
154         // called recursively. We need to be sure to be done with processing here before it is
155         // called.
156         child.updateResolvedAddresses();
157         return; // Give priority i time to connect.
158       }
159       ChildLbState child = children.get(priority);
160       child.reactivate();
161       if (child.connectivityState.equals(READY) || child.connectivityState.equals(IDLE)) {
162         logger.log(XdsLogLevel.DEBUG, "Shifted to priority {0}", priority);
163         updateOverallState(priority, child.connectivityState, child.picker);
164         for (int j = i + 1; j < priorityNames.size(); j++) {
165           String p = priorityNames.get(j);
166           if (children.containsKey(p)) {
167             children.get(p).deactivate();
168           }
169         }
170         return;
171       }
172       if (child.failOverTimer != null && child.failOverTimer.isPending()) {
173         updateOverallState(priority, child.connectivityState, child.picker);
174         return; // Give priority i time to connect.
175       }
176       if (priority.equals(currentPriority) && child.connectivityState != TRANSIENT_FAILURE) {
177         // If the current priority is not changed into TRANSIENT_FAILURE, keep using it.
178         updateOverallState(priority, child.connectivityState, child.picker);
179         return;
180       }
181     }
182     // TODO(zdapeng): Include error details of each priority.
183     logger.log(XdsLogLevel.DEBUG, "All priority failed");
184     String lastPriority = priorityNames.get(priorityNames.size() - 1);
185     SubchannelPicker errorPicker = children.get(lastPriority).picker;
186     updateOverallState(lastPriority, TRANSIENT_FAILURE, errorPicker);
187   }
188 
updateOverallState( @ullable String priority, ConnectivityState state, SubchannelPicker picker)189   private void updateOverallState(
190       @Nullable String priority, ConnectivityState state, SubchannelPicker picker) {
191     if (!Objects.equals(priority, currentPriority) || !state.equals(currentConnectivityState)
192         || !picker.equals(currentPicker)) {
193       currentPriority = priority;
194       currentConnectivityState = state;
195       currentPicker = picker;
196       helper.updateBalancingState(state, picker);
197     }
198   }
199 
200   private final class ChildLbState {
201     final String priority;
202     final ChildHelper childHelper;
203     final GracefulSwitchLoadBalancer lb;
204     // Timer to fail over to the next priority if not connected in 10 sec. Scheduled only once at
205     // child initialization.
206     ScheduledHandle failOverTimer;
207     boolean seenReadyOrIdleSinceTransientFailure = false;
208     // Timer to delay shutdown and deletion of the priority. Scheduled whenever the child is
209     // deactivated.
210     @Nullable ScheduledHandle deletionTimer;
211     @Nullable String policy;
212     ConnectivityState connectivityState = CONNECTING;
213     SubchannelPicker picker = BUFFER_PICKER;
214 
ChildLbState(final String priority, boolean ignoreReresolution)215     ChildLbState(final String priority, boolean ignoreReresolution) {
216       this.priority = priority;
217       childHelper = new ChildHelper(ignoreReresolution);
218       lb = new GracefulSwitchLoadBalancer(childHelper);
219       failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, executor);
220       logger.log(XdsLogLevel.DEBUG, "Priority created: {0}", priority);
221     }
222 
223     final class FailOverTask implements Runnable {
224       @Override
run()225       public void run() {
226         if (deletionTimer != null && deletionTimer.isPending()) {
227           // The child is deactivated.
228           return;
229         }
230         picker = new ErrorPicker(
231             Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority));
232         logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
233         currentPriority = null; // reset currentPriority to guarantee failover happen
234         tryNextPriority();
235       }
236     }
237 
238     /**
239      * Called when the child becomes a priority that is or appears before the first READY one in the
240      * {@code priorities} list, due to either config update or balancing state update.
241      */
reactivate()242     void reactivate() {
243       if (deletionTimer != null && deletionTimer.isPending()) {
244         deletionTimer.cancel();
245         logger.log(XdsLogLevel.DEBUG, "Priority reactivated: {0}", priority);
246       }
247     }
248 
249     /**
250      * Called when either the child is removed by config update, or a higher priority becomes READY.
251      */
deactivate()252     void deactivate() {
253       if (deletionTimer != null && deletionTimer.isPending()) {
254         return;
255       }
256 
257       class DeletionTask implements Runnable {
258         @Override
259         public void run() {
260           tearDown();
261           children.remove(priority);
262         }
263       }
264 
265       deletionTimer = syncContext.schedule(new DeletionTask(), 15, TimeUnit.MINUTES, executor);
266       logger.log(XdsLogLevel.DEBUG, "Priority deactivated: {0}", priority);
267     }
268 
tearDown()269     void tearDown() {
270       if (failOverTimer.isPending()) {
271         failOverTimer.cancel();
272       }
273       if (deletionTimer != null && deletionTimer.isPending()) {
274         deletionTimer.cancel();
275       }
276       lb.shutdown();
277       logger.log(XdsLogLevel.DEBUG, "Priority deleted: {0}", priority);
278     }
279 
280     /**
281      * Called either when the child is just created and in this case updated with the cached {@code
282      * resolvedAddresses}, or when priority lb receives a new resolved addresses while the child
283      * already exists.
284      */
updateResolvedAddresses()285     void updateResolvedAddresses() {
286       PriorityLbConfig config =
287           (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
288       PolicySelection childPolicySelection = config.childConfigs.get(priority).policySelection;
289       LoadBalancerProvider lbProvider = childPolicySelection.getProvider();
290       String newPolicy = lbProvider.getPolicyName();
291       if (!newPolicy.equals(policy)) {
292         policy = newPolicy;
293         lb.switchTo(lbProvider);
294       }
295       lb.handleResolvedAddresses(
296           resolvedAddresses.toBuilder()
297               .setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), priority))
298               .setLoadBalancingPolicyConfig(childPolicySelection.getConfig())
299               .build());
300     }
301 
302     final class ChildHelper extends ForwardingLoadBalancerHelper {
303       private final boolean ignoreReresolution;
304 
ChildHelper(boolean ignoreReresolution)305       ChildHelper(boolean ignoreReresolution) {
306         this.ignoreReresolution = ignoreReresolution;
307       }
308 
309       @Override
refreshNameResolution()310       public void refreshNameResolution() {
311         if (!ignoreReresolution) {
312           delegate().refreshNameResolution();
313         }
314       }
315 
316       @Override
updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker)317       public void updateBalancingState(final ConnectivityState newState,
318           final SubchannelPicker newPicker) {
319         if (!children.containsKey(priority)) {
320           return;
321         }
322         connectivityState = newState;
323         picker = newPicker;
324 
325         if (deletionTimer != null && deletionTimer.isPending()) {
326           return;
327         }
328         if (newState.equals(CONNECTING)) {
329           if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) {
330             failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS,
331                 executor);
332           }
333         } else if (newState.equals(READY) || newState.equals(IDLE)) {
334           seenReadyOrIdleSinceTransientFailure = true;
335           failOverTimer.cancel();
336         } else if (newState.equals(TRANSIENT_FAILURE)) {
337           seenReadyOrIdleSinceTransientFailure = false;
338           failOverTimer.cancel();
339         }
340 
341         // If we are currently handling newly resolved addresses, let's not try to reconfigure as
342         // the address handling process will take care of that to provide an atomic config update.
343         if (!handlingResolvedAddresses) {
344           tryNextPriority();
345         }
346       }
347 
348       @Override
delegate()349       protected Helper delegate() {
350         return helper;
351       }
352     }
353   }
354 }
355