/*
 * Copyright 2020 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;

import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
 * Load balancer for priority policy. A <em>priority</em> represents a logical entity within a
 * cluster for load balancing purposes.
 *
 * <p>Each priority named in the config is backed by a {@link ChildLbState}. Priorities are tried
 * in list order; see {@link #tryNextPriority()} for the selection rules.
 */
final class PriorityLoadBalancer extends LoadBalancer {
  // Time a priority is given to connect before the balancer fails over to the next one.
  private static final long FAILOVER_TIMEOUT_SECONDS = 10;
  // Time a deactivated priority is retained before it is shut down and removed.
  private static final long DELETION_DELAY_MINUTES = 15;

  private final Helper helper;
  private final SynchronizationContext syncContext;
  private final ScheduledExecutorService executor;
  private final XdsLogger logger;

  // Includes all active and deactivated children. Mutable. New entries are only added from priority
  // 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation.
  // Note that calling into a child can cause the child to call back into the LB policy and modify
  // the map. Therefore copy values before looping over them.
  private final Map<String, ChildLbState> children = new HashMap<>();

  // Following fields are only null initially.
  private ResolvedAddresses resolvedAddresses;
  // List of priority names in order.
  private List<String> priorityNames;
  // Config for each priority.
  private Map<String, PriorityChildConfig> priorityConfigs;
  @Nullable private String currentPriority;
  private ConnectivityState currentConnectivityState;
  private SubchannelPicker currentPicker;
  // Set to true if currently in the process of handling resolved addresses.
  private boolean handlingResolvedAddresses;

  /**
   * Creates the balancer, taking the synchronization context and scheduled executor used for its
   * timers from {@code helper}.
   *
   * @param helper the parent helper; must not be null
   */
  PriorityLoadBalancer(Helper helper) {
    this.helper = checkNotNull(helper, "helper");
    syncContext = helper.getSynchronizationContext();
    executor = helper.getScheduledExecutorService();
    InternalLogId logId = InternalLogId.allocate("priority-lb", helper.getAuthority());
    logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created");
  }

  /**
   * Applies a new resolution result: deactivates children whose priority is no longer configured,
   * pushes the update to every existing child that is still configured, and then re-runs priority
   * selection once.
   *
   * @param resolvedAddresses the update; its load balancing policy config must be a non-null
   *     {@link PriorityLbConfig}
   * @return always {@code true}
   * @throws NullPointerException if the update carries no policy config
   */
  @Override
  public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
    // Validate before mutating any field so that a rejected update leaves the previously cached
    // state intact.
    PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
    checkNotNull(config, "missing priority lb config");
    this.resolvedAddresses = resolvedAddresses;
    priorityNames = config.priorities;
    priorityConfigs = config.childConfigs;
    Set<String> prioritySet = new HashSet<>(config.priorities);
    // Iterate over a snapshot of the keys: the map may be modified while children are called.
    List<String> childKeys = new ArrayList<>(children.keySet());
    for (String priority : childKeys) {
      if (!prioritySet.contains(priority)) {
        ChildLbState childLbState = children.get(priority);
        if (childLbState != null) {
          childLbState.deactivate();
        }
      }
    }
    handlingResolvedAddresses = true;
    try {
      for (String priority : priorityNames) {
        ChildLbState childLbState = children.get(priority);
        if (childLbState != null) {
          childLbState.updateResolvedAddresses();
        }
      }
    } finally {
      // Always clear the flag; if it stayed set after a child threw, every later child state
      // update would skip priority selection.
      handlingResolvedAddresses = false;
    }
    tryNextPriority();
    return true;
  }

  /**
   * Forwards a name resolution error to every child whose priority is in the current priority
   * list. If no such child exists, reports TRANSIENT_FAILURE with an error picker for
   * {@code error}.
   */
  @Override
  public void handleNameResolutionError(Status error) {
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
    boolean gotoTransientFailure = true;
    // Snapshot the values: a child may call back and modify the map.
    Collection<ChildLbState> childValues = new ArrayList<>(children.values());
    for (ChildLbState child : childValues) {
      if (priorityNames.contains(child.priority)) {
        child.lb.handleNameResolutionError(error);
        gotoTransientFailure = false;
      }
    }
    if (gotoTransientFailure) {
      updateOverallState(null, TRANSIENT_FAILURE, new ErrorPicker(error));
    }
  }

  /** Tears down every child (active or deactivated) and empties the child map. */
  @Override
  public void shutdown() {
    logger.log(XdsLogLevel.INFO, "Shutdown");
    Collection<ChildLbState> childValues = new ArrayList<>(children.values());
    for (ChildLbState child : childValues) {
      child.tearDown();
    }
    children.clear();
  }

  /**
   * Walks the priority list in order and settles on the first priority that should be in use.
   *
   * <ul>
   *   <li>A priority with no child yet gets one created; the overall state becomes CONNECTING
   *       and the walk stops to give it time to connect.
   *   <li>A child that is READY or IDLE is selected, and all existing children of later
   *       priorities are deactivated.
   *   <li>A child whose fail-over timer is still pending is reported as-is and the walk stops.
   *   <li>The current priority is kept as long as it is not in TRANSIENT_FAILURE.
   * </ul>
   *
   * <p>If no priority qualifies, the overall state becomes TRANSIENT_FAILURE using the picker of
   * the last priority in the list.
   */
  private void tryNextPriority() {
    for (int i = 0; i < priorityNames.size(); i++) {
      String priority = priorityNames.get(i);
      // Only non-null values are ever put in the map, so a null lookup means "no child yet".
      ChildLbState child = children.get(priority);
      if (child == null) {
        child = new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
        children.put(priority, child);
        updateOverallState(priority, CONNECTING, BUFFER_PICKER);
        // Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
        // called recursively. We need to be sure to be done with processing here before it is
        // called.
        child.updateResolvedAddresses();
        return; // Give priority i time to connect.
      }
      child.reactivate();
      if (child.connectivityState.equals(READY) || child.connectivityState.equals(IDLE)) {
        logger.log(XdsLogLevel.DEBUG, "Shifted to priority {0}", priority);
        updateOverallState(priority, child.connectivityState, child.picker);
        for (int j = i + 1; j < priorityNames.size(); j++) {
          ChildLbState lowerPriorityChild = children.get(priorityNames.get(j));
          if (lowerPriorityChild != null) {
            lowerPriorityChild.deactivate();
          }
        }
        return;
      }
      if (child.failOverTimer != null && child.failOverTimer.isPending()) {
        updateOverallState(priority, child.connectivityState, child.picker);
        return; // Give priority i time to connect.
      }
      if (priority.equals(currentPriority) && child.connectivityState != TRANSIENT_FAILURE) {
        // If the current priority is not changed into TRANSIENT_FAILURE, keep using it.
        updateOverallState(priority, child.connectivityState, child.picker);
        return;
      }
    }
    // TODO(zdapeng): Include error details of each priority.
    logger.log(XdsLogLevel.DEBUG, "All priority failed");
    String lastPriority = priorityNames.get(priorityNames.size() - 1);
    SubchannelPicker errorPicker = children.get(lastPriority).picker;
    updateOverallState(lastPriority, TRANSIENT_FAILURE, errorPicker);
  }

  /**
   * Records the selected priority, state and picker, and forwards the state and picker to the
   * parent helper. Does nothing when all three are equal to the currently recorded values.
   */
  private void updateOverallState(
      @Nullable String priority, ConnectivityState state, SubchannelPicker picker) {
    if (!Objects.equals(priority, currentPriority) || !state.equals(currentConnectivityState)
        || !picker.equals(currentPicker)) {
      currentPriority = priority;
      currentConnectivityState = state;
      currentPicker = picker;
      helper.updateBalancingState(state, picker);
    }
  }

  /** Per-priority state: the child policy, its last reported state/picker, and its timers. */
  private final class ChildLbState {
    final String priority;
    final ChildHelper childHelper;
    final GracefulSwitchLoadBalancer lb;
    // Timer to fail over to the next priority if not connected in 10 sec. Scheduled at child
    // initialization, and again when the child reports CONNECTING after having been READY or
    // IDLE since its last TRANSIENT_FAILURE.
    ScheduledHandle failOverTimer;
    boolean seenReadyOrIdleSinceTransientFailure = false;
    // Timer to delay shutdown and deletion of the priority. Scheduled whenever the child is
    // deactivated.
    @Nullable ScheduledHandle deletionTimer;
    @Nullable String policy;
    ConnectivityState connectivityState = CONNECTING;
    SubchannelPicker picker = BUFFER_PICKER;

    /**
     * Creates the child for {@code priority} and starts its fail-over timer.
     *
     * @param priority name of the priority this child serves
     * @param ignoreReresolution whether name-resolution refresh requests from the child policy
     *     are dropped instead of being forwarded to the parent helper
     */
    ChildLbState(final String priority, boolean ignoreReresolution) {
      this.priority = priority;
      childHelper = new ChildHelper(ignoreReresolution);
      lb = new GracefulSwitchLoadBalancer(childHelper);
      failOverTimer =
          syncContext.schedule(
              new FailOverTask(), FAILOVER_TIMEOUT_SECONDS, TimeUnit.SECONDS, executor);
      logger.log(XdsLogLevel.DEBUG, "Priority created: {0}", priority);
    }

    /**
     * Fired when the fail-over timer expires. Unless the child is deactivated, replaces its
     * picker with an UNAVAILABLE error picker and re-runs priority selection. The child's
     * recorded connectivity state is left as it was.
     */
    final class FailOverTask implements Runnable {
      @Override
      public void run() {
        if (deletionTimer != null && deletionTimer.isPending()) {
          // The child is deactivated.
          return;
        }
        picker = new ErrorPicker(
            Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority));
        logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
        currentPriority = null; // reset currentPriority to guarantee failover happen
        tryNextPriority();
      }
    }

    /**
     * Called when the child becomes a priority that is or appears before the first READY one in the
     * {@code priorities} list, due to either config update or balancing state update. Cancels a
     * pending deletion, if any.
     */
    void reactivate() {
      if (deletionTimer != null && deletionTimer.isPending()) {
        deletionTimer.cancel();
        logger.log(XdsLogLevel.DEBUG, "Priority reactivated: {0}", priority);
      }
    }

    /**
     * Called when either the child is removed by config update, or a higher priority becomes READY.
     * Schedules the child for tear-down and removal; does nothing if that is already scheduled.
     */
    void deactivate() {
      if (deletionTimer != null && deletionTimer.isPending()) {
        return;
      }

      class DeletionTask implements Runnable {
        @Override
        public void run() {
          tearDown();
          children.remove(priority);
        }
      }

      deletionTimer =
          syncContext.schedule(
              new DeletionTask(), DELETION_DELAY_MINUTES, TimeUnit.MINUTES, executor);
      logger.log(XdsLogLevel.DEBUG, "Priority deactivated: {0}", priority);
    }

    /**
     * Cancels any pending timers and shuts down the child policy. Does not remove the child from
     * the {@code children} map; callers do that themselves.
     */
    void tearDown() {
      if (failOverTimer.isPending()) {
        failOverTimer.cancel();
      }
      if (deletionTimer != null && deletionTimer.isPending()) {
        deletionTimer.cancel();
      }
      lb.shutdown();
      logger.log(XdsLogLevel.DEBUG, "Priority deleted: {0}", priority);
    }

    /**
     * Called either when the child is just created and in this case updated with the cached {@code
     * resolvedAddresses}, or when priority lb receives a new resolved addresses while the child
     * already exists.
     *
     * <p>Switches the child policy when its policy name changed, then hands the child the
     * addresses filtered for this priority together with the child policy's own config.
     */
    void updateResolvedAddresses() {
      PriorityLbConfig config =
          (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
      PolicySelection childPolicySelection = config.childConfigs.get(priority).policySelection;
      LoadBalancerProvider lbProvider = childPolicySelection.getProvider();
      String newPolicy = lbProvider.getPolicyName();
      if (!newPolicy.equals(policy)) {
        policy = newPolicy;
        lb.switchTo(lbProvider);
      }
      lb.handleResolvedAddresses(
          resolvedAddresses.toBuilder()
              .setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), priority))
              .setLoadBalancingPolicyConfig(childPolicySelection.getConfig())
              .build());
    }

    /** Helper handed to the child policy; intercepts its state updates and refresh requests. */
    final class ChildHelper extends ForwardingLoadBalancerHelper {
      private final boolean ignoreReresolution;

      ChildHelper(boolean ignoreReresolution) {
        this.ignoreReresolution = ignoreReresolution;
      }

      /** Forwards the refresh request to the parent helper unless re-resolution is ignored. */
      @Override
      public void refreshNameResolution() {
        if (!ignoreReresolution) {
          delegate().refreshNameResolution();
        }
      }

      /**
       * Records the child's new state and picker, maintains the fail-over timer, and re-runs
       * priority selection. Updates from a child no longer in the map are dropped; updates from
       * a deactivated child are recorded but trigger nothing further.
       */
      @Override
      public void updateBalancingState(final ConnectivityState newState,
          final SubchannelPicker newPicker) {
        if (!children.containsKey(priority)) {
          return;
        }
        connectivityState = newState;
        picker = newPicker;

        if (deletionTimer != null && deletionTimer.isPending()) {
          return;
        }
        if (newState.equals(CONNECTING)) {
          if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) {
            failOverTimer =
                syncContext.schedule(
                    new FailOverTask(), FAILOVER_TIMEOUT_SECONDS, TimeUnit.SECONDS, executor);
          }
        } else if (newState.equals(READY) || newState.equals(IDLE)) {
          seenReadyOrIdleSinceTransientFailure = true;
          failOverTimer.cancel();
        } else if (newState.equals(TRANSIENT_FAILURE)) {
          seenReadyOrIdleSinceTransientFailure = false;
          failOverTimer.cancel();
        }

        // If we are currently handling newly resolved addresses, let's not try to reconfigure as
        // the address handling process will take care of that to provide an atomic config update.
        if (!handlingResolvedAddresses) {
          tryNextPriority();
        }
      }

      @Override
      protected Helper delegate() {
        return helper;
      }
    }
  }
}