1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.Stopwatch; 25 import com.google.common.base.Supplier; 26 import com.google.protobuf.Any; 27 import com.google.rpc.Code; 28 import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; 29 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; 30 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; 31 import io.grpc.Channel; 32 import io.grpc.Context; 33 import io.grpc.InternalLogId; 34 import io.grpc.ManagedChannel; 35 import io.grpc.Status; 36 import io.grpc.SynchronizationContext; 37 import io.grpc.SynchronizationContext.ScheduledHandle; 38 import io.grpc.internal.BackoffPolicy; 39 import io.grpc.stub.ClientCallStreamObserver; 40 import io.grpc.stub.ClientResponseObserver; 41 import io.grpc.stub.StreamObserver; 42 import io.grpc.xds.Bootstrapper.ServerInfo; 43 import io.grpc.xds.EnvoyProtoData.Node; 44 import io.grpc.xds.XdsClient.ResourceStore; 45 import io.grpc.xds.XdsClient.XdsResponseHandler; 46 import io.grpc.xds.XdsClientImpl.XdsChannelFactory; 47 import io.grpc.xds.XdsLogger.XdsLogLevel; 48 import java.util.Collection; 49 import java.util.Collections; 50 import java.util.HashMap; 51 import java.util.HashSet; 52 import java.util.List; 53 import java.util.Map; 54 import java.util.Set; 55 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.TimeUnit; 57 import javax.annotation.Nullable; 58 59 /** 60 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of 61 * the xDS RPC stream. 62 */ 63 final class ControlPlaneClient { 64 65 public static final String CLOSED_BY_SERVER = "Closed by server"; 66 private final SynchronizationContext syncContext; 67 private final InternalLogId logId; 68 private final XdsLogger logger; 69 private final ServerInfo serverInfo; 70 private final ManagedChannel channel; 71 private final XdsResponseHandler xdsResponseHandler; 72 private final ResourceStore resourceStore; 73 private final Context context; 74 private final ScheduledExecutorService timeService; 75 private final BackoffPolicy.Provider backoffPolicyProvider; 76 private final Stopwatch stopwatch; 77 private final Node bootstrapNode; 78 private final XdsClient.TimerLaunch timerLaunch; 79 80 // Last successfully applied version_info for each resource type. Starts with empty string. 81 // A version_info is used to update management server with client's most recent knowledge of 82 // resources. 83 private final Map<XdsResourceType<?>, String> versions = new HashMap<>(); 84 85 private boolean shutdown; 86 @Nullable 87 private AbstractAdsStream adsStream; 88 @Nullable 89 private BackoffPolicy retryBackoffPolicy; 90 @Nullable 91 private ScheduledHandle rpcRetryTimer; 92 93 /** An entity that manages ADS RPCs over a single channel. */ 94 // TODO: rename to XdsChannel ControlPlaneClient( XdsChannelFactory xdsChannelFactory, ServerInfo serverInfo, Node bootstrapNode, XdsResponseHandler xdsResponseHandler, ResourceStore resourceStore, Context context, ScheduledExecutorService timeService, SynchronizationContext syncContext, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier, XdsClient.TimerLaunch timerLaunch)95 ControlPlaneClient( 96 XdsChannelFactory xdsChannelFactory, 97 ServerInfo serverInfo, 98 Node bootstrapNode, 99 XdsResponseHandler xdsResponseHandler, 100 ResourceStore resourceStore, 101 Context context, 102 ScheduledExecutorService 103 timeService, 104 SynchronizationContext syncContext, 105 BackoffPolicy.Provider backoffPolicyProvider, 106 Supplier<Stopwatch> stopwatchSupplier, 107 XdsClient.TimerLaunch timerLaunch) { 108 this.serverInfo = checkNotNull(serverInfo, "serverInfo"); 109 this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); 110 this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); 111 this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); 112 this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); 113 this.context = checkNotNull(context, "context"); 114 this.timeService = checkNotNull(timeService, "timeService"); 115 this.syncContext = checkNotNull(syncContext, "syncContext"); 116 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 117 this.timerLaunch = checkNotNull(timerLaunch, "timerLaunch"); 118 stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); 119 logId = InternalLogId.allocate("xds-client", serverInfo.target()); 120 logger = XdsLogger.withLogId(logId); 121 logger.log(XdsLogLevel.INFO, "Created"); 122 } 123 124 /** The underlying channel. */ 125 // Currently, only externally used for LrsClient. channel()126 Channel channel() { 127 return channel; 128 } 129 shutdown()130 void shutdown() { 131 syncContext.execute(new Runnable() { 132 @Override 133 public void run() { 134 shutdown = true; 135 logger.log(XdsLogLevel.INFO, "Shutting down"); 136 if (adsStream != null) { 137 adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); 138 } 139 if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { 140 rpcRetryTimer.cancel(); 141 } 142 channel.shutdown(); 143 } 144 }); 145 } 146 147 @Override toString()148 public String toString() { 149 return logId.toString(); 150 } 151 152 /** 153 * Updates the resource subscription for the given resource type. 154 */ 155 // Must be synchronized. adjustResourceSubscription(XdsResourceType<?> resourceType)156 void adjustResourceSubscription(XdsResourceType<?> resourceType) { 157 if (isInBackoff()) { 158 return; 159 } 160 if (adsStream == null) { 161 startRpcStream(); 162 } 163 Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType); 164 if (resources != null) { 165 adsStream.sendDiscoveryRequest(resourceType, resources); 166 } 167 } 168 169 /** 170 * Accepts the update for the given resource type by updating the latest resource version 171 * and sends an ACK request to the management server. 172 */ 173 // Must be synchronized. ackResponse(XdsResourceType<?> type, String versionInfo, String nonce)174 void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) { 175 versions.put(type, versionInfo); 176 logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}", 177 type.typeName(), nonce, versionInfo); 178 Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type); 179 if (resources == null) { 180 resources = Collections.emptyList(); 181 } 182 adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null); 183 } 184 185 /** 186 * Rejects the update for the given resource type and sends an NACK request (request with last 187 * accepted version) to the management server. 188 */ 189 // Must be synchronized. nackResponse(XdsResourceType<?> type, String nonce, String errorDetail)190 void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) { 191 String versionInfo = versions.getOrDefault(type, ""); 192 logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}", 193 type.typeName(), nonce, versionInfo); 194 Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type); 195 if (resources == null) { 196 resources = Collections.emptyList(); 197 } 198 adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); 199 } 200 201 /** 202 * Returns {@code true} if the resource discovery is currently in backoff. 203 */ 204 // Must be synchronized. isInBackoff()205 boolean isInBackoff() { 206 return rpcRetryTimer != null && rpcRetryTimer.isPending(); 207 } 208 isReady()209 boolean isReady() { 210 return adsStream != null && adsStream.isReady(); 211 } 212 213 /** 214 * Starts a timer for each requested resource that hasn't been responded to and 215 * has been waiting for the channel to get ready. 216 */ readyHandler()217 void readyHandler() { 218 if (!isReady()) { 219 return; 220 } 221 222 if (isInBackoff()) { 223 rpcRetryTimer.cancel(); 224 rpcRetryTimer = null; 225 } 226 227 timerLaunch.startSubscriberTimersIfNeeded(serverInfo); 228 } 229 230 /** 231 * Establishes the RPC connection by creating a new RPC stream on the given channel for 232 * xDS protocol communication. 233 */ 234 // Must be synchronized. startRpcStream()235 private void startRpcStream() { 236 checkState(adsStream == null, "Previous adsStream has not been cleared yet"); 237 adsStream = new AdsStreamV3(); 238 Context prevContext = context.attach(); 239 try { 240 adsStream.start(); 241 } finally { 242 context.detach(prevContext); 243 } 244 logger.log(XdsLogLevel.INFO, "ADS stream started"); 245 stopwatch.reset().start(); 246 } 247 248 @VisibleForTesting 249 final class RpcRetryTask implements Runnable { 250 @Override run()251 public void run() { 252 if (shutdown) { 253 return; 254 } 255 startRpcStream(); 256 Set<XdsResourceType<?>> subscribedResourceTypes = 257 new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values()); 258 for (XdsResourceType<?> type : subscribedResourceTypes) { 259 Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type); 260 if (resources != null) { 261 adsStream.sendDiscoveryRequest(type, resources); 262 } 263 } 264 xdsResponseHandler.handleStreamRestarted(serverInfo); 265 } 266 } 267 268 @VisibleForTesting 269 @Nullable fromTypeUrl(String typeUrl)270 XdsResourceType<?> fromTypeUrl(String typeUrl) { 271 return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); 272 } 273 274 private abstract class AbstractAdsStream { 275 private boolean responseReceived; 276 private boolean closed; 277 // Response nonce for the most recently received discovery responses of each resource type. 278 // Client initiated requests start response nonce with empty string. 279 // Nonce in each response is echoed back in the following ACK/NACK request. It is 280 // used for management server to identify which response the client is ACKing/NACking. 281 // To avoid confusion, client-initiated requests will always use the nonce in 282 // most recently received responses of each resource type. 283 private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>(); 284 start()285 abstract void start(); 286 sendError(Exception error)287 abstract void sendError(Exception error); 288 isReady()289 abstract boolean isReady(); 290 291 /** 292 * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and 293 * {@code errorDetail}. Used for reacting to a specific discovery response. For 294 * client-initiated discovery requests, use {@link 295 * #sendDiscoveryRequest(XdsResourceType, Collection)}. 296 */ sendDiscoveryRequest(XdsResourceType<?> type, String version, Collection<String> resources, String nonce, @Nullable String errorDetail)297 abstract void sendDiscoveryRequest(XdsResourceType<?> type, String version, 298 Collection<String> resources, String nonce, @Nullable String errorDetail); 299 300 /** 301 * Sends a client-initiated discovery request. 302 */ sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources)303 final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) { 304 logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources); 305 sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources, 306 respNonces.getOrDefault(type, ""), null); 307 } 308 handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources, String nonce)309 final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources, 310 String nonce) { 311 checkNotNull(type, "type"); 312 if (closed) { 313 return; 314 } 315 responseReceived = true; 316 respNonces.put(type, nonce); 317 xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce); 318 } 319 handleRpcError(Throwable t)320 final void handleRpcError(Throwable t) { 321 handleRpcStreamClosed(Status.fromThrowable(t)); 322 } 323 handleRpcCompleted()324 final void handleRpcCompleted() { 325 handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); 326 } 327 handleRpcStreamClosed(Status error)328 private void handleRpcStreamClosed(Status error) { 329 if (closed) { 330 return; 331 } 332 333 if (responseReceived || retryBackoffPolicy == null) { 334 // Reset the backoff sequence if had received a response, or backoff sequence 335 // has never been initialized. 336 retryBackoffPolicy = backoffPolicyProvider.get(); 337 } 338 // Need this here to avoid tsan race condition in XdsClientImplTestBase.sendToNonexistentHost 339 long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); 340 long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); 341 rpcRetryTimer = syncContext.schedule( 342 new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); 343 344 checkArgument(!error.isOk(), "unexpected OK status"); 345 String errorMsg = error.getDescription() != null 346 && error.getDescription().equals(CLOSED_BY_SERVER) 347 ? "ADS stream closed with status {0}: {1}. Cause: {2}" 348 : "ADS stream failed with status {0}: {1}. Cause: {2}"; 349 logger.log( 350 XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause()); 351 closed = true; 352 xdsResponseHandler.handleStreamClosed(error); 353 cleanUp(); 354 355 logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos); 356 } 357 close(Exception error)358 private void close(Exception error) { 359 if (closed) { 360 return; 361 } 362 closed = true; 363 cleanUp(); 364 sendError(error); 365 } 366 cleanUp()367 private void cleanUp() { 368 if (adsStream == this) { 369 adsStream = null; 370 } 371 } 372 } 373 374 private final class AdsStreamV3 extends AbstractAdsStream { 375 private StreamObserver<DiscoveryRequest> requestWriter; 376 377 @Override isReady()378 public boolean isReady() { 379 return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady(); 380 } 381 382 @Override start()383 void start() { 384 AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = 385 AggregatedDiscoveryServiceGrpc.newStub(channel); 386 StreamObserver<DiscoveryResponse> responseReader = 387 new ClientResponseObserver<DiscoveryRequest,DiscoveryResponse>() { 388 389 @Override 390 public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) { 391 requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler); 392 } 393 394 @Override 395 public void onNext(final DiscoveryResponse response) { 396 syncContext.execute(new Runnable() { 397 @Override 398 public void run() { 399 XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl()); 400 if (logger.isLoggable(XdsLogLevel.DEBUG)) { 401 logger.log( 402 XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, 403 MessagePrinter.print(response)); 404 } 405 if (type == null) { 406 logger.log( 407 XdsLogLevel.WARNING, 408 "Ignore an unknown type of DiscoveryResponse: {0}", 409 response.getTypeUrl()); 410 return; 411 } 412 handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), 413 response.getNonce()); 414 } 415 }); 416 } 417 418 @Override 419 public void onError(final Throwable t) { 420 syncContext.execute(new Runnable() { 421 @Override 422 public void run() { 423 handleRpcError(t); 424 } 425 }); 426 } 427 428 @Override 429 public void onCompleted() { 430 syncContext.execute(new Runnable() { 431 @Override 432 public void run() { 433 handleRpcCompleted(); 434 } 435 }); 436 } 437 }; 438 requestWriter = stub.streamAggregatedResources(responseReader); 439 } 440 441 @Override sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo, Collection<String> resources, String nonce, @Nullable String errorDetail)442 void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo, 443 Collection<String> resources, String nonce, 444 @Nullable String errorDetail) { 445 checkState(requestWriter != null, "ADS stream has not been started"); 446 DiscoveryRequest.Builder builder = 447 DiscoveryRequest.newBuilder() 448 .setVersionInfo(versionInfo) 449 .setNode(bootstrapNode.toEnvoyProtoNode()) 450 .addAllResourceNames(resources) 451 .setTypeUrl(type.typeUrl()) 452 .setResponseNonce(nonce); 453 if (errorDetail != null) { 454 com.google.rpc.Status error = 455 com.google.rpc.Status.newBuilder() 456 .setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code 457 .setMessage(errorDetail) 458 .build(); 459 builder.setErrorDetail(error); 460 } 461 DiscoveryRequest request = builder.build(); 462 requestWriter.onNext(request); 463 if (logger.isLoggable(XdsLogLevel.DEBUG)) { 464 logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request)); 465 } 466 } 467 468 @Override sendError(Exception error)469 void sendError(Exception error) { 470 requestWriter.onError(error); 471 } 472 } 473 } 474