xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2021 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 
18 package io.grpc.xds;
19 
20 import com.google.common.collect.ImmutableMap;
21 import com.google.protobuf.Any;
22 import com.google.protobuf.Message;
23 import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
24 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
25 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
26 import io.grpc.SynchronizationContext;
27 import io.grpc.stub.StreamObserver;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 
37 /**
38 * A bidi-stream service that acts as a local xDS Control Plane.
39  * It accepts xDS config injection through a method call {@link #setXdsConfig}. Handling AdsStream
40  * response or updating xds config are run in syncContext.
41  *
42  * <p>The service maintains lookup tables:
43  * Subscriber table: map from each resource type, to a map from each client to subscribed resource
44  * names set.
45  * Resources table: store the resources in raw proto message.
46  *
47  * <p>xDS protocol requires version/nonce to avoid various race conditions. In this impl:
48  * Version stores the latest version number per each resource type. It is simply bumped up on each
49  * xds config set.
50  * Nonce stores the nonce number for each resource type and for each client. Incoming xDS requests
51  * share the same proto message type but may at different resources update phases:
52  * 1) Original: an initial xDS request.
53  * 2) NACK an xDS response.
54  * 3) ACK an xDS response.
55  * The service is capable of distinguish these cases when handling the request.
56  */
57 final class XdsTestControlPlaneService extends
58     AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase {
59   private static final Logger logger = Logger.getLogger(XdsTestControlPlaneService.class.getName());
60 
61   private final SynchronizationContext syncContext = new SynchronizationContext(
62       new Thread.UncaughtExceptionHandler() {
63         @Override
64         public void uncaughtException(Thread t, Throwable e) {
65           throw new AssertionError(e);
66         }
67       });
68 
69   static final String ADS_TYPE_URL_LDS =
70       "type.googleapis.com/envoy.config.listener.v3.Listener";
71   static final String ADS_TYPE_URL_RDS =
72       "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
73   static final String ADS_TYPE_URL_CDS =
74       "type.googleapis.com/envoy.config.cluster.v3.Cluster";
75   static final String ADS_TYPE_URL_EDS =
76       "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
77 
78   private final Map<String, HashMap<String, Message>> xdsResources = new HashMap<>();
79   private ImmutableMap<String, Map<StreamObserver<DiscoveryResponse>, Set<String>>> subscribers
80       = ImmutableMap.of(
81       ADS_TYPE_URL_LDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>(),
82       ADS_TYPE_URL_RDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>(),
83       ADS_TYPE_URL_CDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>(),
84       ADS_TYPE_URL_EDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>());
85   private final ImmutableMap<String, AtomicInteger> xdsVersions = ImmutableMap.of(
86       ADS_TYPE_URL_LDS, new AtomicInteger(1),
87       ADS_TYPE_URL_RDS, new AtomicInteger(1),
88       ADS_TYPE_URL_CDS, new AtomicInteger(1),
89       ADS_TYPE_URL_EDS, new AtomicInteger(1)
90   );
91   private final ImmutableMap<String, Map<StreamObserver<DiscoveryResponse>, AtomicInteger>>
92       xdsNonces = ImmutableMap.of(
93       ADS_TYPE_URL_LDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>(),
94       ADS_TYPE_URL_RDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>(),
95       ADS_TYPE_URL_CDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>(),
96       ADS_TYPE_URL_EDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>()
97   );
98 
99 
100   // treat all the resource types as state-of-the-world, send back all resources of a particular
101   // type when any of them change.
setXdsConfig(final String type, final Map<String, T> resources)102   public <T extends Message> void setXdsConfig(final String type, final Map<String, T> resources) {
103     logger.log(Level.FINE, "setting config {0} {1}", new Object[]{type, resources});
104     syncContext.execute(new Runnable() {
105       @Override
106       public void run() {
107         HashMap<String, Message> copyResources =  new HashMap<>(resources);
108         xdsResources.put(type, copyResources);
109         String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndDecrement());
110 
111         for (Map.Entry<StreamObserver<DiscoveryResponse>, Set<String>> entry :
112             subscribers.get(type).entrySet()) {
113           DiscoveryResponse response = generateResponse(type, newVersionInfo,
114               String.valueOf(xdsNonces.get(type).get(entry.getKey()).incrementAndGet()),
115               entry.getValue());
116           entry.getKey().onNext(response);
117         }
118       }
119     });
120   }
121 
122   @Override
streamAggregatedResources( final StreamObserver<DiscoveryResponse> responseObserver)123   public StreamObserver<DiscoveryRequest> streamAggregatedResources(
124       final StreamObserver<DiscoveryResponse> responseObserver) {
125     final StreamObserver<DiscoveryRequest> requestObserver =
126         new StreamObserver<DiscoveryRequest>() {
127       @Override
128       public void onNext(final DiscoveryRequest value) {
129         syncContext.execute(new Runnable() {
130           @Override
131           public void run() {
132             logger.log(Level.FINEST, "control plane received request {0}", value);
133             if (value.hasErrorDetail()) {
134               logger.log(Level.FINE, "control plane received nack resource {0}, error {1}",
135                   new Object[]{value.getResourceNamesList(), value.getErrorDetail()});
136               return;
137             }
138             String resourceType = value.getTypeUrl();
139             if (!value.getResponseNonce().isEmpty()
140                 && !String.valueOf(xdsNonces.get(resourceType)).equals(value.getResponseNonce())) {
141               logger.log(Level.FINE, "Resource nonce does not match, ignore.");
142               return;
143             }
144             Set<String> requestedResourceNames = new HashSet<>(value.getResourceNamesList());
145             if (subscribers.get(resourceType).containsKey(responseObserver)
146                 && subscribers.get(resourceType).get(responseObserver)
147                     .equals(requestedResourceNames)) {
148               logger.log(Level.FINEST, "control plane received ack for resource: {0}",
149                   value.getResourceNamesList());
150               return;
151             }
152             if (!xdsNonces.get(resourceType).containsKey(responseObserver)) {
153               xdsNonces.get(resourceType).put(responseObserver, new AtomicInteger(0));
154             }
155             DiscoveryResponse response = generateResponse(resourceType,
156                 String.valueOf(xdsVersions.get(resourceType)),
157                 String.valueOf(xdsNonces.get(resourceType).get(responseObserver)),
158                 requestedResourceNames);
159             responseObserver.onNext(response);
160             subscribers.get(resourceType).put(responseObserver, requestedResourceNames);
161           }
162         });
163       }
164 
165       @Override
166       public void onError(Throwable t) {
167         logger.log(Level.FINE, "Control plane error: {0} ", t);
168         onCompleted();
169       }
170 
171       @Override
172       public void onCompleted() {
173         responseObserver.onCompleted();
174         for (String type : subscribers.keySet()) {
175           subscribers.get(type).remove(responseObserver);
176           xdsNonces.get(type).remove(responseObserver);
177         }
178       }
179     };
180     return requestObserver;
181   }
182 
183   //must run in syncContext
generateResponse(String resourceType, String version, String nonce, Set<String> resourceNames)184   private DiscoveryResponse generateResponse(String resourceType, String version, String nonce,
185                                              Set<String> resourceNames) {
186     DiscoveryResponse.Builder responseBuilder = DiscoveryResponse.newBuilder()
187         .setTypeUrl(resourceType)
188         .setVersionInfo(version)
189         .setNonce(nonce);
190     for (String resourceName: resourceNames) {
191       if (xdsResources.containsKey(resourceType)
192           && xdsResources.get(resourceType).containsKey(resourceName)) {
193         responseBuilder.addResources(Any.pack(xdsResources.get(resourceType).get(resourceName),
194             resourceType));
195       }
196     }
197     return responseBuilder.build();
198   }
199 }
200