xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/LoadReportClient.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.base.Supplier;
26 import com.google.protobuf.util.Durations;
27 import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28 import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29 import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30 import io.grpc.Channel;
31 import io.grpc.Context;
32 import io.grpc.InternalLogId;
33 import io.grpc.Status;
34 import io.grpc.SynchronizationContext;
35 import io.grpc.SynchronizationContext.ScheduledHandle;
36 import io.grpc.internal.BackoffPolicy;
37 import io.grpc.stub.StreamObserver;
38 import io.grpc.xds.EnvoyProtoData.Node;
39 import io.grpc.xds.Stats.ClusterStats;
40 import io.grpc.xds.Stats.DroppedRequests;
41 import io.grpc.xds.Stats.UpstreamLocalityStats;
42 import io.grpc.xds.XdsLogger.XdsLogLevel;
43 import java.util.ArrayList;
44 import java.util.Collections;
45 import java.util.List;
46 import java.util.concurrent.ScheduledExecutorService;
47 import java.util.concurrent.TimeUnit;
48 import javax.annotation.Nullable;
49 
50 /**
51  * Client of xDS load reporting service based on LRS protocol, which reports load stats of
52  * gRPC client's perspective to a management server.
53  */
54 final class LoadReportClient {
55   private final InternalLogId logId;
56   private final XdsLogger logger;
57   private final Channel channel;
58   private final Context context;
59   private final Node node;
60   private final SynchronizationContext syncContext;
61   private final ScheduledExecutorService timerService;
62   private final Stopwatch retryStopwatch;
63   private final BackoffPolicy.Provider backoffPolicyProvider;
64   @VisibleForTesting
65   final LoadStatsManager2 loadStatsManager;
66 
67   private boolean started;
68   @Nullable
69   private BackoffPolicy lrsRpcRetryPolicy;
70   @Nullable
71   private ScheduledHandle lrsRpcRetryTimer;
72   @Nullable
73   @VisibleForTesting
74   LrsStream lrsStream;
75 
LoadReportClient( LoadStatsManager2 loadStatsManager, Channel channel, Context context, Node node, SynchronizationContext syncContext, ScheduledExecutorService scheduledExecutorService, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier)76   LoadReportClient(
77       LoadStatsManager2 loadStatsManager,
78       Channel channel,
79       Context context,
80       Node node,
81       SynchronizationContext syncContext,
82       ScheduledExecutorService scheduledExecutorService,
83       BackoffPolicy.Provider backoffPolicyProvider,
84       Supplier<Stopwatch> stopwatchSupplier) {
85     this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
86     this.channel = checkNotNull(channel, "xdsChannel");
87     this.context = checkNotNull(context, "context");
88     this.syncContext = checkNotNull(syncContext, "syncContext");
89     this.timerService = checkNotNull(scheduledExecutorService, "timeService");
90     this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
91     this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
92     this.node = checkNotNull(node, "node").toBuilder()
93         .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
94     logId = InternalLogId.allocate("lrs-client", null);
95     logger = XdsLogger.withLogId(logId);
96     logger.log(XdsLogLevel.INFO, "Created");
97   }
98 
99   /**
100    * Establishes load reporting communication and negotiates with traffic director to report load
101    * stats periodically. Calling this method on an already started {@link LoadReportClient} is
102    * no-op.
103    */
startLoadReporting()104   void startLoadReporting() {
105     syncContext.throwIfNotInThisSynchronizationContext();
106     if (started) {
107       return;
108     }
109     started = true;
110     logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
111     startLrsRpc();
112   }
113 
114   /**
115    * Terminates load reporting. Calling this method on an already stopped
116    * {@link LoadReportClient} is no-op.
117    */
stopLoadReporting()118   void stopLoadReporting() {
119     syncContext.throwIfNotInThisSynchronizationContext();
120     if (!started) {
121       return;
122     }
123     started = false;
124     logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
125     if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
126       lrsRpcRetryTimer.cancel();
127     }
128     if (lrsStream != null) {
129       lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
130     }
131     // Do not shutdown channel as it is not owned by LrsClient.
132   }
133 
134   @VisibleForTesting
135   static class LoadReportingTask implements Runnable {
136     private final LrsStream stream;
137 
LoadReportingTask(LrsStream stream)138     LoadReportingTask(LrsStream stream) {
139       this.stream = stream;
140     }
141 
142     @Override
run()143     public void run() {
144       stream.sendLoadReport();
145     }
146   }
147 
148   @VisibleForTesting
149   class LrsRpcRetryTask implements Runnable {
150 
151     @Override
run()152     public void run() {
153       startLrsRpc();
154     }
155   }
156 
startLrsRpc()157   private void startLrsRpc() {
158     if (!started) {
159       return;
160     }
161     checkState(lrsStream == null, "previous lbStream has not been cleared yet");
162     lrsStream = new LrsStream();
163     retryStopwatch.reset().start();
164     Context prevContext = context.attach();
165     try {
166       lrsStream.start();
167     } finally {
168       context.detach(prevContext);
169     }
170   }
171 
172   private final class LrsStream {
173     boolean initialResponseReceived;
174     boolean closed;
175     long intervalNano = -1;
176     boolean reportAllClusters;
177     List<String> clusterNames;  // clusters to report loads for, if not report all.
178     ScheduledHandle loadReportTimer;
179     StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
180 
start()181     void start() {
182       StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
183           new StreamObserver<LoadStatsResponse>() {
184             @Override
185             public void onNext(final LoadStatsResponse response) {
186               syncContext.execute(new Runnable() {
187                 @Override
188                 public void run() {
189                   logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
190                   handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
191                       Durations.toNanos(response.getLoadReportingInterval()));
192                 }
193               });
194             }
195 
196             @Override
197             public void onError(final Throwable t) {
198               syncContext.execute(new Runnable() {
199                 @Override
200                 public void run() {
201                   handleRpcError(t);
202                 }
203               });
204             }
205 
206             @Override
207             public void onCompleted() {
208               syncContext.execute(new Runnable() {
209                 @Override
210                 public void run() {
211                   handleRpcCompleted();
212                 }
213               });
214             }
215           };
216       lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
217           .streamLoadStats(lrsResponseReaderV3);
218       logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
219       sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
220     }
221 
sendLoadStatsRequest(List<ClusterStats> clusterStatsList)222     void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
223       LoadStatsRequest.Builder requestBuilder =
224           LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
225       for (ClusterStats stats : clusterStatsList) {
226         requestBuilder.addClusterStats(buildClusterStats(stats));
227       }
228       LoadStatsRequest request = requestBuilder.build();
229       lrsRequestWriterV3.onNext(request);
230       logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
231     }
232 
sendError(Exception error)233     void sendError(Exception error) {
234       lrsRequestWriterV3.onError(error);
235     }
236 
handleRpcResponse(List<String> clusters, boolean sendAllClusters, long loadReportIntervalNano)237     void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
238                            long loadReportIntervalNano) {
239       if (closed) {
240         return;
241       }
242       if (!initialResponseReceived) {
243         logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
244         initialResponseReceived = true;
245       }
246       reportAllClusters = sendAllClusters;
247       if (reportAllClusters) {
248         logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
249       } else {
250         logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
251         clusterNames = clusters;
252       }
253       intervalNano = loadReportIntervalNano;
254       logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
255       scheduleNextLoadReport();
256     }
257 
handleRpcError(Throwable t)258     void handleRpcError(Throwable t) {
259       handleStreamClosed(Status.fromThrowable(t));
260     }
261 
handleRpcCompleted()262     void handleRpcCompleted() {
263       handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
264     }
265 
sendLoadReport()266     private void sendLoadReport() {
267       if (closed) {
268         return;
269       }
270       List<ClusterStats> clusterStatsList;
271       if (reportAllClusters) {
272         clusterStatsList = loadStatsManager.getAllClusterStatsReports();
273       } else {
274         clusterStatsList = new ArrayList<>();
275         for (String name : clusterNames) {
276           clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
277         }
278       }
279       sendLoadStatsRequest(clusterStatsList);
280       scheduleNextLoadReport();
281     }
282 
scheduleNextLoadReport()283     private void scheduleNextLoadReport() {
284       // Cancel pending load report and reschedule with updated load reporting interval.
285       if (loadReportTimer != null && loadReportTimer.isPending()) {
286         loadReportTimer.cancel();
287         loadReportTimer = null;
288       }
289       if (intervalNano > 0) {
290         loadReportTimer = syncContext.schedule(
291             new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
292       }
293     }
294 
handleStreamClosed(Status status)295     private void handleStreamClosed(Status status) {
296       checkArgument(!status.isOk(), "unexpected OK status");
297       if (closed) {
298         return;
299       }
300       logger.log(
301           XdsLogLevel.ERROR,
302           "LRS stream closed with status {0}: {1}. Cause: {2}",
303           status.getCode(), status.getDescription(), status.getCause());
304       closed = true;
305       cleanUp();
306 
307       if (initialResponseReceived || lrsRpcRetryPolicy == null) {
308         // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
309         // has never been initialized.
310         lrsRpcRetryPolicy = backoffPolicyProvider.get();
311       }
312       // The back-off policy determines the interval between consecutive RPC upstarts, thus the
313       // actual delay may be smaller than the value from the back-off policy, or even negative,
314       // depending how much time was spent in the previous RPC.
315       long delayNanos =
316           lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
317       logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
318       if (delayNanos <= 0) {
319         startLrsRpc();
320       } else {
321         lrsRpcRetryTimer = syncContext.schedule(
322             new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
323       }
324     }
325 
close(Exception error)326     private void close(Exception error) {
327       if (closed) {
328         return;
329       }
330       closed = true;
331       cleanUp();
332       sendError(error);
333     }
334 
cleanUp()335     private void cleanUp() {
336       if (loadReportTimer != null && loadReportTimer.isPending()) {
337         loadReportTimer.cancel();
338         loadReportTimer = null;
339       }
340       if (lrsStream == this) {
341         lrsStream = null;
342       }
343     }
344 
buildClusterStats( ClusterStats stats)345     private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
346         ClusterStats stats) {
347       io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
348           io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
349               .setClusterName(stats.clusterName());
350       if (stats.clusterServiceName() != null) {
351         builder.setClusterServiceName(stats.clusterServiceName());
352       }
353       for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
354         builder.addUpstreamLocalityStats(
355             io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
356                 .setLocality(
357                     io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
358                         .setRegion(upstreamLocalityStats.locality().region())
359                         .setZone(upstreamLocalityStats.locality().zone())
360                         .setSubZone(upstreamLocalityStats.locality().subZone()))
361             .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
362             .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
363             .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
364             .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()));
365       }
366       for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
367         builder.addDroppedRequests(
368             io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
369                 .setCategory(droppedRequests.category())
370                 .setDroppedCount(droppedRequests.droppedCount()));
371       }
372       return builder
373           .setTotalDroppedRequests(stats.totalDroppedRequests())
374           .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
375           .build();
376     }
377   }
378 }
379