1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME; 22 import static io.grpc.xds.XdsResourceType.ParsedResource; 23 import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.base.Joiner; 27 import com.google.common.base.Stopwatch; 28 import com.google.common.base.Supplier; 29 import com.google.common.collect.ImmutableMap; 30 import com.google.common.collect.ImmutableSet; 31 import com.google.common.util.concurrent.ListenableFuture; 32 import com.google.common.util.concurrent.SettableFuture; 33 import com.google.protobuf.Any; 34 import io.grpc.ChannelCredentials; 35 import io.grpc.Context; 36 import io.grpc.Grpc; 37 import io.grpc.InternalLogId; 38 import io.grpc.LoadBalancerRegistry; 39 import io.grpc.ManagedChannel; 40 import io.grpc.Status; 41 import io.grpc.SynchronizationContext; 42 import io.grpc.SynchronizationContext.ScheduledHandle; 43 import io.grpc.internal.BackoffPolicy; 44 import io.grpc.internal.TimeProvider; 45 import io.grpc.xds.Bootstrapper.AuthorityInfo; 46 import io.grpc.xds.Bootstrapper.ServerInfo; 47 import io.grpc.xds.LoadStatsManager2.ClusterDropStats; 48 import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; 49 import io.grpc.xds.XdsClient.ResourceStore; 50 import io.grpc.xds.XdsClient.TimerLaunch; 51 import io.grpc.xds.XdsClient.XdsResponseHandler; 52 import io.grpc.xds.XdsLogger.XdsLogLevel; 53 import java.net.URI; 54 import java.util.Collection; 55 import java.util.Collections; 56 import java.util.HashMap; 57 import java.util.HashSet; 58 import java.util.List; 59 import java.util.Map; 60 import java.util.Objects; 61 import java.util.Set; 62 import java.util.concurrent.ScheduledExecutorService; 63 import java.util.concurrent.TimeUnit; 64 import java.util.logging.Level; 65 import java.util.logging.Logger; 66 import javax.annotation.Nullable; 67 68 /** 69 * XdsClient implementation. 70 */ 71 final class XdsClientImpl extends XdsClient 72 implements XdsResponseHandler, ResourceStore, TimerLaunch { 73 74 private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean( 75 System.getenv("GRPC_LOG_XDS_NODE_ID")); 76 private static final Logger classLogger = Logger.getLogger(XdsClientImpl.class.getName()); 77 78 // Longest time to wait, since the subscription to some resource, for concluding its absence. 79 @VisibleForTesting 80 static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; 81 private final SynchronizationContext syncContext = new SynchronizationContext( 82 new Thread.UncaughtExceptionHandler() { 83 @Override 84 public void uncaughtException(Thread t, Throwable e) { 85 logger.log( 86 XdsLogLevel.ERROR, 87 "Uncaught exception in XdsClient SynchronizationContext. Panic!", 88 e); 89 // TODO(chengyuanzhang): better error handling. 90 throw new AssertionError(e); 91 } 92 }); 93 private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); 94 private final LoadBalancerRegistry loadBalancerRegistry 95 = LoadBalancerRegistry.getDefaultRegistry(); 96 private final Map<ServerInfo, ControlPlaneClient> serverChannelMap = new HashMap<>(); 97 private final Map<XdsResourceType<? extends ResourceUpdate>, 98 Map<String, ResourceSubscriber<? extends ResourceUpdate>>> 99 resourceSubscribers = new HashMap<>(); 100 private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>(); 101 private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>(); 102 private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>(); 103 private final XdsChannelFactory xdsChannelFactory; 104 private final Bootstrapper.BootstrapInfo bootstrapInfo; 105 private final Context context; 106 private final ScheduledExecutorService timeService; 107 private final BackoffPolicy.Provider backoffPolicyProvider; 108 private final Supplier<Stopwatch> stopwatchSupplier; 109 private final TimeProvider timeProvider; 110 private final TlsContextManager tlsContextManager; 111 private final InternalLogId logId; 112 private final XdsLogger logger; 113 private volatile boolean isShutdown; 114 XdsClientImpl( XdsChannelFactory xdsChannelFactory, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier, TimeProvider timeProvider, TlsContextManager tlsContextManager)115 XdsClientImpl( 116 XdsChannelFactory xdsChannelFactory, 117 Bootstrapper.BootstrapInfo bootstrapInfo, 118 Context context, 119 ScheduledExecutorService timeService, 120 BackoffPolicy.Provider backoffPolicyProvider, 121 Supplier<Stopwatch> stopwatchSupplier, 122 TimeProvider timeProvider, 123 TlsContextManager tlsContextManager) { 124 this.xdsChannelFactory = xdsChannelFactory; 125 this.bootstrapInfo = bootstrapInfo; 126 this.context = context; 127 this.timeService = timeService; 128 this.backoffPolicyProvider = backoffPolicyProvider; 129 this.stopwatchSupplier = stopwatchSupplier; 130 this.timeProvider = timeProvider; 131 this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager"); 132 logId = InternalLogId.allocate("xds-client", null); 133 logger = XdsLogger.withLogId(logId); 134 logger.log(XdsLogLevel.INFO, "Created"); 135 if (LOG_XDS_NODE_ID) { 136 classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId()); 137 } 138 } 139 maybeCreateXdsChannelWithLrs(ServerInfo serverInfo)140 private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { 141 syncContext.throwIfNotInThisSynchronizationContext(); 142 if (serverChannelMap.containsKey(serverInfo)) { 143 return; 144 } 145 ControlPlaneClient xdsChannel = new ControlPlaneClient( 146 xdsChannelFactory, 147 serverInfo, 148 bootstrapInfo.node(), 149 this, 150 this, 151 context, 152 timeService, 153 syncContext, 154 backoffPolicyProvider, 155 stopwatchSupplier, 156 this); 157 LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier); 158 loadStatsManagerMap.put(serverInfo, loadStatsManager); 159 LoadReportClient lrsClient = new LoadReportClient( 160 loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext, 161 timeService, backoffPolicyProvider, stopwatchSupplier); 162 serverChannelMap.put(serverInfo, xdsChannel); 163 serverLrsClientMap.put(serverInfo, lrsClient); 164 } 165 166 @Override handleResourceResponse( XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo, List<Any> resources, String nonce)167 public void handleResourceResponse( 168 XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo, 169 List<Any> resources, String nonce) { 170 checkNotNull(xdsResourceType, "xdsResourceType"); 171 syncContext.throwIfNotInThisSynchronizationContext(); 172 Set<String> toParseResourceNames = null; 173 if (!(xdsResourceType == XdsListenerResource.getInstance() 174 || xdsResourceType == XdsRouteConfigureResource.getInstance()) 175 && resourceSubscribers.containsKey(xdsResourceType)) { 176 toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet(); 177 } 178 XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, 179 bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager, 180 toParseResourceNames); 181 handleResourceUpdate(args, resources, xdsResourceType); 182 } 183 184 @Override handleStreamClosed(Status error)185 public void handleStreamClosed(Status error) { 186 syncContext.throwIfNotInThisSynchronizationContext(); 187 cleanUpResourceTimers(); 188 for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap : 189 resourceSubscribers.values()) { 190 for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) { 191 if (!subscriber.hasResult()) { 192 subscriber.onError(error); 193 } 194 } 195 } 196 } 197 198 @Override handleStreamRestarted(ServerInfo serverInfo)199 public void handleStreamRestarted(ServerInfo serverInfo) { 200 syncContext.throwIfNotInThisSynchronizationContext(); 201 for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap : 202 resourceSubscribers.values()) { 203 for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) { 204 if (subscriber.serverInfo.equals(serverInfo)) { 205 subscriber.restartTimer(); 206 } 207 } 208 } 209 } 210 211 @Override shutdown()212 void shutdown() { 213 syncContext.execute( 214 new Runnable() { 215 @Override 216 public void run() { 217 if (isShutdown) { 218 return; 219 } 220 isShutdown = true; 221 for (ControlPlaneClient xdsChannel : serverChannelMap.values()) { 222 xdsChannel.shutdown(); 223 } 224 for (final LoadReportClient lrsClient : serverLrsClientMap.values()) { 225 lrsClient.stopLoadReporting(); 226 } 227 cleanUpResourceTimers(); 228 } 229 }); 230 } 231 232 @Override isShutDown()233 boolean isShutDown() { 234 return isShutdown; 235 } 236 237 @Override getSubscribedResourceTypesWithTypeUrl()238 public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() { 239 return Collections.unmodifiableMap(subscribedResourceTypeUrls); 240 } 241 242 @Nullable 243 @Override getSubscribedResources(ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type)244 public Collection<String> getSubscribedResources(ServerInfo serverInfo, 245 XdsResourceType<? extends ResourceUpdate> type) { 246 Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources = 247 resourceSubscribers.getOrDefault(type, Collections.emptyMap()); 248 ImmutableSet.Builder<String> builder = ImmutableSet.builder(); 249 for (String key : resources.keySet()) { 250 if (resources.get(key).serverInfo.equals(serverInfo)) { 251 builder.add(key); 252 } 253 } 254 Collection<String> retVal = builder.build(); 255 return retVal.isEmpty() ? null : retVal; 256 } 257 258 // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic. 259 // ResourceTypes that do not have subscribers does not show up in the snapshot keys. 260 @Override 261 ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> getSubscribedResourcesMetadataSnapshot()262 getSubscribedResourcesMetadataSnapshot() { 263 final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future = 264 SettableFuture.create(); 265 syncContext.execute(new Runnable() { 266 @Override 267 public void run() { 268 // A map from a "resource type" to a map ("resource name": "resource metadata") 269 ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot = 270 ImmutableMap.builder(); 271 for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) { 272 ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder(); 273 for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry 274 : resourceSubscribers.get(resourceType).entrySet()) { 275 metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata); 276 } 277 metadataSnapshot.put(resourceType, metadataMap.buildOrThrow()); 278 } 279 future.set(metadataSnapshot.buildOrThrow()); 280 } 281 }); 282 return future; 283 } 284 285 @Override getTlsContextManager()286 TlsContextManager getTlsContextManager() { 287 return tlsContextManager; 288 } 289 290 @Override watchXdsResource(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)291 <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName, 292 ResourceWatcher<T> watcher) { 293 syncContext.execute(new Runnable() { 294 @Override 295 @SuppressWarnings("unchecked") 296 public void run() { 297 if (!resourceSubscribers.containsKey(type)) { 298 resourceSubscribers.put(type, new HashMap<>()); 299 subscribedResourceTypeUrls.put(type.typeUrl(), type); 300 } 301 ResourceSubscriber<T> subscriber = 302 (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);; 303 if (subscriber == null) { 304 logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName); 305 subscriber = new ResourceSubscriber<>(type, resourceName); 306 resourceSubscribers.get(type).put(resourceName, subscriber); 307 if (subscriber.xdsChannel != null) { 308 subscriber.xdsChannel.adjustResourceSubscription(type); 309 } 310 } 311 subscriber.addWatcher(watcher); 312 } 313 }); 314 } 315 316 @Override cancelXdsResourceWatch(XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher)317 <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type, 318 String resourceName, 319 ResourceWatcher<T> watcher) { 320 syncContext.execute(new Runnable() { 321 @Override 322 @SuppressWarnings("unchecked") 323 public void run() { 324 ResourceSubscriber<T> subscriber = 325 (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);; 326 subscriber.removeWatcher(watcher); 327 if (!subscriber.isWatched()) { 328 subscriber.cancelResourceWatch(); 329 resourceSubscribers.get(type).remove(resourceName); 330 if (subscriber.xdsChannel != null) { 331 subscriber.xdsChannel.adjustResourceSubscription(type); 332 } 333 if (resourceSubscribers.get(type).isEmpty()) { 334 resourceSubscribers.remove(type); 335 subscribedResourceTypeUrls.remove(type.typeUrl()); 336 337 } 338 } 339 } 340 }); 341 } 342 343 @Override addClusterDropStats( final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName)344 ClusterDropStats addClusterDropStats( 345 final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) { 346 LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo); 347 ClusterDropStats dropCounter = 348 loadStatsManager.getClusterDropStats(clusterName, edsServiceName); 349 syncContext.execute(new Runnable() { 350 @Override 351 public void run() { 352 serverLrsClientMap.get(serverInfo).startLoadReporting(); 353 } 354 }); 355 return dropCounter; 356 } 357 358 @Override addClusterLocalityStats( final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, Locality locality)359 ClusterLocalityStats addClusterLocalityStats( 360 final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName, 361 Locality locality) { 362 LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo); 363 ClusterLocalityStats loadCounter = 364 loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality); 365 syncContext.execute(new Runnable() { 366 @Override 367 public void run() { 368 serverLrsClientMap.get(serverInfo).startLoadReporting(); 369 } 370 }); 371 return loadCounter; 372 } 373 374 @Override getBootstrapInfo()375 Bootstrapper.BootstrapInfo getBootstrapInfo() { 376 return bootstrapInfo; 377 } 378 379 @VisibleForTesting 380 @Override getServerLrsClientMap()381 Map<ServerInfo, LoadReportClient> getServerLrsClientMap() { 382 return ImmutableMap.copyOf(serverLrsClientMap); 383 } 384 385 @Override toString()386 public String toString() { 387 return logId.toString(); 388 } 389 390 @Override startSubscriberTimersIfNeeded(ServerInfo serverInfo)391 public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) { 392 if (isShutDown()) { 393 return; 394 } 395 396 syncContext.execute(new Runnable() { 397 @Override 398 public void run() { 399 if (isShutDown()) { 400 return; 401 } 402 403 for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) { 404 for (ResourceSubscriber<?> subscriber : subscriberMap.values()) { 405 if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) { 406 subscriber.restartTimer(); 407 } 408 } 409 } 410 } 411 }); 412 } 413 cleanUpResourceTimers()414 private void cleanUpResourceTimers() { 415 for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) { 416 for (ResourceSubscriber<?> subscriber : subscriberMap.values()) { 417 subscriber.stopTimer(); 418 } 419 } 420 } 421 422 @SuppressWarnings("unchecked") handleResourceUpdate(XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType)423 private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Args args, 424 List<Any> resources, 425 XdsResourceType<T> xdsResourceType) { 426 ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources); 427 logger.log(XdsLogger.XdsLogLevel.INFO, 428 "Received {0} Response version {1} nonce {2}. Parsed resources: {3}", 429 xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); 430 Map<String, ParsedResource<T>> parsedResources = result.parsedResources; 431 Set<String> invalidResources = result.invalidResources; 432 List<String> errors = result.errors; 433 String errorDetail = null; 434 if (errors.isEmpty()) { 435 checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); 436 serverChannelMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo, 437 args.nonce); 438 } else { 439 errorDetail = Joiner.on('\n').join(errors); 440 logger.log(XdsLogLevel.WARNING, 441 "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", 442 xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail); 443 serverChannelMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail); 444 } 445 446 long updateTime = timeProvider.currentTimeNanos(); 447 Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources = 448 resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap()); 449 for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) { 450 String resourceName = entry.getKey(); 451 ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue(); 452 453 if (parsedResources.containsKey(resourceName)) { 454 // Happy path: the resource updated successfully. Notify the watchers of the update. 455 subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime); 456 continue; 457 } 458 459 if (invalidResources.contains(resourceName)) { 460 // The resource update is invalid. Capture the error without notifying the watchers. 461 subscriber.onRejected(args.versionInfo, updateTime, errorDetail); 462 } 463 464 // Nothing else to do for incremental ADS resources. 465 if (!xdsResourceType.isFullStateOfTheWorld()) { 466 continue; 467 } 468 469 // Handle State of the World ADS: invalid resources. 470 if (invalidResources.contains(resourceName)) { 471 // The resource is missing. Reuse the cached resource if possible. 472 if (subscriber.data == null) { 473 // No cached data. Notify the watchers of an invalid update. 474 subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail)); 475 } 476 continue; 477 } 478 479 // For State of the World services, notify watchers when their watched resource is missing 480 // from the ADS update. Note that we can only do this if the resource update is coming from 481 // the same xDS server that the ResourceSubscriber is subscribed to. 482 if (subscriber.serverInfo.equals(args.serverInfo)) { 483 subscriber.onAbsent(); 484 } 485 } 486 } 487 488 /** 489 * Tracks a single subscribed resource. 490 */ 491 private final class ResourceSubscriber<T extends ResourceUpdate> { 492 @Nullable private final ServerInfo serverInfo; 493 @Nullable private final ControlPlaneClient xdsChannel; 494 private final XdsResourceType<T> type; 495 private final String resource; 496 private final Set<ResourceWatcher<T>> watchers = new HashSet<>(); 497 @Nullable private T data; 498 private boolean absent; 499 // Tracks whether the deletion has been ignored per bootstrap server feature. 500 // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md 501 private boolean resourceDeletionIgnored; 502 @Nullable private ScheduledHandle respTimer; 503 @Nullable private ResourceMetadata metadata; 504 @Nullable private String errorDescription; 505 ResourceSubscriber(XdsResourceType<T> type, String resource)506 ResourceSubscriber(XdsResourceType<T> type, String resource) { 507 syncContext.throwIfNotInThisSynchronizationContext(); 508 this.type = type; 509 this.resource = resource; 510 this.serverInfo = getServerInfo(resource); 511 if (serverInfo == null) { 512 this.errorDescription = "Wrong configuration: xds server does not exist for resource " 513 + resource; 514 this.xdsChannel = null; 515 return; 516 } 517 // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, 518 // is created but not yet requested because the client is in backoff. 519 this.metadata = ResourceMetadata.newResourceMetadataUnknown(); 520 521 ControlPlaneClient xdsChannelTemp = null; 522 try { 523 maybeCreateXdsChannelWithLrs(serverInfo); 524 xdsChannelTemp = serverChannelMap.get(serverInfo); 525 if (xdsChannelTemp.isInBackoff()) { 526 return; 527 } 528 } catch (IllegalArgumentException e) { 529 xdsChannelTemp = null; 530 this.errorDescription = "Bad configuration: " + e.getMessage(); 531 return; 532 } finally { 533 this.xdsChannel = xdsChannelTemp; 534 } 535 536 restartTimer(); 537 } 538 539 @Nullable getServerInfo(String resource)540 private ServerInfo getServerInfo(String resource) { 541 if (BootstrapperImpl.enableFederation && resource.startsWith(XDSTP_SCHEME)) { 542 URI uri = URI.create(resource); 543 String authority = uri.getAuthority(); 544 if (authority == null) { 545 authority = ""; 546 } 547 AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority); 548 if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) { 549 return null; 550 } 551 return authorityInfo.xdsServers().get(0); 552 } 553 return bootstrapInfo.servers().get(0); // use first server 554 } 555 addWatcher(ResourceWatcher<T> watcher)556 void addWatcher(ResourceWatcher<T> watcher) { 557 checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); 558 watchers.add(watcher); 559 if (errorDescription != null) { 560 watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); 561 return; 562 } 563 if (data != null) { 564 notifyWatcher(watcher, data); 565 } else if (absent) { 566 watcher.onResourceDoesNotExist(resource); 567 } 568 } 569 removeWatcher(ResourceWatcher<T> watcher)570 void removeWatcher(ResourceWatcher<T> watcher) { 571 checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher); 572 watchers.remove(watcher); 573 } 574 restartTimer()575 void restartTimer() { 576 if (data != null || absent) { // resource already resolved 577 return; 578 } 579 if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer 580 return; 581 } 582 583 class ResourceNotFound implements Runnable { 584 @Override 585 public void run() { 586 logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", 587 type, resource); 588 respTimer = null; 589 onAbsent(); 590 } 591 592 @Override 593 public String toString() { 594 return type + this.getClass().getSimpleName(); 595 } 596 } 597 598 // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. 599 metadata = ResourceMetadata.newResourceMetadataRequested(); 600 601 respTimer = syncContext.schedule( 602 new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, 603 timeService); 604 } 605 stopTimer()606 void stopTimer() { 607 if (respTimer != null && respTimer.isPending()) { 608 respTimer.cancel(); 609 respTimer = null; 610 } 611 } 612 cancelResourceWatch()613 void cancelResourceWatch() { 614 if (isWatched()) { 615 throw new IllegalStateException("Can't cancel resource watch with active watchers present"); 616 } 617 stopTimer(); 618 String message = "Unsubscribing {0} resource {1} from server {2}"; 619 XdsLogLevel logLevel = XdsLogLevel.INFO; 620 if (resourceDeletionIgnored) { 621 message += " for which we previously ignored a deletion"; 622 logLevel = XdsLogLevel.FORCE_INFO; 623 } 624 logger.log(logLevel, message, type, resource, 625 serverInfo != null ? serverInfo.target() : "unknown"); 626 } 627 isWatched()628 boolean isWatched() { 629 return !watchers.isEmpty(); 630 } 631 hasResult()632 boolean hasResult() { 633 return data != null || absent; 634 } 635 onData(ParsedResource<T> parsedResource, String version, long updateTime)636 void onData(ParsedResource<T> parsedResource, String version, long updateTime) { 637 if (respTimer != null && respTimer.isPending()) { 638 respTimer.cancel(); 639 respTimer = null; 640 } 641 this.metadata = ResourceMetadata 642 .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime); 643 ResourceUpdate oldData = this.data; 644 this.data = parsedResource.getResourceUpdate(); 645 absent = false; 646 if (resourceDeletionIgnored) { 647 logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " 648 + "of resource for which we previously ignored a deletion: type {1} name {2}", 649 serverInfo != null ? serverInfo.target() : "unknown", type, resource); 650 resourceDeletionIgnored = false; 651 } 652 if (!Objects.equals(oldData, data)) { 653 for (ResourceWatcher<T> watcher : watchers) { 654 notifyWatcher(watcher, data); 655 } 656 } 657 } 658 onAbsent()659 void onAbsent() { 660 if (respTimer != null && respTimer.isPending()) { // too early to conclude absence 661 return; 662 } 663 664 // Ignore deletion of State of the World resources when this feature is on, 665 // and the resource is reusable. 666 boolean ignoreResourceDeletionEnabled = 667 serverInfo != null && serverInfo.ignoreResourceDeletion(); 668 if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { 669 if (!resourceDeletionIgnored) { 670 logger.log(XdsLogLevel.FORCE_WARNING, 671 "xds server {0}: ignoring deletion for resource type {1} name {2}}", 672 serverInfo.target(), type, resource); 673 resourceDeletionIgnored = true; 674 } 675 return; 676 } 677 678 logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); 679 if (!absent) { 680 data = null; 681 absent = true; 682 metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); 683 for (ResourceWatcher<T> watcher : watchers) { 684 watcher.onResourceDoesNotExist(resource); 685 } 686 } 687 } 688 onError(Status error)689 void onError(Status error) { 690 if (respTimer != null && respTimer.isPending()) { 691 respTimer.cancel(); 692 respTimer = null; 693 } 694 695 // Include node ID in xds failures to allow cross-referencing with control plane logs 696 // when debugging. 697 String description = error.getDescription() == null ? "" : error.getDescription() + " "; 698 Status errorAugmented = Status.fromCode(error.getCode()) 699 .withDescription(description + "nodeID: " + bootstrapInfo.node().getId()) 700 .withCause(error.getCause()); 701 702 for (ResourceWatcher<T> watcher : watchers) { 703 watcher.onError(errorAugmented); 704 } 705 } 706 onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails)707 void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) { 708 metadata = ResourceMetadata 709 .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails); 710 } 711 notifyWatcher(ResourceWatcher<T> watcher, T update)712 private void notifyWatcher(ResourceWatcher<T> watcher, T update) { 713 watcher.onChanged(update); 714 } 715 } 716 717 static final class ResourceInvalidException extends Exception { 718 private static final long serialVersionUID = 0L; 719 ResourceInvalidException(String message)720 ResourceInvalidException(String message) { 721 super(message, null, false, false); 722 } 723 ResourceInvalidException(String message, Throwable cause)724 ResourceInvalidException(String message, Throwable cause) { 725 super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false); 726 } 727 } 728 729 abstract static class XdsChannelFactory { 730 static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() { 731 @Override 732 ManagedChannel create(ServerInfo serverInfo) { 733 String target = serverInfo.target(); 734 ChannelCredentials channelCredentials = serverInfo.channelCredentials(); 735 return Grpc.newChannelBuilder(target, channelCredentials) 736 .keepAliveTime(5, TimeUnit.MINUTES) 737 .build(); 738 } 739 }; 740 create(ServerInfo serverInfo)741 abstract ManagedChannel create(ServerInfo serverInfo); 742 } 743 } 744