1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.Stopwatch; 25 import com.google.common.base.Supplier; 26 import com.google.protobuf.util.Durations; 27 import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; 28 import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; 29 import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; 30 import io.grpc.Channel; 31 import io.grpc.Context; 32 import io.grpc.InternalLogId; 33 import io.grpc.Status; 34 import io.grpc.SynchronizationContext; 35 import io.grpc.SynchronizationContext.ScheduledHandle; 36 import io.grpc.internal.BackoffPolicy; 37 import io.grpc.stub.StreamObserver; 38 import io.grpc.xds.EnvoyProtoData.Node; 39 import io.grpc.xds.Stats.ClusterStats; 40 import io.grpc.xds.Stats.DroppedRequests; 41 import io.grpc.xds.Stats.UpstreamLocalityStats; 42 import io.grpc.xds.XdsLogger.XdsLogLevel; 43 import java.util.ArrayList; 44 import java.util.Collections; 45 import java.util.List; 46 import java.util.concurrent.ScheduledExecutorService; 47 import java.util.concurrent.TimeUnit; 48 import javax.annotation.Nullable; 49 50 /** 51 * Client of xDS load reporting service based on LRS protocol, which reports load stats of 52 * gRPC client's perspective to a management server. 53 */ 54 final class LoadReportClient { 55 private final InternalLogId logId; 56 private final XdsLogger logger; 57 private final Channel channel; 58 private final Context context; 59 private final Node node; 60 private final SynchronizationContext syncContext; 61 private final ScheduledExecutorService timerService; 62 private final Stopwatch retryStopwatch; 63 private final BackoffPolicy.Provider backoffPolicyProvider; 64 @VisibleForTesting 65 final LoadStatsManager2 loadStatsManager; 66 67 private boolean started; 68 @Nullable 69 private BackoffPolicy lrsRpcRetryPolicy; 70 @Nullable 71 private ScheduledHandle lrsRpcRetryTimer; 72 @Nullable 73 @VisibleForTesting 74 LrsStream lrsStream; 75 LoadReportClient( LoadStatsManager2 loadStatsManager, Channel channel, Context context, Node node, SynchronizationContext syncContext, ScheduledExecutorService scheduledExecutorService, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier)76 LoadReportClient( 77 LoadStatsManager2 loadStatsManager, 78 Channel channel, 79 Context context, 80 Node node, 81 SynchronizationContext syncContext, 82 ScheduledExecutorService scheduledExecutorService, 83 BackoffPolicy.Provider backoffPolicyProvider, 84 Supplier<Stopwatch> stopwatchSupplier) { 85 this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager"); 86 this.channel = checkNotNull(channel, "xdsChannel"); 87 this.context = checkNotNull(context, "context"); 88 this.syncContext = checkNotNull(syncContext, "syncContext"); 89 this.timerService = checkNotNull(scheduledExecutorService, "timeService"); 90 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 91 this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); 92 this.node = checkNotNull(node, "node").toBuilder() 93 .addClientFeatures("envoy.lrs.supports_send_all_clusters").build(); 94 logId = InternalLogId.allocate("lrs-client", null); 95 logger = XdsLogger.withLogId(logId); 96 logger.log(XdsLogLevel.INFO, "Created"); 97 } 98 99 /** 100 * Establishes load reporting communication and negotiates with traffic director to report load 101 * stats periodically. Calling this method on an already started {@link LoadReportClient} is 102 * no-op. 103 */ startLoadReporting()104 void startLoadReporting() { 105 syncContext.throwIfNotInThisSynchronizationContext(); 106 if (started) { 107 return; 108 } 109 started = true; 110 logger.log(XdsLogLevel.INFO, "Starting load reporting RPC"); 111 startLrsRpc(); 112 } 113 114 /** 115 * Terminates load reporting. Calling this method on an already stopped 116 * {@link LoadReportClient} is no-op. 117 */ stopLoadReporting()118 void stopLoadReporting() { 119 syncContext.throwIfNotInThisSynchronizationContext(); 120 if (!started) { 121 return; 122 } 123 started = false; 124 logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC"); 125 if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) { 126 lrsRpcRetryTimer.cancel(); 127 } 128 if (lrsStream != null) { 129 lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException()); 130 } 131 // Do not shutdown channel as it is not owned by LrsClient. 132 } 133 134 @VisibleForTesting 135 static class LoadReportingTask implements Runnable { 136 private final LrsStream stream; 137 LoadReportingTask(LrsStream stream)138 LoadReportingTask(LrsStream stream) { 139 this.stream = stream; 140 } 141 142 @Override run()143 public void run() { 144 stream.sendLoadReport(); 145 } 146 } 147 148 @VisibleForTesting 149 class LrsRpcRetryTask implements Runnable { 150 151 @Override run()152 public void run() { 153 startLrsRpc(); 154 } 155 } 156 startLrsRpc()157 private void startLrsRpc() { 158 if (!started) { 159 return; 160 } 161 checkState(lrsStream == null, "previous lbStream has not been cleared yet"); 162 lrsStream = new LrsStream(); 163 retryStopwatch.reset().start(); 164 Context prevContext = context.attach(); 165 try { 166 lrsStream.start(); 167 } finally { 168 context.detach(prevContext); 169 } 170 } 171 172 private final class LrsStream { 173 boolean initialResponseReceived; 174 boolean closed; 175 long intervalNano = -1; 176 boolean reportAllClusters; 177 List<String> clusterNames; // clusters to report loads for, if not report all. 178 ScheduledHandle loadReportTimer; 179 StreamObserver<LoadStatsRequest> lrsRequestWriterV3; 180 start()181 void start() { 182 StreamObserver<LoadStatsResponse> lrsResponseReaderV3 = 183 new StreamObserver<LoadStatsResponse>() { 184 @Override 185 public void onNext(final LoadStatsResponse response) { 186 syncContext.execute(new Runnable() { 187 @Override 188 public void run() { 189 logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); 190 handleRpcResponse(response.getClustersList(), response.getSendAllClusters(), 191 Durations.toNanos(response.getLoadReportingInterval())); 192 } 193 }); 194 } 195 196 @Override 197 public void onError(final Throwable t) { 198 syncContext.execute(new Runnable() { 199 @Override 200 public void run() { 201 handleRpcError(t); 202 } 203 }); 204 } 205 206 @Override 207 public void onCompleted() { 208 syncContext.execute(new Runnable() { 209 @Override 210 public void run() { 211 handleRpcCompleted(); 212 } 213 }); 214 } 215 }; 216 lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady() 217 .streamLoadStats(lrsResponseReaderV3); 218 logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); 219 sendLoadStatsRequest(Collections.<ClusterStats>emptyList()); 220 } 221 sendLoadStatsRequest(List<ClusterStats> clusterStatsList)222 void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) { 223 LoadStatsRequest.Builder requestBuilder = 224 LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode()); 225 for (ClusterStats stats : clusterStatsList) { 226 requestBuilder.addClusterStats(buildClusterStats(stats)); 227 } 228 LoadStatsRequest request = requestBuilder.build(); 229 lrsRequestWriterV3.onNext(request); 230 logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request); 231 } 232 sendError(Exception error)233 void sendError(Exception error) { 234 lrsRequestWriterV3.onError(error); 235 } 236 handleRpcResponse(List<String> clusters, boolean sendAllClusters, long loadReportIntervalNano)237 void handleRpcResponse(List<String> clusters, boolean sendAllClusters, 238 long loadReportIntervalNano) { 239 if (closed) { 240 return; 241 } 242 if (!initialResponseReceived) { 243 logger.log(XdsLogLevel.DEBUG, "Initial LRS response received"); 244 initialResponseReceived = true; 245 } 246 reportAllClusters = sendAllClusters; 247 if (reportAllClusters) { 248 logger.log(XdsLogLevel.INFO, "Report loads for all clusters"); 249 } else { 250 logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters); 251 clusterNames = clusters; 252 } 253 intervalNano = loadReportIntervalNano; 254 logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano); 255 scheduleNextLoadReport(); 256 } 257 handleRpcError(Throwable t)258 void handleRpcError(Throwable t) { 259 handleStreamClosed(Status.fromThrowable(t)); 260 } 261 handleRpcCompleted()262 void handleRpcCompleted() { 263 handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); 264 } 265 sendLoadReport()266 private void sendLoadReport() { 267 if (closed) { 268 return; 269 } 270 List<ClusterStats> clusterStatsList; 271 if (reportAllClusters) { 272 clusterStatsList = loadStatsManager.getAllClusterStatsReports(); 273 } else { 274 clusterStatsList = new ArrayList<>(); 275 for (String name : clusterNames) { 276 clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name)); 277 } 278 } 279 sendLoadStatsRequest(clusterStatsList); 280 scheduleNextLoadReport(); 281 } 282 scheduleNextLoadReport()283 private void scheduleNextLoadReport() { 284 // Cancel pending load report and reschedule with updated load reporting interval. 285 if (loadReportTimer != null && loadReportTimer.isPending()) { 286 loadReportTimer.cancel(); 287 loadReportTimer = null; 288 } 289 if (intervalNano > 0) { 290 loadReportTimer = syncContext.schedule( 291 new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService); 292 } 293 } 294 handleStreamClosed(Status status)295 private void handleStreamClosed(Status status) { 296 checkArgument(!status.isOk(), "unexpected OK status"); 297 if (closed) { 298 return; 299 } 300 logger.log( 301 XdsLogLevel.ERROR, 302 "LRS stream closed with status {0}: {1}. Cause: {2}", 303 status.getCode(), status.getDescription(), status.getCause()); 304 closed = true; 305 cleanUp(); 306 307 if (initialResponseReceived || lrsRpcRetryPolicy == null) { 308 // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence 309 // has never been initialized. 310 lrsRpcRetryPolicy = backoffPolicyProvider.get(); 311 } 312 // The back-off policy determines the interval between consecutive RPC upstarts, thus the 313 // actual delay may be smaller than the value from the back-off policy, or even negative, 314 // depending how much time was spent in the previous RPC. 315 long delayNanos = 316 lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS); 317 logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos); 318 if (delayNanos <= 0) { 319 startLrsRpc(); 320 } else { 321 lrsRpcRetryTimer = syncContext.schedule( 322 new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService); 323 } 324 } 325 close(Exception error)326 private void close(Exception error) { 327 if (closed) { 328 return; 329 } 330 closed = true; 331 cleanUp(); 332 sendError(error); 333 } 334 cleanUp()335 private void cleanUp() { 336 if (loadReportTimer != null && loadReportTimer.isPending()) { 337 loadReportTimer.cancel(); 338 loadReportTimer = null; 339 } 340 if (lrsStream == this) { 341 lrsStream = null; 342 } 343 } 344 buildClusterStats( ClusterStats stats)345 private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats( 346 ClusterStats stats) { 347 io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder = 348 io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder() 349 .setClusterName(stats.clusterName()); 350 if (stats.clusterServiceName() != null) { 351 builder.setClusterServiceName(stats.clusterServiceName()); 352 } 353 for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) { 354 builder.addUpstreamLocalityStats( 355 io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder() 356 .setLocality( 357 io.envoyproxy.envoy.config.core.v3.Locality.newBuilder() 358 .setRegion(upstreamLocalityStats.locality().region()) 359 .setZone(upstreamLocalityStats.locality().zone()) 360 .setSubZone(upstreamLocalityStats.locality().subZone())) 361 .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests()) 362 .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests()) 363 .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress()) 364 .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())); 365 } 366 for (DroppedRequests droppedRequests : stats.droppedRequestsList()) { 367 builder.addDroppedRequests( 368 io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder() 369 .setCategory(droppedRequests.category()) 370 .setDroppedCount(droppedRequests.droppedCount())); 371 } 372 return builder 373 .setTotalDroppedRequests(stats.totalDroppedRequests()) 374 .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano())) 375 .build(); 376 } 377 } 378 } 379