xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.base.Supplier;
26 import com.google.protobuf.Any;
27 import com.google.rpc.Code;
28 import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
29 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
30 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31 import io.grpc.Channel;
32 import io.grpc.Context;
33 import io.grpc.InternalLogId;
34 import io.grpc.ManagedChannel;
35 import io.grpc.Status;
36 import io.grpc.SynchronizationContext;
37 import io.grpc.SynchronizationContext.ScheduledHandle;
38 import io.grpc.internal.BackoffPolicy;
39 import io.grpc.stub.ClientCallStreamObserver;
40 import io.grpc.stub.ClientResponseObserver;
41 import io.grpc.stub.StreamObserver;
42 import io.grpc.xds.Bootstrapper.ServerInfo;
43 import io.grpc.xds.EnvoyProtoData.Node;
44 import io.grpc.xds.XdsClient.ResourceStore;
45 import io.grpc.xds.XdsClient.XdsResponseHandler;
46 import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
47 import io.grpc.xds.XdsLogger.XdsLogLevel;
48 import java.util.Collection;
49 import java.util.Collections;
50 import java.util.HashMap;
51 import java.util.HashSet;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Set;
55 import java.util.concurrent.ScheduledExecutorService;
56 import java.util.concurrent.TimeUnit;
57 import javax.annotation.Nullable;
58 
/**
 * Manages the xDS ADS RPC stream to a single management server over one channel: starting the
 * stream, sending discovery requests (including ACK/NACK), and retrying with backoff after the
 * stream closes.
 */
63 final class ControlPlaneClient {
64 
65   public static final String CLOSED_BY_SERVER = "Closed by server";
66   private final SynchronizationContext syncContext;
67   private final InternalLogId logId;
68   private final XdsLogger logger;
69   private final ServerInfo serverInfo;
70   private final ManagedChannel channel;
71   private final XdsResponseHandler xdsResponseHandler;
72   private final ResourceStore resourceStore;
73   private final Context context;
74   private final ScheduledExecutorService timeService;
75   private final BackoffPolicy.Provider backoffPolicyProvider;
76   private final Stopwatch stopwatch;
77   private final Node bootstrapNode;
78   private final XdsClient.TimerLaunch timerLaunch;
79 
80   // Last successfully applied version_info for each resource type. Starts with empty string.
81   // A version_info is used to update management server with client's most recent knowledge of
82   // resources.
83   private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
84 
85   private boolean shutdown;
86   @Nullable
87   private AbstractAdsStream adsStream;
88   @Nullable
89   private BackoffPolicy retryBackoffPolicy;
90   @Nullable
91   private ScheduledHandle rpcRetryTimer;
92 
93   /** An entity that manages ADS RPCs over a single channel. */
94   // TODO: rename to XdsChannel
ControlPlaneClient( XdsChannelFactory xdsChannelFactory, ServerInfo serverInfo, Node bootstrapNode, XdsResponseHandler xdsResponseHandler, ResourceStore resourceStore, Context context, ScheduledExecutorService timeService, SynchronizationContext syncContext, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier, XdsClient.TimerLaunch timerLaunch)95   ControlPlaneClient(
96       XdsChannelFactory xdsChannelFactory,
97       ServerInfo serverInfo,
98       Node bootstrapNode,
99       XdsResponseHandler xdsResponseHandler,
100       ResourceStore resourceStore,
101       Context context,
102       ScheduledExecutorService
103       timeService,
104       SynchronizationContext syncContext,
105       BackoffPolicy.Provider backoffPolicyProvider,
106       Supplier<Stopwatch> stopwatchSupplier,
107       XdsClient.TimerLaunch timerLaunch) {
108     this.serverInfo = checkNotNull(serverInfo, "serverInfo");
109     this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
110     this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
111     this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
112     this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
113     this.context = checkNotNull(context, "context");
114     this.timeService = checkNotNull(timeService, "timeService");
115     this.syncContext = checkNotNull(syncContext, "syncContext");
116     this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
117     this.timerLaunch  = checkNotNull(timerLaunch, "timerLaunch");
118     stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
119     logId = InternalLogId.allocate("xds-client", serverInfo.target());
120     logger = XdsLogger.withLogId(logId);
121     logger.log(XdsLogLevel.INFO, "Created");
122   }
123 
124   /** The underlying channel. */
125   // Currently, only externally used for LrsClient.
channel()126   Channel channel() {
127     return channel;
128   }
129 
shutdown()130   void shutdown() {
131     syncContext.execute(new Runnable() {
132       @Override
133       public void run() {
134         shutdown = true;
135         logger.log(XdsLogLevel.INFO, "Shutting down");
136         if (adsStream != null) {
137           adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
138         }
139         if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
140           rpcRetryTimer.cancel();
141         }
142         channel.shutdown();
143       }
144     });
145   }
146 
147   @Override
toString()148   public String toString() {
149     return logId.toString();
150   }
151 
152   /**
153    * Updates the resource subscription for the given resource type.
154    */
155   // Must be synchronized.
adjustResourceSubscription(XdsResourceType<?> resourceType)156   void adjustResourceSubscription(XdsResourceType<?> resourceType) {
157     if (isInBackoff()) {
158       return;
159     }
160     if (adsStream == null) {
161       startRpcStream();
162     }
163     Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
164     if (resources != null) {
165       adsStream.sendDiscoveryRequest(resourceType, resources);
166     }
167   }
168 
169   /**
170    * Accepts the update for the given resource type by updating the latest resource version
171    * and sends an ACK request to the management server.
172    */
173   // Must be synchronized.
ackResponse(XdsResourceType<?> type, String versionInfo, String nonce)174   void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
175     versions.put(type, versionInfo);
176     logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
177         type.typeName(), nonce, versionInfo);
178     Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
179     if (resources == null) {
180       resources = Collections.emptyList();
181     }
182     adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
183   }
184 
185   /**
186    * Rejects the update for the given resource type and sends an NACK request (request with last
187    * accepted version) to the management server.
188    */
189   // Must be synchronized.
nackResponse(XdsResourceType<?> type, String nonce, String errorDetail)190   void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
191     String versionInfo = versions.getOrDefault(type, "");
192     logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
193         type.typeName(), nonce, versionInfo);
194     Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
195     if (resources == null) {
196       resources = Collections.emptyList();
197     }
198     adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
199   }
200 
201   /**
202    * Returns {@code true} if the resource discovery is currently in backoff.
203    */
204   // Must be synchronized.
isInBackoff()205   boolean isInBackoff() {
206     return rpcRetryTimer != null && rpcRetryTimer.isPending();
207   }
208 
isReady()209   boolean isReady() {
210     return adsStream != null && adsStream.isReady();
211   }
212 
213   /**
214    * Starts a timer for each requested resource that hasn't been responded to and
215    * has been waiting for the channel to get ready.
216    */
readyHandler()217   void readyHandler() {
218     if (!isReady()) {
219       return;
220     }
221 
222     if (isInBackoff()) {
223       rpcRetryTimer.cancel();
224       rpcRetryTimer = null;
225     }
226 
227     timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
228   }
229 
230   /**
231    * Establishes the RPC connection by creating a new RPC stream on the given channel for
232    * xDS protocol communication.
233    */
234   // Must be synchronized.
startRpcStream()235   private void startRpcStream() {
236     checkState(adsStream == null, "Previous adsStream has not been cleared yet");
237     adsStream = new AdsStreamV3();
238     Context prevContext = context.attach();
239     try {
240       adsStream.start();
241     } finally {
242       context.detach(prevContext);
243     }
244     logger.log(XdsLogLevel.INFO, "ADS stream started");
245     stopwatch.reset().start();
246   }
247 
248   @VisibleForTesting
249   final class RpcRetryTask implements Runnable {
250     @Override
run()251     public void run() {
252       if (shutdown) {
253         return;
254       }
255       startRpcStream();
256       Set<XdsResourceType<?>> subscribedResourceTypes =
257           new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
258       for (XdsResourceType<?> type : subscribedResourceTypes) {
259         Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
260         if (resources != null) {
261           adsStream.sendDiscoveryRequest(type, resources);
262         }
263       }
264       xdsResponseHandler.handleStreamRestarted(serverInfo);
265     }
266   }
267 
268   @VisibleForTesting
269   @Nullable
fromTypeUrl(String typeUrl)270   XdsResourceType<?> fromTypeUrl(String typeUrl) {
271     return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
272   }
273 
274   private abstract class AbstractAdsStream {
275     private boolean responseReceived;
276     private boolean closed;
277     // Response nonce for the most recently received discovery responses of each resource type.
278     // Client initiated requests start response nonce with empty string.
279     // Nonce in each response is echoed back in the following ACK/NACK request. It is
280     // used for management server to identify which response the client is ACKing/NACking.
281     // To avoid confusion, client-initiated requests will always use the nonce in
282     // most recently received responses of each resource type.
283     private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
284 
start()285     abstract void start();
286 
sendError(Exception error)287     abstract void sendError(Exception error);
288 
isReady()289     abstract boolean isReady();
290 
291     /**
292      * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
293      * {@code errorDetail}. Used for reacting to a specific discovery response. For
294      * client-initiated discovery requests, use {@link
295      * #sendDiscoveryRequest(XdsResourceType, Collection)}.
296      */
sendDiscoveryRequest(XdsResourceType<?> type, String version, Collection<String> resources, String nonce, @Nullable String errorDetail)297     abstract void sendDiscoveryRequest(XdsResourceType<?> type, String version,
298         Collection<String> resources, String nonce, @Nullable String errorDetail);
299 
300     /**
301      * Sends a client-initiated discovery request.
302      */
sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources)303     final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
304       logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
305       sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
306           respNonces.getOrDefault(type, ""), null);
307     }
308 
handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources, String nonce)309     final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
310                                  String nonce) {
311       checkNotNull(type, "type");
312       if (closed) {
313         return;
314       }
315       responseReceived = true;
316       respNonces.put(type, nonce);
317       xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
318     }
319 
handleRpcError(Throwable t)320     final void handleRpcError(Throwable t) {
321       handleRpcStreamClosed(Status.fromThrowable(t));
322     }
323 
handleRpcCompleted()324     final void handleRpcCompleted() {
325       handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
326     }
327 
handleRpcStreamClosed(Status error)328     private void handleRpcStreamClosed(Status error) {
329       if (closed) {
330         return;
331       }
332 
333       if (responseReceived || retryBackoffPolicy == null) {
334         // Reset the backoff sequence if had received a response, or backoff sequence
335         // has never been initialized.
336         retryBackoffPolicy = backoffPolicyProvider.get();
337       }
338       // Need this here to avoid tsan race condition in XdsClientImplTestBase.sendToNonexistentHost
339       long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
340       long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
341       rpcRetryTimer = syncContext.schedule(
342           new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
343 
344       checkArgument(!error.isOk(), "unexpected OK status");
345       String errorMsg = error.getDescription() != null
346           && error.getDescription().equals(CLOSED_BY_SERVER)
347               ? "ADS stream closed with status {0}: {1}. Cause: {2}"
348               : "ADS stream failed with status {0}: {1}. Cause: {2}";
349       logger.log(
350           XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
351       closed = true;
352       xdsResponseHandler.handleStreamClosed(error);
353       cleanUp();
354 
355       logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
356     }
357 
close(Exception error)358     private void close(Exception error) {
359       if (closed) {
360         return;
361       }
362       closed = true;
363       cleanUp();
364       sendError(error);
365     }
366 
cleanUp()367     private void cleanUp() {
368       if (adsStream == this) {
369         adsStream = null;
370       }
371     }
372   }
373 
374   private final class AdsStreamV3 extends AbstractAdsStream {
375     private StreamObserver<DiscoveryRequest> requestWriter;
376 
377     @Override
isReady()378     public boolean isReady() {
379       return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
380     }
381 
382     @Override
start()383     void start() {
384       AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
385           AggregatedDiscoveryServiceGrpc.newStub(channel);
386       StreamObserver<DiscoveryResponse> responseReader =
387           new ClientResponseObserver<DiscoveryRequest,DiscoveryResponse>() {
388 
389         @Override
390         public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
391           requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
392         }
393 
394         @Override
395         public void onNext(final DiscoveryResponse response) {
396           syncContext.execute(new Runnable() {
397             @Override
398             public void run() {
399               XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
400               if (logger.isLoggable(XdsLogLevel.DEBUG)) {
401                 logger.log(
402                     XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
403                     MessagePrinter.print(response));
404               }
405               if (type == null) {
406                 logger.log(
407                     XdsLogLevel.WARNING,
408                     "Ignore an unknown type of DiscoveryResponse: {0}",
409                     response.getTypeUrl());
410                 return;
411               }
412               handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
413                   response.getNonce());
414             }
415           });
416         }
417 
418         @Override
419         public void onError(final Throwable t) {
420           syncContext.execute(new Runnable() {
421             @Override
422             public void run() {
423               handleRpcError(t);
424             }
425           });
426         }
427 
428         @Override
429         public void onCompleted() {
430           syncContext.execute(new Runnable() {
431             @Override
432             public void run() {
433               handleRpcCompleted();
434             }
435           });
436         }
437       };
438       requestWriter = stub.streamAggregatedResources(responseReader);
439     }
440 
441     @Override
sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo, Collection<String> resources, String nonce, @Nullable String errorDetail)442     void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
443                               Collection<String> resources, String nonce,
444                               @Nullable String errorDetail) {
445       checkState(requestWriter != null, "ADS stream has not been started");
446       DiscoveryRequest.Builder builder =
447           DiscoveryRequest.newBuilder()
448               .setVersionInfo(versionInfo)
449               .setNode(bootstrapNode.toEnvoyProtoNode())
450               .addAllResourceNames(resources)
451               .setTypeUrl(type.typeUrl())
452               .setResponseNonce(nonce);
453       if (errorDetail != null) {
454         com.google.rpc.Status error =
455             com.google.rpc.Status.newBuilder()
456                 .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
457                 .setMessage(errorDetail)
458                 .build();
459         builder.setErrorDetail(error);
460       }
461       DiscoveryRequest request = builder.build();
462       requestWriter.onNext(request);
463       if (logger.isLoggable(XdsLogLevel.DEBUG)) {
464         logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
465       }
466     }
467 
468     @Override
sendError(Exception error)469     void sendError(Exception error) {
470       requestWriter.onError(error);
471     }
472   }
473 }
474