xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2021 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
22 
23 import com.google.auto.value.AutoValue;
24 import com.google.common.annotations.VisibleForTesting;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.SettableFuture;
28 import io.grpc.Attributes;
29 import io.grpc.InternalServerInterceptors;
30 import io.grpc.Metadata;
31 import io.grpc.MethodDescriptor;
32 import io.grpc.Server;
33 import io.grpc.ServerBuilder;
34 import io.grpc.ServerCall;
35 import io.grpc.ServerCall.Listener;
36 import io.grpc.ServerCallHandler;
37 import io.grpc.ServerInterceptor;
38 import io.grpc.ServerServiceDefinition;
39 import io.grpc.Status;
40 import io.grpc.StatusException;
41 import io.grpc.SynchronizationContext;
42 import io.grpc.SynchronizationContext.ScheduledHandle;
43 import io.grpc.internal.GrpcUtil;
44 import io.grpc.internal.ObjectPool;
45 import io.grpc.internal.SharedResourceHolder;
46 import io.grpc.xds.EnvoyServerProtoData.FilterChain;
47 import io.grpc.xds.Filter.FilterConfig;
48 import io.grpc.xds.Filter.NamedFilterConfig;
49 import io.grpc.xds.Filter.ServerInterceptorBuilder;
50 import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
51 import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
52 import io.grpc.xds.VirtualHost.Route;
53 import io.grpc.xds.XdsClient.ResourceWatcher;
54 import io.grpc.xds.XdsListenerResource.LdsUpdate;
55 import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
56 import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
57 import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
58 import io.grpc.xds.internal.security.SslContextProviderSupplier;
59 import java.io.IOException;
60 import java.net.SocketAddress;
61 import java.util.ArrayList;
62 import java.util.Collections;
63 import java.util.HashMap;
64 import java.util.HashSet;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.Set;
68 import java.util.concurrent.CountDownLatch;
69 import java.util.concurrent.ExecutionException;
70 import java.util.concurrent.ScheduledExecutorService;
71 import java.util.concurrent.TimeUnit;
72 import java.util.concurrent.atomic.AtomicBoolean;
73 import java.util.concurrent.atomic.AtomicReference;
74 import java.util.logging.Level;
75 import java.util.logging.Logger;
76 import javax.annotation.Nullable;
77 
78 final class XdsServerWrapper extends Server {
79   private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
80 
81   private final SynchronizationContext syncContext = new SynchronizationContext(
82       new Thread.UncaughtExceptionHandler() {
83         @Override
84         public void uncaughtException(Thread t, Throwable e) {
85           logger.log(Level.SEVERE, "Exception!" + e);
86           // TODO(chengyuanzhang): implement cleanup.
87         }
88       });
89 
90   public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
91       ATTR_SERVER_ROUTING_CONFIG =
92       Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
93 
94   @VisibleForTesting
95   static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
96   private final String listenerAddress;
97   private final ServerBuilder<?> delegateBuilder;
98   private boolean sharedTimeService;
99   private final ScheduledExecutorService timeService;
100   private final FilterRegistry filterRegistry;
101   private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
102   private final XdsClientPoolFactory xdsClientPoolFactory;
103   private final XdsServingStatusListener listener;
104   private final FilterChainSelectorManager filterChainSelectorManager;
105   private final AtomicBoolean started = new AtomicBoolean(false);
106   private final AtomicBoolean shutdown = new AtomicBoolean(false);
107   private boolean isServing;
108   private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
109   private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
110   private boolean initialStarted;
111   private ScheduledHandle restartTimer;
112   private ObjectPool<XdsClient> xdsClientPool;
113   private XdsClient xdsClient;
114   private DiscoveryState discoveryState;
115   private volatile Server delegate;
116 
XdsServerWrapper( String listenerAddress, ServerBuilder<?> delegateBuilder, XdsServingStatusListener listener, FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, FilterRegistry filterRegistry)117   XdsServerWrapper(
118       String listenerAddress,
119       ServerBuilder<?> delegateBuilder,
120       XdsServingStatusListener listener,
121       FilterChainSelectorManager filterChainSelectorManager,
122       XdsClientPoolFactory xdsClientPoolFactory,
123       FilterRegistry filterRegistry) {
124     this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager,
125         xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
126     sharedTimeService = true;
127   }
128 
129   @VisibleForTesting
XdsServerWrapper( String listenerAddress, ServerBuilder<?> delegateBuilder, XdsServingStatusListener listener, FilterChainSelectorManager filterChainSelectorManager, XdsClientPoolFactory xdsClientPoolFactory, FilterRegistry filterRegistry, ScheduledExecutorService timeService)130   XdsServerWrapper(
131           String listenerAddress,
132           ServerBuilder<?> delegateBuilder,
133           XdsServingStatusListener listener,
134           FilterChainSelectorManager filterChainSelectorManager,
135           XdsClientPoolFactory xdsClientPoolFactory,
136           FilterRegistry filterRegistry,
137           ScheduledExecutorService timeService) {
138     this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
139     this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
140     this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
141     this.listener = checkNotNull(listener, "listener");
142     this.filterChainSelectorManager
143         = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
144     this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
145     this.timeService = checkNotNull(timeService, "timeService");
146     this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
147     this.delegate = delegateBuilder.build();
148   }
149 
150   @Override
start()151   public Server start() throws IOException {
152     checkState(started.compareAndSet(false, true), "Already started");
153     syncContext.execute(new Runnable() {
154       @Override
155       public void run() {
156         internalStart();
157       }
158     });
159     Exception exception;
160     try {
161       exception = initialStartFuture.get();
162     } catch (InterruptedException | ExecutionException e) {
163       throw new RuntimeException(e);
164     }
165     if (exception != null) {
166       throw (exception instanceof IOException) ? (IOException) exception :
167               new IOException(exception);
168     }
169     return this;
170   }
171 
internalStart()172   private void internalStart() {
173     try {
174       xdsClientPool = xdsClientPoolFactory.getOrCreate();
175     } catch (Exception e) {
176       StatusException statusException = Status.UNAVAILABLE.withDescription(
177               "Failed to initialize xDS").withCause(e).asException();
178       listener.onNotServing(statusException);
179       initialStartFuture.set(statusException);
180       return;
181     }
182     xdsClient = xdsClientPool.getObject();
183     String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
184     if (listenerTemplate == null) {
185       StatusException statusException =
186           Status.UNAVAILABLE.withDescription(
187               "Can only support xDS v3 with listener resource name template").asException();
188       listener.onNotServing(statusException);
189       initialStartFuture.set(statusException);
190       xdsClient = xdsClientPool.returnObject(xdsClient);
191       return;
192     }
193     String replacement = listenerAddress;
194     if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
195       replacement = XdsClient.percentEncodePath(replacement);
196     }
197     discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
198   }
199 
200   @Override
shutdown()201   public Server shutdown() {
202     if (!shutdown.compareAndSet(false, true)) {
203       return this;
204     }
205     syncContext.execute(new Runnable() {
206       @Override
207       public void run() {
208         if (!delegate.isShutdown()) {
209           delegate.shutdown();
210         }
211         internalShutdown();
212       }
213     });
214     return this;
215   }
216 
217   @Override
shutdownNow()218   public Server shutdownNow() {
219     if (!shutdown.compareAndSet(false, true)) {
220       return this;
221     }
222     syncContext.execute(new Runnable() {
223       @Override
224       public void run() {
225         if (!delegate.isShutdown()) {
226           delegate.shutdownNow();
227         }
228         internalShutdown();
229         initialStartFuture.set(new IOException("server is forcefully shut down"));
230       }
231     });
232     return this;
233   }
234 
235   // Must run in SynchronizationContext
internalShutdown()236   private void internalShutdown() {
237     logger.log(Level.FINER, "Shutting down XdsServerWrapper");
238     if (discoveryState != null) {
239       discoveryState.shutdown();
240     }
241     if (xdsClient != null) {
242       xdsClient = xdsClientPool.returnObject(xdsClient);
243     }
244     if (restartTimer != null) {
245       restartTimer.cancel();
246     }
247     if (sharedTimeService) {
248       SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
249     }
250     isServing = false;
251     internalTerminationLatch.countDown();
252   }
253 
254   @Override
isShutdown()255   public boolean isShutdown() {
256     return shutdown.get();
257   }
258 
259   @Override
isTerminated()260   public boolean isTerminated() {
261     return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
262   }
263 
264   @Override
awaitTermination(long timeout, TimeUnit unit)265   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
266     long startTime = System.nanoTime();
267     if (!internalTerminationLatch.await(timeout, unit)) {
268       return false;
269     }
270     long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
271     return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
272   }
273 
274   @Override
awaitTermination()275   public void awaitTermination() throws InterruptedException {
276     internalTerminationLatch.await();
277     delegate.awaitTermination();
278   }
279 
280   @Override
getPort()281   public int getPort() {
282     return delegate.getPort();
283   }
284 
285   @Override
getListenSockets()286   public List<? extends SocketAddress> getListenSockets() {
287     return delegate.getListenSockets();
288   }
289 
290   @Override
getServices()291   public List<ServerServiceDefinition> getServices() {
292     return delegate.getServices();
293   }
294 
295   @Override
getImmutableServices()296   public List<ServerServiceDefinition> getImmutableServices() {
297     return delegate.getImmutableServices();
298   }
299 
300   @Override
getMutableServices()301   public List<ServerServiceDefinition> getMutableServices() {
302     return delegate.getMutableServices();
303   }
304 
305   // Must run in SynchronizationContext
startDelegateServer()306   private void startDelegateServer() {
307     if (restartTimer != null && restartTimer.isPending()) {
308       return;
309     }
310     if (isServing) {
311       return;
312     }
313     if (delegate.isShutdown()) {
314       delegate = delegateBuilder.build();
315     }
316     try {
317       delegate.start();
318       listener.onServing();
319       isServing = true;
320       if (!initialStarted) {
321         initialStarted = true;
322         initialStartFuture.set(null);
323       }
324       logger.log(Level.FINER, "Delegate server started.");
325     } catch (IOException e) {
326       logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
327       if (!initialStarted) {
328         initialStarted = true;
329         initialStartFuture.set(e);
330       } else {
331         listener.onNotServing(e);
332       }
333       restartTimer = syncContext.schedule(
334         new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
335     }
336   }
337 
338   private final class RestartTask implements Runnable {
339     @Override
run()340     public void run() {
341       startDelegateServer();
342     }
343   }
344 
345   private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
346     private final String resourceName;
347     // RDS resource name is the key.
348     private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
349     // Track pending RDS resources using rds name.
350     private final Set<String> pendingRds = new HashSet<>();
351     // Most recently discovered filter chains.
352     private List<FilterChain> filterChains = new ArrayList<>();
353     // Most recently discovered default filter chain.
354     @Nullable
355     private FilterChain defaultFilterChain;
356     private boolean stopped;
357     private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef
358         = new HashMap<>();
359     private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
360       @Override
361       public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
362           Metadata headers, ServerCallHandler<ReqT, RespT> next) {
363         return next.startCall(call, headers);
364       }
365     };
366 
DiscoveryState(String resourceName)367     private DiscoveryState(String resourceName) {
368       this.resourceName = checkNotNull(resourceName, "resourceName");
369       xdsClient.watchXdsResource(XdsListenerResource.getInstance(), resourceName, this);
370     }
371 
372     @Override
onChanged(final LdsUpdate update)373     public void onChanged(final LdsUpdate update) {
374       syncContext.execute(new Runnable() {
375         @Override
376         public void run() {
377           if (stopped) {
378             return;
379           }
380           logger.log(Level.FINEST, "Received Lds update {0}", update);
381           checkNotNull(update.listener(), "update");
382           if (!pendingRds.isEmpty()) {
383             // filter chain state has not yet been applied to filterChainSelectorManager and there
384             // are two sets of sslContextProviderSuppliers, so we release the old ones.
385             releaseSuppliersInFlight();
386             pendingRds.clear();
387           }
388           filterChains = update.listener().filterChains();
389           defaultFilterChain = update.listener().defaultFilterChain();
390           List<FilterChain> allFilterChains = filterChains;
391           if (defaultFilterChain != null) {
392             allFilterChains = new ArrayList<>(filterChains);
393             allFilterChains.add(defaultFilterChain);
394           }
395           Set<String> allRds = new HashSet<>();
396           for (FilterChain filterChain : allFilterChains) {
397             HttpConnectionManager hcm = filterChain.httpConnectionManager();
398             if (hcm.virtualHosts() == null) {
399               RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
400               if (rdsState == null) {
401                 rdsState = new RouteDiscoveryState(hcm.rdsName());
402                 routeDiscoveryStates.put(hcm.rdsName(), rdsState);
403                 xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
404                     hcm.rdsName(), rdsState);
405               }
406               if (rdsState.isPending) {
407                 pendingRds.add(hcm.rdsName());
408               }
409               allRds.add(hcm.rdsName());
410             }
411           }
412           for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
413             if (!allRds.contains(entry.getKey())) {
414               xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
415                   entry.getKey(), entry.getValue());
416             }
417           }
418           routeDiscoveryStates.keySet().retainAll(allRds);
419           if (pendingRds.isEmpty()) {
420             updateSelector();
421           }
422         }
423       });
424     }
425 
426     @Override
onResourceDoesNotExist(final String resourceName)427     public void onResourceDoesNotExist(final String resourceName) {
428       syncContext.execute(new Runnable() {
429         @Override
430         public void run() {
431           if (stopped) {
432             return;
433           }
434           StatusException statusException = Status.UNAVAILABLE.withDescription(
435                   "Listener " + resourceName + " unavailable").asException();
436           handleConfigNotFound(statusException);
437         }
438       });
439     }
440 
441     @Override
onError(final Status error)442     public void onError(final Status error) {
443       syncContext.execute(new Runnable() {
444         @Override
445         public void run() {
446           if (stopped) {
447             return;
448           }
449           logger.log(Level.FINE, "Error from XdsClient", error);
450           if (!isServing) {
451             listener.onNotServing(error.asException());
452           }
453         }
454       });
455     }
456 
shutdown()457     private void shutdown() {
458       stopped = true;
459       cleanUpRouteDiscoveryStates();
460       logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
461       xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
462       List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
463       filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
464       for (SslContextProviderSupplier s: toRelease) {
465         s.close();
466       }
467       releaseSuppliersInFlight();
468     }
469 
updateSelector()470     private void updateSelector() {
471       Map<FilterChain, AtomicReference<ServerRoutingConfig>> filterChainRouting = new HashMap<>();
472       savedRdsRoutingConfigRef.clear();
473       for (FilterChain filterChain: filterChains) {
474         filterChainRouting.put(filterChain, generateRoutingConfig(filterChain));
475       }
476       FilterChainSelector selector = new FilterChainSelector(
477           Collections.unmodifiableMap(filterChainRouting),
478           defaultFilterChain == null ? null : defaultFilterChain.sslContextProviderSupplier(),
479           defaultFilterChain == null ? new AtomicReference<ServerRoutingConfig>() :
480               generateRoutingConfig(defaultFilterChain));
481       List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
482       logger.log(Level.FINEST, "Updating selector {0}", selector);
483       filterChainSelectorManager.updateSelector(selector);
484       for (SslContextProviderSupplier e: toRelease) {
485         e.close();
486       }
487       startDelegateServer();
488     }
489 
generateRoutingConfig(FilterChain filterChain)490     private AtomicReference<ServerRoutingConfig> generateRoutingConfig(FilterChain filterChain) {
491       HttpConnectionManager hcm = filterChain.httpConnectionManager();
492       if (hcm.virtualHosts() != null) {
493         ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
494                 hcm.httpFilterConfigs(), hcm.virtualHosts());
495         return new AtomicReference<>(ServerRoutingConfig.create(hcm.virtualHosts(),interceptors));
496       } else {
497         RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
498         checkNotNull(rds, "rds");
499         AtomicReference<ServerRoutingConfig> serverRoutingConfigRef = new AtomicReference<>();
500         if (rds.savedVirtualHosts != null) {
501           ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
502               hcm.httpFilterConfigs(), rds.savedVirtualHosts);
503           ServerRoutingConfig serverRoutingConfig =
504               ServerRoutingConfig.create(rds.savedVirtualHosts, interceptors);
505           serverRoutingConfigRef.set(serverRoutingConfig);
506         } else {
507           serverRoutingConfigRef.set(ServerRoutingConfig.FAILING_ROUTING_CONFIG);
508         }
509         savedRdsRoutingConfigRef.put(filterChain, serverRoutingConfigRef);
510         return serverRoutingConfigRef;
511       }
512     }
513 
generatePerRouteInterceptors( List<NamedFilterConfig> namedFilterConfigs, List<VirtualHost> virtualHosts)514     private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
515         List<NamedFilterConfig> namedFilterConfigs, List<VirtualHost> virtualHosts) {
516       ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
517           new ImmutableMap.Builder<>();
518       for (VirtualHost virtualHost : virtualHosts) {
519         for (Route route : virtualHost.routes()) {
520           List<ServerInterceptor> filterInterceptors = new ArrayList<>();
521           Map<String, FilterConfig> selectedOverrideConfigs =
522               new HashMap<>(virtualHost.filterConfigOverrides());
523           selectedOverrideConfigs.putAll(route.filterConfigOverrides());
524           if (namedFilterConfigs != null) {
525             for (NamedFilterConfig namedFilterConfig : namedFilterConfigs) {
526               FilterConfig filterConfig = namedFilterConfig.filterConfig;
527               Filter filter = filterRegistry.get(filterConfig.typeUrl());
528               if (filter instanceof ServerInterceptorBuilder) {
529                 ServerInterceptor interceptor =
530                     ((ServerInterceptorBuilder) filter).buildServerInterceptor(
531                         filterConfig, selectedOverrideConfigs.get(namedFilterConfig.name));
532                 if (interceptor != null) {
533                   filterInterceptors.add(interceptor);
534                 }
535               } else {
536                 logger.log(Level.WARNING, "HttpFilterConfig(type URL: "
537                     + filterConfig.typeUrl() + ") is not supported on server-side. "
538                     + "Probably a bug at ClientXdsClient verification.");
539               }
540             }
541           }
542           ServerInterceptor interceptor = combineInterceptors(filterInterceptors);
543           perRouteInterceptors.put(route, interceptor);
544         }
545       }
546       return perRouteInterceptors.buildOrThrow();
547     }
548 
combineInterceptors(final List<ServerInterceptor> interceptors)549     private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
550       if (interceptors.isEmpty()) {
551         return noopInterceptor;
552       }
553       if (interceptors.size() == 1) {
554         return interceptors.get(0);
555       }
556       return new ServerInterceptor() {
557         @Override
558         public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
559             Metadata headers, ServerCallHandler<ReqT, RespT> next) {
560           // intercept forward
561           for (int i = interceptors.size() - 1; i >= 0; i--) {
562             next = InternalServerInterceptors.interceptCallHandlerCreate(
563                 interceptors.get(i), next);
564           }
565           return next.startCall(call, headers);
566         }
567       };
568     }
569 
handleConfigNotFound(StatusException exception)570     private void handleConfigNotFound(StatusException exception) {
571       cleanUpRouteDiscoveryStates();
572       List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
573       filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
574       for (SslContextProviderSupplier s: toRelease) {
575         s.close();
576       }
577       if (restartTimer != null) {
578         restartTimer.cancel();
579       }
580       if (!delegate.isShutdown()) {
581         delegate.shutdown();  // let in-progress calls finish
582       }
583       isServing = false;
584       listener.onNotServing(exception);
585     }
586 
cleanUpRouteDiscoveryStates()587     private void cleanUpRouteDiscoveryStates() {
588       for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
589         String rdsName = rdsState.resourceName;
590         logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
591         xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
592             rdsState);
593       }
594       routeDiscoveryStates.clear();
595       savedRdsRoutingConfigRef.clear();
596     }
597 
getSuppliersInUse()598     private List<SslContextProviderSupplier> getSuppliersInUse() {
599       List<SslContextProviderSupplier> toRelease = new ArrayList<>();
600       FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
601       if (selector != null) {
602         for (FilterChain f: selector.getRoutingConfigs().keySet()) {
603           if (f.sslContextProviderSupplier() != null) {
604             toRelease.add(f.sslContextProviderSupplier());
605           }
606         }
607         SslContextProviderSupplier defaultSupplier =
608                 selector.getDefaultSslContextProviderSupplier();
609         if (defaultSupplier != null) {
610           toRelease.add(defaultSupplier);
611         }
612       }
613       return toRelease;
614     }
615 
releaseSuppliersInFlight()616     private void releaseSuppliersInFlight() {
617       SslContextProviderSupplier supplier;
618       for (FilterChain filterChain : filterChains) {
619         supplier = filterChain.sslContextProviderSupplier();
620         if (supplier != null) {
621           supplier.close();
622         }
623       }
624       if (defaultFilterChain != null
625               && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
626         supplier.close();
627       }
628     }
629 
630     private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
631       private final String resourceName;
632       private ImmutableList<VirtualHost> savedVirtualHosts;
633       private boolean isPending = true;
634 
RouteDiscoveryState(String resourceName)635       private RouteDiscoveryState(String resourceName) {
636         this.resourceName = checkNotNull(resourceName, "resourceName");
637       }
638 
639       @Override
onChanged(final RdsUpdate update)640       public void onChanged(final RdsUpdate update) {
641         syncContext.execute(new Runnable() {
642           @Override
643           public void run() {
644             if (!routeDiscoveryStates.containsKey(resourceName)) {
645               return;
646             }
647             if (savedVirtualHosts == null && !isPending) {
648               logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
649             }
650             savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
651             updateRdsRoutingConfig();
652             maybeUpdateSelector();
653           }
654         });
655       }
656 
657       @Override
onResourceDoesNotExist(final String resourceName)658       public void onResourceDoesNotExist(final String resourceName) {
659         syncContext.execute(new Runnable() {
660           @Override
661           public void run() {
662             if (!routeDiscoveryStates.containsKey(resourceName)) {
663               return;
664             }
665             logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
666             savedVirtualHosts = null;
667             updateRdsRoutingConfig();
668             maybeUpdateSelector();
669           }
670         });
671       }
672 
673       @Override
onError(final Status error)674       public void onError(final Status error) {
675         syncContext.execute(new Runnable() {
676           @Override
677           public void run() {
678             if (!routeDiscoveryStates.containsKey(resourceName)) {
679               return;
680             }
681             logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
682                     new Object[]{resourceName, error});
683             maybeUpdateSelector();
684           }
685         });
686       }
687 
updateRdsRoutingConfig()688       private void updateRdsRoutingConfig() {
689         for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
690           if (resourceName.equals(filterChain.httpConnectionManager().rdsName())) {
691             ServerRoutingConfig updatedRoutingConfig;
692             if (savedVirtualHosts == null) {
693               updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
694             } else {
695               ImmutableMap<Route, ServerInterceptor> updatedInterceptors =
696                   generatePerRouteInterceptors(
697                       filterChain.httpConnectionManager().httpFilterConfigs(),
698                       savedVirtualHosts);
699               updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts,
700                   updatedInterceptors);
701             }
702             logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
703                 new Object[]{filterChain.name(), updatedRoutingConfig});
704             savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
705           }
706         }
707       }
708 
709       // Update the selector to use the most recently updated configs only after all rds have been
710       // discovered for the first time. Later changes on rds will be applied through virtual host
711       // list atomic ref.
maybeUpdateSelector()712       private void maybeUpdateSelector() {
713         isPending = false;
714         boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
715         if (isLastPending) {
716           updateSelector();
717         }
718       }
719     }
720   }
721 
722   @VisibleForTesting
723   final class ConfigApplyingInterceptor implements ServerInterceptor {
724     private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
725       @Override
726       public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
727           Metadata headers, ServerCallHandler<ReqT, RespT> next) {
728         return next.startCall(call, headers);
729       }
730     };
731 
732     @Override
733     public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
734         Metadata headers, ServerCallHandler<ReqT, RespT> next) {
735       AtomicReference<ServerRoutingConfig> routingConfigRef =
736           call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
737       ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
738           routingConfigRef.get();
739       if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
740         String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
741         call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
742         return new Listener<ReqT>() {};
743       }
744       List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
745       VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
746           virtualHosts, call.getAuthority());
747       if (virtualHost == null) {
748         call.close(
749             Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
750             new Metadata());
751         return new Listener<ReqT>() {};
752       }
753       Route selectedRoute = null;
754       MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
755       for (Route route : virtualHost.routes()) {
756         if (RoutingUtils.matchRoute(
757             route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
758           selectedRoute = route;
759           break;
760         }
761       }
762       if (selectedRoute == null) {
763         call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
764             new Metadata());
765         return new ServerCall.Listener<ReqT>() {};
766       }
767       if (selectedRoute.routeAction() != null) {
768         call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
769             + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
770         return new ServerCall.Listener<ReqT>() {};
771       }
772       ServerInterceptor routeInterceptor = noopInterceptor;
773       Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
774       if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
775         routeInterceptor = perRouteInterceptors.get(selectedRoute);
776       }
777       return routeInterceptor.interceptCall(call, headers, next);
778     }
779   }
780 
781   /**
782    * The HttpConnectionManager level configuration.
783    */
784   @AutoValue
785   abstract static class ServerRoutingConfig {
786     @VisibleForTesting
787     static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
788         ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
789 
790     abstract ImmutableList<VirtualHost> virtualHosts();
791 
792     // Prebuilt per route server interceptors from http filter configs.
793     abstract ImmutableMap<Route, ServerInterceptor> interceptors();
794 
795     /**
796      * Server routing configuration.
797      * */
798     public static ServerRoutingConfig create(
799         ImmutableList<VirtualHost> virtualHosts,
800         ImmutableMap<Route, ServerInterceptor> interceptors) {
801       checkNotNull(virtualHosts, "virtualHosts");
802       checkNotNull(interceptors, "interceptors");
803       return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
804     }
805   }
806 }
807