1 /* 2 * Copyright 2021 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME; 22 23 import com.google.auto.value.AutoValue; 24 import com.google.common.annotations.VisibleForTesting; 25 import com.google.common.collect.ImmutableList; 26 import com.google.common.collect.ImmutableMap; 27 import com.google.common.util.concurrent.SettableFuture; 28 import io.grpc.Attributes; 29 import io.grpc.InternalServerInterceptors; 30 import io.grpc.Metadata; 31 import io.grpc.MethodDescriptor; 32 import io.grpc.Server; 33 import io.grpc.ServerBuilder; 34 import io.grpc.ServerCall; 35 import io.grpc.ServerCall.Listener; 36 import io.grpc.ServerCallHandler; 37 import io.grpc.ServerInterceptor; 38 import io.grpc.ServerServiceDefinition; 39 import io.grpc.Status; 40 import io.grpc.StatusException; 41 import io.grpc.SynchronizationContext; 42 import io.grpc.SynchronizationContext.ScheduledHandle; 43 import io.grpc.internal.GrpcUtil; 44 import io.grpc.internal.ObjectPool; 45 import io.grpc.internal.SharedResourceHolder; 46 import io.grpc.xds.EnvoyServerProtoData.FilterChain; 47 import io.grpc.xds.Filter.FilterConfig; 48 import io.grpc.xds.Filter.NamedFilterConfig; 49 import io.grpc.xds.Filter.ServerInterceptorBuilder; 50 import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector; 51 import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; 52 import io.grpc.xds.VirtualHost.Route; 53 import io.grpc.xds.XdsClient.ResourceWatcher; 54 import io.grpc.xds.XdsListenerResource.LdsUpdate; 55 import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; 56 import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; 57 import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener; 58 import io.grpc.xds.internal.security.SslContextProviderSupplier; 59 import java.io.IOException; 60 import java.net.SocketAddress; 61 import java.util.ArrayList; 62 import java.util.Collections; 63 import java.util.HashMap; 64 import java.util.HashSet; 65 import java.util.List; 66 import java.util.Map; 67 import java.util.Set; 68 import java.util.concurrent.CountDownLatch; 69 import java.util.concurrent.ExecutionException; 70 import java.util.concurrent.ScheduledExecutorService; 71 import java.util.concurrent.TimeUnit; 72 import java.util.concurrent.atomic.AtomicBoolean; 73 import java.util.concurrent.atomic.AtomicReference; 74 import java.util.logging.Level; 75 import java.util.logging.Logger; 76 import javax.annotation.Nullable; 77 78 final class XdsServerWrapper extends Server { 79 private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName()); 80 81 private final SynchronizationContext syncContext = new SynchronizationContext( 82 new Thread.UncaughtExceptionHandler() { 83 @Override 84 public void uncaughtException(Thread t, Throwable e) { 85 logger.log(Level.SEVERE, "Exception!" + e); 86 // TODO(chengyuanzhang): implement cleanup. 87 } 88 }); 89 90 public static final Attributes.Key<AtomicReference<ServerRoutingConfig>> 91 ATTR_SERVER_ROUTING_CONFIG = 92 Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig"); 93 94 @VisibleForTesting 95 static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1); 96 private final String listenerAddress; 97 private final ServerBuilder<?> delegateBuilder; 98 private boolean sharedTimeService; 99 private final ScheduledExecutorService timeService; 100 private final FilterRegistry filterRegistry; 101 private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance; 102 private final XdsClientPoolFactory xdsClientPoolFactory; 103 private final XdsServingStatusListener listener; 104 private final FilterChainSelectorManager filterChainSelectorManager; 105 private final AtomicBoolean started = new AtomicBoolean(false); 106 private final AtomicBoolean shutdown = new AtomicBoolean(false); 107 private boolean isServing; 108 private final CountDownLatch internalTerminationLatch = new CountDownLatch(1); 109 private final SettableFuture<Exception> initialStartFuture = SettableFuture.create(); 110 private boolean initialStarted; 111 private ScheduledHandle restartTimer; 112 private ObjectPool<XdsClient> xdsClientPool; 113 private XdsClient xdsClient; 114 private DiscoveryState discoveryState; 115 private volatile Server delegate; 116 XdsServerWrapper( String listenerAddress, ServerBuilder<?> delegateBuilder, XdsServingStatusListener listener, FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, FilterRegistry filterRegistry)117 XdsServerWrapper( 118 String listenerAddress, 119 ServerBuilder<?> delegateBuilder, 120 XdsServingStatusListener listener, 121 FilterChainSelectorManager filterChainSelectorManager, 122 XdsClientPoolFactory xdsClientPoolFactory, 123 FilterRegistry filterRegistry) { 124 this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager, 125 xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE)); 126 sharedTimeService = true; 127 } 128 129 @VisibleForTesting XdsServerWrapper( String listenerAddress, ServerBuilder<?> delegateBuilder, XdsServingStatusListener listener, FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, FilterRegistry filterRegistry, ScheduledExecutorService timeService)130 XdsServerWrapper( 131 String listenerAddress, 132 ServerBuilder<?> delegateBuilder, 133 XdsServingStatusListener listener, 134 FilterChainSelectorManager filterChainSelectorManager, 135 XdsClientPoolFactory xdsClientPoolFactory, 136 FilterRegistry filterRegistry, 137 ScheduledExecutorService timeService) { 138 this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress"); 139 this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder"); 140 this.delegateBuilder.intercept(new ConfigApplyingInterceptor()); 141 this.listener = checkNotNull(listener, "listener"); 142 this.filterChainSelectorManager 143 = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); 144 this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); 145 this.timeService = checkNotNull(timeService, "timeService"); 146 this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry"); 147 this.delegate = delegateBuilder.build(); 148 } 149 150 @Override start()151 public Server start() throws IOException { 152 checkState(started.compareAndSet(false, true), "Already started"); 153 syncContext.execute(new Runnable() { 154 @Override 155 public void run() { 156 internalStart(); 157 } 158 }); 159 Exception exception; 160 try { 161 exception = initialStartFuture.get(); 162 } catch (InterruptedException | ExecutionException e) { 163 throw new RuntimeException(e); 164 } 165 if (exception != null) { 166 throw (exception instanceof IOException) ? (IOException) exception : 167 new IOException(exception); 168 } 169 return this; 170 } 171 internalStart()172 private void internalStart() { 173 try { 174 xdsClientPool = xdsClientPoolFactory.getOrCreate(); 175 } catch (Exception e) { 176 StatusException statusException = Status.UNAVAILABLE.withDescription( 177 "Failed to initialize xDS").withCause(e).asException(); 178 listener.onNotServing(statusException); 179 initialStartFuture.set(statusException); 180 return; 181 } 182 xdsClient = xdsClientPool.getObject(); 183 String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate(); 184 if (listenerTemplate == null) { 185 StatusException statusException = 186 Status.UNAVAILABLE.withDescription( 187 "Can only support xDS v3 with listener resource name template").asException(); 188 listener.onNotServing(statusException); 189 initialStartFuture.set(statusException); 190 xdsClient = xdsClientPool.returnObject(xdsClient); 191 return; 192 } 193 String replacement = listenerAddress; 194 if (listenerTemplate.startsWith(XDSTP_SCHEME)) { 195 replacement = XdsClient.percentEncodePath(replacement); 196 } 197 discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement)); 198 } 199 200 @Override shutdown()201 public Server shutdown() { 202 if (!shutdown.compareAndSet(false, true)) { 203 return this; 204 } 205 syncContext.execute(new Runnable() { 206 @Override 207 public void run() { 208 if (!delegate.isShutdown()) { 209 delegate.shutdown(); 210 } 211 internalShutdown(); 212 } 213 }); 214 return this; 215 } 216 217 @Override shutdownNow()218 public Server shutdownNow() { 219 if (!shutdown.compareAndSet(false, true)) { 220 return this; 221 } 222 syncContext.execute(new Runnable() { 223 @Override 224 public void run() { 225 if (!delegate.isShutdown()) { 226 delegate.shutdownNow(); 227 } 228 internalShutdown(); 229 initialStartFuture.set(new IOException("server is forcefully shut down")); 230 } 231 }); 232 return this; 233 } 234 235 // Must run in SynchronizationContext internalShutdown()236 private void internalShutdown() { 237 logger.log(Level.FINER, "Shutting down XdsServerWrapper"); 238 if (discoveryState != null) { 239 discoveryState.shutdown(); 240 } 241 if (xdsClient != null) { 242 xdsClient = xdsClientPool.returnObject(xdsClient); 243 } 244 if (restartTimer != null) { 245 restartTimer.cancel(); 246 } 247 if (sharedTimeService) { 248 SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService); 249 } 250 isServing = false; 251 internalTerminationLatch.countDown(); 252 } 253 254 @Override isShutdown()255 public boolean isShutdown() { 256 return shutdown.get(); 257 } 258 259 @Override isTerminated()260 public boolean isTerminated() { 261 return internalTerminationLatch.getCount() == 0 && delegate.isTerminated(); 262 } 263 264 @Override awaitTermination(long timeout, TimeUnit unit)265 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 266 long startTime = System.nanoTime(); 267 if (!internalTerminationLatch.await(timeout, unit)) { 268 return false; 269 } 270 long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime); 271 return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS); 272 } 273 274 @Override awaitTermination()275 public void awaitTermination() throws InterruptedException { 276 internalTerminationLatch.await(); 277 delegate.awaitTermination(); 278 } 279 280 @Override getPort()281 public int getPort() { 282 return delegate.getPort(); 283 } 284 285 @Override getListenSockets()286 public List<? extends SocketAddress> getListenSockets() { 287 return delegate.getListenSockets(); 288 } 289 290 @Override getServices()291 public List<ServerServiceDefinition> getServices() { 292 return delegate.getServices(); 293 } 294 295 @Override getImmutableServices()296 public List<ServerServiceDefinition> getImmutableServices() { 297 return delegate.getImmutableServices(); 298 } 299 300 @Override getMutableServices()301 public List<ServerServiceDefinition> getMutableServices() { 302 return delegate.getMutableServices(); 303 } 304 305 // Must run in SynchronizationContext startDelegateServer()306 private void startDelegateServer() { 307 if (restartTimer != null && restartTimer.isPending()) { 308 return; 309 } 310 if (isServing) { 311 return; 312 } 313 if (delegate.isShutdown()) { 314 delegate = delegateBuilder.build(); 315 } 316 try { 317 delegate.start(); 318 listener.onServing(); 319 isServing = true; 320 if (!initialStarted) { 321 initialStarted = true; 322 initialStartFuture.set(null); 323 } 324 logger.log(Level.FINER, "Delegate server started."); 325 } catch (IOException e) { 326 logger.log(Level.FINE, "Fail to start delegate server: {0}", e); 327 if (!initialStarted) { 328 initialStarted = true; 329 initialStartFuture.set(e); 330 } else { 331 listener.onNotServing(e); 332 } 333 restartTimer = syncContext.schedule( 334 new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService); 335 } 336 } 337 338 private final class RestartTask implements Runnable { 339 @Override run()340 public void run() { 341 startDelegateServer(); 342 } 343 } 344 345 private final class DiscoveryState implements ResourceWatcher<LdsUpdate> { 346 private final String resourceName; 347 // RDS resource name is the key. 348 private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>(); 349 // Track pending RDS resources using rds name. 350 private final Set<String> pendingRds = new HashSet<>(); 351 // Most recently discovered filter chains. 352 private List<FilterChain> filterChains = new ArrayList<>(); 353 // Most recently discovered default filter chain. 354 @Nullable 355 private FilterChain defaultFilterChain; 356 private boolean stopped; 357 private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 358 = new HashMap<>(); 359 private final ServerInterceptor noopInterceptor = new ServerInterceptor() { 360 @Override 361 public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 362 Metadata headers, ServerCallHandler<ReqT, RespT> next) { 363 return next.startCall(call, headers); 364 } 365 }; 366 DiscoveryState(String resourceName)367 private DiscoveryState(String resourceName) { 368 this.resourceName = checkNotNull(resourceName, "resourceName"); 369 xdsClient.watchXdsResource(XdsListenerResource.getInstance(), resourceName, this); 370 } 371 372 @Override onChanged(final LdsUpdate update)373 public void onChanged(final LdsUpdate update) { 374 syncContext.execute(new Runnable() { 375 @Override 376 public void run() { 377 if (stopped) { 378 return; 379 } 380 logger.log(Level.FINEST, "Received Lds update {0}", update); 381 checkNotNull(update.listener(), "update"); 382 if (!pendingRds.isEmpty()) { 383 // filter chain state has not yet been applied to filterChainSelectorManager and there 384 // are two sets of sslContextProviderSuppliers, so we release the old ones. 385 releaseSuppliersInFlight(); 386 pendingRds.clear(); 387 } 388 filterChains = update.listener().filterChains(); 389 defaultFilterChain = update.listener().defaultFilterChain(); 390 List<FilterChain> allFilterChains = filterChains; 391 if (defaultFilterChain != null) { 392 allFilterChains = new ArrayList<>(filterChains); 393 allFilterChains.add(defaultFilterChain); 394 } 395 Set<String> allRds = new HashSet<>(); 396 for (FilterChain filterChain : allFilterChains) { 397 HttpConnectionManager hcm = filterChain.httpConnectionManager(); 398 if (hcm.virtualHosts() == null) { 399 RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName()); 400 if (rdsState == null) { 401 rdsState = new RouteDiscoveryState(hcm.rdsName()); 402 routeDiscoveryStates.put(hcm.rdsName(), rdsState); 403 xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), 404 hcm.rdsName(), rdsState); 405 } 406 if (rdsState.isPending) { 407 pendingRds.add(hcm.rdsName()); 408 } 409 allRds.add(hcm.rdsName()); 410 } 411 } 412 for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) { 413 if (!allRds.contains(entry.getKey())) { 414 xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), 415 entry.getKey(), entry.getValue()); 416 } 417 } 418 routeDiscoveryStates.keySet().retainAll(allRds); 419 if (pendingRds.isEmpty()) { 420 updateSelector(); 421 } 422 } 423 }); 424 } 425 426 @Override onResourceDoesNotExist(final String resourceName)427 public void onResourceDoesNotExist(final String resourceName) { 428 syncContext.execute(new Runnable() { 429 @Override 430 public void run() { 431 if (stopped) { 432 return; 433 } 434 StatusException statusException = Status.UNAVAILABLE.withDescription( 435 "Listener " + resourceName + " unavailable").asException(); 436 handleConfigNotFound(statusException); 437 } 438 }); 439 } 440 441 @Override onError(final Status error)442 public void onError(final Status error) { 443 syncContext.execute(new Runnable() { 444 @Override 445 public void run() { 446 if (stopped) { 447 return; 448 } 449 logger.log(Level.FINE, "Error from XdsClient", error); 450 if (!isServing) { 451 listener.onNotServing(error.asException()); 452 } 453 } 454 }); 455 } 456 shutdown()457 private void shutdown() { 458 stopped = true; 459 cleanUpRouteDiscoveryStates(); 460 logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName); 461 xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this); 462 List<SslContextProviderSupplier> toRelease = getSuppliersInUse(); 463 filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN); 464 for (SslContextProviderSupplier s: toRelease) { 465 s.close(); 466 } 467 releaseSuppliersInFlight(); 468 } 469 updateSelector()470 private void updateSelector() { 471 Map<FilterChain, AtomicReference<ServerRoutingConfig>> filterChainRouting = new HashMap<>(); 472 savedRdsRoutingConfigRef.clear(); 473 for (FilterChain filterChain: filterChains) { 474 filterChainRouting.put(filterChain, generateRoutingConfig(filterChain)); 475 } 476 FilterChainSelector selector = new FilterChainSelector( 477 Collections.unmodifiableMap(filterChainRouting), 478 defaultFilterChain == null ? null : defaultFilterChain.sslContextProviderSupplier(), 479 defaultFilterChain == null ? new AtomicReference<ServerRoutingConfig>() : 480 generateRoutingConfig(defaultFilterChain)); 481 List<SslContextProviderSupplier> toRelease = getSuppliersInUse(); 482 logger.log(Level.FINEST, "Updating selector {0}", selector); 483 filterChainSelectorManager.updateSelector(selector); 484 for (SslContextProviderSupplier e: toRelease) { 485 e.close(); 486 } 487 startDelegateServer(); 488 } 489 generateRoutingConfig(FilterChain filterChain)490 private AtomicReference<ServerRoutingConfig> generateRoutingConfig(FilterChain filterChain) { 491 HttpConnectionManager hcm = filterChain.httpConnectionManager(); 492 if (hcm.virtualHosts() != null) { 493 ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors( 494 hcm.httpFilterConfigs(), hcm.virtualHosts()); 495 return new AtomicReference<>(ServerRoutingConfig.create(hcm.virtualHosts(),interceptors)); 496 } else { 497 RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName()); 498 checkNotNull(rds, "rds"); 499 AtomicReference<ServerRoutingConfig> serverRoutingConfigRef = new AtomicReference<>(); 500 if (rds.savedVirtualHosts != null) { 501 ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors( 502 hcm.httpFilterConfigs(), rds.savedVirtualHosts); 503 ServerRoutingConfig serverRoutingConfig = 504 ServerRoutingConfig.create(rds.savedVirtualHosts, interceptors); 505 serverRoutingConfigRef.set(serverRoutingConfig); 506 } else { 507 serverRoutingConfigRef.set(ServerRoutingConfig.FAILING_ROUTING_CONFIG); 508 } 509 savedRdsRoutingConfigRef.put(filterChain, serverRoutingConfigRef); 510 return serverRoutingConfigRef; 511 } 512 } 513 generatePerRouteInterceptors( List<NamedFilterConfig> namedFilterConfigs, List<VirtualHost> virtualHosts)514 private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors( 515 List<NamedFilterConfig> namedFilterConfigs, List<VirtualHost> virtualHosts) { 516 ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors = 517 new ImmutableMap.Builder<>(); 518 for (VirtualHost virtualHost : virtualHosts) { 519 for (Route route : virtualHost.routes()) { 520 List<ServerInterceptor> filterInterceptors = new ArrayList<>(); 521 Map<String, FilterConfig> selectedOverrideConfigs = 522 new HashMap<>(virtualHost.filterConfigOverrides()); 523 selectedOverrideConfigs.putAll(route.filterConfigOverrides()); 524 if (namedFilterConfigs != null) { 525 for (NamedFilterConfig namedFilterConfig : namedFilterConfigs) { 526 FilterConfig filterConfig = namedFilterConfig.filterConfig; 527 Filter filter = filterRegistry.get(filterConfig.typeUrl()); 528 if (filter instanceof ServerInterceptorBuilder) { 529 ServerInterceptor interceptor = 530 ((ServerInterceptorBuilder) filter).buildServerInterceptor( 531 filterConfig, selectedOverrideConfigs.get(namedFilterConfig.name)); 532 if (interceptor != null) { 533 filterInterceptors.add(interceptor); 534 } 535 } else { 536 logger.log(Level.WARNING, "HttpFilterConfig(type URL: " 537 + filterConfig.typeUrl() + ") is not supported on server-side. " 538 + "Probably a bug at ClientXdsClient verification."); 539 } 540 } 541 } 542 ServerInterceptor interceptor = combineInterceptors(filterInterceptors); 543 perRouteInterceptors.put(route, interceptor); 544 } 545 } 546 return perRouteInterceptors.buildOrThrow(); 547 } 548 combineInterceptors(final List<ServerInterceptor> interceptors)549 private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) { 550 if (interceptors.isEmpty()) { 551 return noopInterceptor; 552 } 553 if (interceptors.size() == 1) { 554 return interceptors.get(0); 555 } 556 return new ServerInterceptor() { 557 @Override 558 public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 559 Metadata headers, ServerCallHandler<ReqT, RespT> next) { 560 // intercept forward 561 for (int i = interceptors.size() - 1; i >= 0; i--) { 562 next = InternalServerInterceptors.interceptCallHandlerCreate( 563 interceptors.get(i), next); 564 } 565 return next.startCall(call, headers); 566 } 567 }; 568 } 569 handleConfigNotFound(StatusException exception)570 private void handleConfigNotFound(StatusException exception) { 571 cleanUpRouteDiscoveryStates(); 572 List<SslContextProviderSupplier> toRelease = getSuppliersInUse(); 573 filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN); 574 for (SslContextProviderSupplier s: toRelease) { 575 s.close(); 576 } 577 if (restartTimer != null) { 578 restartTimer.cancel(); 579 } 580 if (!delegate.isShutdown()) { 581 delegate.shutdown(); // let in-progress calls finish 582 } 583 isServing = false; 584 listener.onNotServing(exception); 585 } 586 cleanUpRouteDiscoveryStates()587 private void cleanUpRouteDiscoveryStates() { 588 for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) { 589 String rdsName = rdsState.resourceName; 590 logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName); 591 xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName, 592 rdsState); 593 } 594 routeDiscoveryStates.clear(); 595 savedRdsRoutingConfigRef.clear(); 596 } 597 getSuppliersInUse()598 private List<SslContextProviderSupplier> getSuppliersInUse() { 599 List<SslContextProviderSupplier> toRelease = new ArrayList<>(); 600 FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector(); 601 if (selector != null) { 602 for (FilterChain f: selector.getRoutingConfigs().keySet()) { 603 if (f.sslContextProviderSupplier() != null) { 604 toRelease.add(f.sslContextProviderSupplier()); 605 } 606 } 607 SslContextProviderSupplier defaultSupplier = 608 selector.getDefaultSslContextProviderSupplier(); 609 if (defaultSupplier != null) { 610 toRelease.add(defaultSupplier); 611 } 612 } 613 return toRelease; 614 } 615 releaseSuppliersInFlight()616 private void releaseSuppliersInFlight() { 617 SslContextProviderSupplier supplier; 618 for (FilterChain filterChain : filterChains) { 619 supplier = filterChain.sslContextProviderSupplier(); 620 if (supplier != null) { 621 supplier.close(); 622 } 623 } 624 if (defaultFilterChain != null 625 && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) { 626 supplier.close(); 627 } 628 } 629 630 private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> { 631 private final String resourceName; 632 private ImmutableList<VirtualHost> savedVirtualHosts; 633 private boolean isPending = true; 634 RouteDiscoveryState(String resourceName)635 private RouteDiscoveryState(String resourceName) { 636 this.resourceName = checkNotNull(resourceName, "resourceName"); 637 } 638 639 @Override onChanged(final RdsUpdate update)640 public void onChanged(final RdsUpdate update) { 641 syncContext.execute(new Runnable() { 642 @Override 643 public void run() { 644 if (!routeDiscoveryStates.containsKey(resourceName)) { 645 return; 646 } 647 if (savedVirtualHosts == null && !isPending) { 648 logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName); 649 } 650 savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts); 651 updateRdsRoutingConfig(); 652 maybeUpdateSelector(); 653 } 654 }); 655 } 656 657 @Override onResourceDoesNotExist(final String resourceName)658 public void onResourceDoesNotExist(final String resourceName) { 659 syncContext.execute(new Runnable() { 660 @Override 661 public void run() { 662 if (!routeDiscoveryStates.containsKey(resourceName)) { 663 return; 664 } 665 logger.log(Level.WARNING, "Rds {0} unavailable", resourceName); 666 savedVirtualHosts = null; 667 updateRdsRoutingConfig(); 668 maybeUpdateSelector(); 669 } 670 }); 671 } 672 673 @Override onError(final Status error)674 public void onError(final Status error) { 675 syncContext.execute(new Runnable() { 676 @Override 677 public void run() { 678 if (!routeDiscoveryStates.containsKey(resourceName)) { 679 return; 680 } 681 logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.", 682 new Object[]{resourceName, error}); 683 maybeUpdateSelector(); 684 } 685 }); 686 } 687 updateRdsRoutingConfig()688 private void updateRdsRoutingConfig() { 689 for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) { 690 if (resourceName.equals(filterChain.httpConnectionManager().rdsName())) { 691 ServerRoutingConfig updatedRoutingConfig; 692 if (savedVirtualHosts == null) { 693 updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG; 694 } else { 695 ImmutableMap<Route, ServerInterceptor> updatedInterceptors = 696 generatePerRouteInterceptors( 697 filterChain.httpConnectionManager().httpFilterConfigs(), 698 savedVirtualHosts); 699 updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts, 700 updatedInterceptors); 701 } 702 logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}", 703 new Object[]{filterChain.name(), updatedRoutingConfig}); 704 savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig); 705 } 706 } 707 } 708 709 // Update the selector to use the most recently updated configs only after all rds have been 710 // discovered for the first time. Later changes on rds will be applied through virtual host 711 // list atomic ref. maybeUpdateSelector()712 private void maybeUpdateSelector() { 713 isPending = false; 714 boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty(); 715 if (isLastPending) { 716 updateSelector(); 717 } 718 } 719 } 720 } 721 722 @VisibleForTesting 723 final class ConfigApplyingInterceptor implements ServerInterceptor { 724 private final ServerInterceptor noopInterceptor = new ServerInterceptor() { 725 @Override 726 public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 727 Metadata headers, ServerCallHandler<ReqT, RespT> next) { 728 return next.startCall(call, headers); 729 } 730 }; 731 732 @Override 733 public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 734 Metadata headers, ServerCallHandler<ReqT, RespT> next) { 735 AtomicReference<ServerRoutingConfig> routingConfigRef = 736 call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG); 737 ServerRoutingConfig routingConfig = routingConfigRef == null ? null : 738 routingConfigRef.get(); 739 if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) { 740 String errorMsg = "Missing or broken xDS routing config: RDS config unavailable."; 741 call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata()); 742 return new Listener<ReqT>() {}; 743 } 744 List<VirtualHost> virtualHosts = routingConfig.virtualHosts(); 745 VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName( 746 virtualHosts, call.getAuthority()); 747 if (virtualHost == null) { 748 call.close( 749 Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"), 750 new Metadata()); 751 return new Listener<ReqT>() {}; 752 } 753 Route selectedRoute = null; 754 MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor(); 755 for (Route route : virtualHost.routes()) { 756 if (RoutingUtils.matchRoute( 757 route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) { 758 selectedRoute = route; 759 break; 760 } 761 } 762 if (selectedRoute == null) { 763 call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"), 764 new Metadata()); 765 return new ServerCall.Listener<ReqT>() {}; 766 } 767 if (selectedRoute.routeAction() != null) { 768 call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching " 769 + "route: only Route.non_forwarding_action should be allowed."), new Metadata()); 770 return new ServerCall.Listener<ReqT>() {}; 771 } 772 ServerInterceptor routeInterceptor = noopInterceptor; 773 Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors(); 774 if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) { 775 routeInterceptor = perRouteInterceptors.get(selectedRoute); 776 } 777 return routeInterceptor.interceptCall(call, headers, next); 778 } 779 } 780 781 /** 782 * The HttpConnectionManager level configuration. 783 */ 784 @AutoValue 785 abstract static class ServerRoutingConfig { 786 @VisibleForTesting 787 static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create( 788 ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of()); 789 790 abstract ImmutableList<VirtualHost> virtualHosts(); 791 792 // Prebuilt per route server interceptors from http filter configs. 793 abstract ImmutableMap<Route, ServerInterceptor> interceptors(); 794 795 /** 796 * Server routing configuration. 797 * */ 798 public static ServerRoutingConfig create( 799 ImmutableList<VirtualHost> virtualHosts, 800 ImmutableMap<Route, ServerInterceptor> interceptors) { 801 checkNotNull(virtualHosts, "virtualHosts"); 802 checkNotNull(interceptors, "interceptors"); 803 return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors); 804 } 805 } 806 } 807