1 /* 2 * Copyright 2021 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 18 package io.grpc.xds; 19 20 import com.google.common.collect.ImmutableMap; 21 import com.google.protobuf.Any; 22 import com.google.protobuf.Message; 23 import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; 24 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; 25 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; 26 import io.grpc.SynchronizationContext; 27 import io.grpc.stub.StreamObserver; 28 import java.util.HashMap; 29 import java.util.HashSet; 30 import java.util.Map; 31 import java.util.Set; 32 import java.util.concurrent.ConcurrentHashMap; 33 import java.util.concurrent.atomic.AtomicInteger; 34 import java.util.logging.Level; 35 import java.util.logging.Logger; 36 37 /** 38 * A bidi-stream service that acts as a local xDS Control Plane. 39 * It accepts xDS config injection through a method call {@link #setXdsConfig}. Handling AdsStream 40 * response or updating xds config are run in syncContext. 41 * 42 * <p>The service maintains lookup tables: 43 * Subscriber table: map from each resource type, to a map from each client to subscribed resource 44 * names set. 45 * Resources table: store the resources in raw proto message. 46 * 47 * <p>xDS protocol requires version/nonce to avoid various race conditions. In this impl: 48 * Version stores the latest version number per each resource type. It is simply bumped up on each 49 * xds config set. 50 * Nonce stores the nonce number for each resource type and for each client. Incoming xDS requests 51 * share the same proto message type but may at different resources update phases: 52 * 1) Original: an initial xDS request. 53 * 2) NACK an xDS response. 54 * 3) ACK an xDS response. 55 * The service is capable of distinguish these cases when handling the request. 56 */ 57 final class XdsTestControlPlaneService extends 58 AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase { 59 private static final Logger logger = Logger.getLogger(XdsTestControlPlaneService.class.getName()); 60 61 private final SynchronizationContext syncContext = new SynchronizationContext( 62 new Thread.UncaughtExceptionHandler() { 63 @Override 64 public void uncaughtException(Thread t, Throwable e) { 65 throw new AssertionError(e); 66 } 67 }); 68 69 static final String ADS_TYPE_URL_LDS = 70 "type.googleapis.com/envoy.config.listener.v3.Listener"; 71 static final String ADS_TYPE_URL_RDS = 72 "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; 73 static final String ADS_TYPE_URL_CDS = 74 "type.googleapis.com/envoy.config.cluster.v3.Cluster"; 75 static final String ADS_TYPE_URL_EDS = 76 "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; 77 78 private final Map<String, HashMap<String, Message>> xdsResources = new HashMap<>(); 79 private ImmutableMap<String, Map<StreamObserver<DiscoveryResponse>, Set<String>>> subscribers 80 = ImmutableMap.of( 81 ADS_TYPE_URL_LDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>(), 82 ADS_TYPE_URL_RDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>(), 83 ADS_TYPE_URL_CDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>(), 84 ADS_TYPE_URL_EDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, Set<String>>()); 85 private final ImmutableMap<String, AtomicInteger> xdsVersions = ImmutableMap.of( 86 ADS_TYPE_URL_LDS, new AtomicInteger(1), 87 ADS_TYPE_URL_RDS, new AtomicInteger(1), 88 ADS_TYPE_URL_CDS, new AtomicInteger(1), 89 ADS_TYPE_URL_EDS, new AtomicInteger(1) 90 ); 91 private final ImmutableMap<String, Map<StreamObserver<DiscoveryResponse>, AtomicInteger>> 92 xdsNonces = ImmutableMap.of( 93 ADS_TYPE_URL_LDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>(), 94 ADS_TYPE_URL_RDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>(), 95 ADS_TYPE_URL_CDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>(), 96 ADS_TYPE_URL_EDS, new ConcurrentHashMap<StreamObserver<DiscoveryResponse>, AtomicInteger>() 97 ); 98 99 100 // treat all the resource types as state-of-the-world, send back all resources of a particular 101 // type when any of them change. setXdsConfig(final String type, final Map<String, T> resources)102 public <T extends Message> void setXdsConfig(final String type, final Map<String, T> resources) { 103 logger.log(Level.FINE, "setting config {0} {1}", new Object[]{type, resources}); 104 syncContext.execute(new Runnable() { 105 @Override 106 public void run() { 107 HashMap<String, Message> copyResources = new HashMap<>(resources); 108 xdsResources.put(type, copyResources); 109 String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndDecrement()); 110 111 for (Map.Entry<StreamObserver<DiscoveryResponse>, Set<String>> entry : 112 subscribers.get(type).entrySet()) { 113 DiscoveryResponse response = generateResponse(type, newVersionInfo, 114 String.valueOf(xdsNonces.get(type).get(entry.getKey()).incrementAndGet()), 115 entry.getValue()); 116 entry.getKey().onNext(response); 117 } 118 } 119 }); 120 } 121 122 @Override streamAggregatedResources( final StreamObserver<DiscoveryResponse> responseObserver)123 public StreamObserver<DiscoveryRequest> streamAggregatedResources( 124 final StreamObserver<DiscoveryResponse> responseObserver) { 125 final StreamObserver<DiscoveryRequest> requestObserver = 126 new StreamObserver<DiscoveryRequest>() { 127 @Override 128 public void onNext(final DiscoveryRequest value) { 129 syncContext.execute(new Runnable() { 130 @Override 131 public void run() { 132 logger.log(Level.FINEST, "control plane received request {0}", value); 133 if (value.hasErrorDetail()) { 134 logger.log(Level.FINE, "control plane received nack resource {0}, error {1}", 135 new Object[]{value.getResourceNamesList(), value.getErrorDetail()}); 136 return; 137 } 138 String resourceType = value.getTypeUrl(); 139 if (!value.getResponseNonce().isEmpty() 140 && !String.valueOf(xdsNonces.get(resourceType)).equals(value.getResponseNonce())) { 141 logger.log(Level.FINE, "Resource nonce does not match, ignore."); 142 return; 143 } 144 Set<String> requestedResourceNames = new HashSet<>(value.getResourceNamesList()); 145 if (subscribers.get(resourceType).containsKey(responseObserver) 146 && subscribers.get(resourceType).get(responseObserver) 147 .equals(requestedResourceNames)) { 148 logger.log(Level.FINEST, "control plane received ack for resource: {0}", 149 value.getResourceNamesList()); 150 return; 151 } 152 if (!xdsNonces.get(resourceType).containsKey(responseObserver)) { 153 xdsNonces.get(resourceType).put(responseObserver, new AtomicInteger(0)); 154 } 155 DiscoveryResponse response = generateResponse(resourceType, 156 String.valueOf(xdsVersions.get(resourceType)), 157 String.valueOf(xdsNonces.get(resourceType).get(responseObserver)), 158 requestedResourceNames); 159 responseObserver.onNext(response); 160 subscribers.get(resourceType).put(responseObserver, requestedResourceNames); 161 } 162 }); 163 } 164 165 @Override 166 public void onError(Throwable t) { 167 logger.log(Level.FINE, "Control plane error: {0} ", t); 168 onCompleted(); 169 } 170 171 @Override 172 public void onCompleted() { 173 responseObserver.onCompleted(); 174 for (String type : subscribers.keySet()) { 175 subscribers.get(type).remove(responseObserver); 176 xdsNonces.get(type).remove(responseObserver); 177 } 178 } 179 }; 180 return requestObserver; 181 } 182 183 //must run in syncContext generateResponse(String resourceType, String version, String nonce, Set<String> resourceNames)184 private DiscoveryResponse generateResponse(String resourceType, String version, String nonce, 185 Set<String> resourceNames) { 186 DiscoveryResponse.Builder responseBuilder = DiscoveryResponse.newBuilder() 187 .setTypeUrl(resourceType) 188 .setVersionInfo(version) 189 .setNonce(nonce); 190 for (String resourceName: resourceNames) { 191 if (xdsResources.containsKey(resourceType) 192 && xdsResources.get(resourceType).containsKey(resourceName)) { 193 responseBuilder.addResources(Any.pack(xdsResources.get(resourceType).get(resourceName), 194 resourceType)); 195 } 196 } 197 return responseBuilder.build(); 198 } 199 } 200