1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds.orca; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static io.grpc.ConnectivityState.IDLE; 22 import static io.grpc.ConnectivityState.READY; 23 24 import com.github.xds.data.orca.v3.OrcaLoadReport; 25 import com.github.xds.service.orca.v3.OpenRcaServiceGrpc; 26 import com.github.xds.service.orca.v3.OrcaLoadReportRequest; 27 import com.google.common.annotations.VisibleForTesting; 28 import com.google.common.base.MoreObjects; 29 import com.google.common.base.Objects; 30 import com.google.common.base.Stopwatch; 31 import com.google.common.base.Supplier; 32 import com.google.protobuf.util.Durations; 33 import io.grpc.Attributes; 34 import io.grpc.CallOptions; 35 import io.grpc.Channel; 36 import io.grpc.ChannelLogger; 37 import io.grpc.ChannelLogger.ChannelLogLevel; 38 import io.grpc.ClientCall; 39 import io.grpc.ConnectivityStateInfo; 40 import io.grpc.ExperimentalApi; 41 import io.grpc.LoadBalancer; 42 import io.grpc.LoadBalancer.CreateSubchannelArgs; 43 import io.grpc.LoadBalancer.Helper; 44 import io.grpc.LoadBalancer.Subchannel; 45 import io.grpc.LoadBalancer.SubchannelStateListener; 46 import io.grpc.Metadata; 47 import io.grpc.Status; 48 import io.grpc.Status.Code; 49 import io.grpc.SynchronizationContext; 50 import io.grpc.SynchronizationContext.ScheduledHandle; 51 import io.grpc.internal.BackoffPolicy; 52 import io.grpc.internal.ExponentialBackoffPolicy; 53 import io.grpc.internal.GrpcUtil; 54 import io.grpc.services.MetricReport; 55 import io.grpc.util.ForwardingLoadBalancerHelper; 56 import io.grpc.util.ForwardingSubchannel; 57 import java.util.HashMap; 58 import java.util.Map; 59 import java.util.concurrent.ScheduledExecutorService; 60 import java.util.concurrent.TimeUnit; 61 import java.util.logging.Level; 62 import java.util.logging.Logger; 63 import javax.annotation.Nullable; 64 65 /** 66 * Utility class that provides method for {@link LoadBalancer} to install listeners to receive 67 * out-of-band backend metrics in the format of Open Request Cost Aggregation (ORCA). 68 */ 69 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129") 70 public final class OrcaOobUtil { 71 private static final Logger logger = Logger.getLogger(OrcaPerRequestUtil.class.getName()); 72 OrcaOobUtil()73 private OrcaOobUtil() {} 74 75 /** 76 * Creates a new {@link io.grpc.LoadBalancer.Helper} with provided 77 * {@link OrcaOobReportListener} installed 78 * to receive callback when an out-of-band ORCA report is received. 79 * 80 * <p>Example usages: 81 * 82 * <ul> 83 * <li> Leaf policy (e.g., WRR policy) 84 * <pre> 85 * {@code 86 * class WrrLoadbalancer extends LoadBalancer { 87 * private final Helper originHelper; // the original Helper 88 * 89 * public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { 90 * // listener implements the logic for WRR's usage of backend metrics. 91 * OrcaReportingHelper orcaHelper = 92 * OrcaOobUtil.newOrcaReportingHelper(originHelper); 93 * Subchannel subchannel = 94 * orcaHelper.createSubchannel(CreateSubchannelArgs.newBuilder()...); 95 * OrcaOobUtil.setListener( 96 * subchannel, 97 * listener, 98 * OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build()); 99 * ... 100 * } 101 * } 102 * } 103 * </pre> 104 * </li> 105 * <li> Delegating policy doing per-child-policy aggregation 106 * <pre> 107 * {@code 108 * class XdsLoadBalancer extends LoadBalancer { 109 * private final Helper orcaHelper; // the original Helper 110 * 111 * public XdsLoadBalancer(LoadBalancer.Helper helper) { 112 * this.orcaHelper = OrcaUtil.newOrcaReportingHelper(helper); 113 * } 114 * private void createChildPolicy( 115 * Locality locality, LoadBalancerProvider childPolicyProvider) { 116 * // Each Locality has a child policy, and the parent does per-locality aggregation by 117 * // summing everything up. 118 * 119 * // Create an OrcaReportingHelperWrapper for each Locality. 120 * // listener implements the logic for locality-level backend metric aggregation. 121 * LoadBalancer childLb = childPolicyProvider.newLoadBalancer( 122 * new ForwardingLoadBalancerHelper() { 123 * public Subchannel createSubchannel(CreateSubchannelArgs args) { 124 * Subchannel subchannel = super.createSubchannel(args); 125 * OrcaOobUtil.setListener(subchannel, listener, 126 * OrcaReportingConfig.newBuilder().setReportInterval(30, SECOND).build()); 127 * return subchannel; 128 * } 129 * public LoadBalancer.Helper delegate() { 130 * return orcaHelper; 131 * } 132 * }); 133 * } 134 * } 135 * } 136 * </pre> 137 * </li> 138 * </ul> 139 * 140 * @param delegate the delegate helper that provides essentials for establishing subchannels to 141 * backends. 142 */ newOrcaReportingHelper(LoadBalancer.Helper delegate)143 public static LoadBalancer.Helper newOrcaReportingHelper(LoadBalancer.Helper delegate) { 144 return newOrcaReportingHelper( 145 delegate, 146 new ExponentialBackoffPolicy.Provider(), 147 GrpcUtil.STOPWATCH_SUPPLIER); 148 } 149 150 @VisibleForTesting newOrcaReportingHelper( LoadBalancer.Helper delegate, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier)151 static LoadBalancer.Helper newOrcaReportingHelper( 152 LoadBalancer.Helper delegate, 153 BackoffPolicy.Provider backoffPolicyProvider, 154 Supplier<Stopwatch> stopwatchSupplier) { 155 return new OrcaReportingHelper(delegate, backoffPolicyProvider, stopwatchSupplier); 156 } 157 158 /** 159 * The listener interface for receiving out-of-band ORCA reports from backends. The class that is 160 * interested in processing backend cost metrics implements this interface, and the object created 161 * with that class is registered with a component, using methods in {@link OrcaPerRequestUtil}. 162 * When an ORCA report is received, that object's {@code onLoadReport} method is invoked. 163 */ 164 public interface OrcaOobReportListener { 165 166 /** 167 * Invoked when an out-of-band ORCA report is received. 168 * 169 * <p>Note this callback will be invoked from the {@link SynchronizationContext} of the 170 * delegated helper, implementations should not block. 171 * 172 * @param report load report in the format of grpc {@link MetricReport}. 173 */ onLoadReport(MetricReport report)174 void onLoadReport(MetricReport report); 175 } 176 177 static final Attributes.Key<SubchannelImpl> ORCA_REPORTING_STATE_KEY = 178 Attributes.Key.create("internal-orca-reporting-state"); 179 180 /** 181 * Update {@link OrcaOobReportListener} to receive Out-of-Band metrics report for the 182 * particular subchannel connection, and set the configuration of receiving ORCA reports, 183 * such as the interval of receiving reports. Set listener to null to remove listener, and the 184 * config will have no effect. 185 * 186 * <p>This method needs to be called from the SynchronizationContext returned by the wrapped 187 * helper's {@link Helper#getSynchronizationContext()}. 188 * 189 * <p>Each load balancing policy must call this method to configure the backend load reporting. 190 * Otherwise, it will not receive ORCA reports. 191 * 192 * <p>If multiple load balancing policies configure reporting with different intervals, reports 193 * come with the minimum of those intervals. 194 * 195 * @param subchannel the server connected by this subchannel to receive the metrics. 196 * 197 * @param listener the callback upon receiving backend metrics from the Out-Of-Band stream. 198 * Setting to null to removes the listener from the subchannel. 199 * 200 * @param config the configuration to be set. It has no effect when listener is null. 201 * 202 */ setListener(Subchannel subchannel, OrcaOobReportListener listener, OrcaReportingConfig config)203 public static void setListener(Subchannel subchannel, OrcaOobReportListener listener, 204 OrcaReportingConfig config) { 205 SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY); 206 if (orcaSubchannel == null) { 207 throw new IllegalArgumentException("Subchannel does not have orca Out-Of-Band stream enabled." 208 + " Try to use a subchannel created by OrcaOobUtil.OrcaHelper."); 209 } 210 orcaSubchannel.orcaState.setListener(orcaSubchannel, listener, config); 211 } 212 213 /** 214 * An {@link OrcaReportingHelper} wraps a delegated {@link LoadBalancer.Helper} with additional 215 * functionality to manage RPCs for out-of-band ORCA reporting for each backend it establishes 216 * connection to. Subchannels created through it will retrieve ORCA load reports if the server 217 * supports it. 218 */ 219 static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper { 220 private final LoadBalancer.Helper delegate; 221 private final SynchronizationContext syncContext; 222 private final BackoffPolicy.Provider backoffPolicyProvider; 223 private final Supplier<Stopwatch> stopwatchSupplier; 224 OrcaReportingHelper( LoadBalancer.Helper delegate, BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier)225 OrcaReportingHelper( 226 LoadBalancer.Helper delegate, 227 BackoffPolicy.Provider backoffPolicyProvider, 228 Supplier<Stopwatch> stopwatchSupplier) { 229 this.delegate = checkNotNull(delegate, "delegate"); 230 this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); 231 this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); 232 syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext"); 233 } 234 235 @Override delegate()236 protected Helper delegate() { 237 return delegate; 238 } 239 240 @Override createSubchannel(CreateSubchannelArgs args)241 public Subchannel createSubchannel(CreateSubchannelArgs args) { 242 syncContext.throwIfNotInThisSynchronizationContext(); 243 Subchannel subchannel = super.createSubchannel(args); 244 SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY); 245 OrcaReportingState orcaState; 246 if (orcaSubchannel == null) { 247 // Only the first load balancing policy requesting ORCA reports instantiates an 248 // OrcaReportingState. 249 orcaState = new OrcaReportingState(syncContext, delegate().getScheduledExecutorService()); 250 } else { 251 orcaState = orcaSubchannel.orcaState; 252 } 253 return new SubchannelImpl(subchannel, orcaState); 254 } 255 256 /** 257 * An {@link OrcaReportingState} is a client of ORCA service running on a single backend. 258 * 259 * <p>All methods are run from {@code syncContext}. 260 */ 261 private final class OrcaReportingState implements SubchannelStateListener { 262 263 private final SynchronizationContext syncContext; 264 private final ScheduledExecutorService timeService; 265 private final Map<OrcaOobReportListener, OrcaReportingConfig> configs = new HashMap<>(); 266 @Nullable private Subchannel subchannel; 267 @Nullable private ChannelLogger subchannelLogger; 268 @Nullable 269 private SubchannelStateListener stateListener; 270 @Nullable private BackoffPolicy backoffPolicy; 271 @Nullable private OrcaReportingStream orcaRpc; 272 @Nullable private ScheduledHandle retryTimer; 273 @Nullable private OrcaReportingConfig overallConfig; 274 private final Runnable retryTask = 275 new Runnable() { 276 @Override 277 public void run() { 278 startRpc(); 279 } 280 }; 281 private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); 282 // True if server returned UNIMPLEMENTED. 283 private boolean disabled; 284 private boolean started; 285 OrcaReportingState( SynchronizationContext syncContext, ScheduledExecutorService timeService)286 OrcaReportingState( 287 SynchronizationContext syncContext, 288 ScheduledExecutorService timeService) { 289 this.syncContext = checkNotNull(syncContext, "syncContext"); 290 this.timeService = checkNotNull(timeService, "timeService"); 291 } 292 init(Subchannel subchannel, SubchannelStateListener stateListener)293 void init(Subchannel subchannel, SubchannelStateListener stateListener) { 294 checkState(this.subchannel == null, "init() already called"); 295 this.subchannel = checkNotNull(subchannel, "subchannel"); 296 this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger"); 297 this.stateListener = checkNotNull(stateListener, "stateListener"); 298 started = true; 299 } 300 setListener(SubchannelImpl orcaSubchannel, OrcaOobReportListener listener, OrcaReportingConfig config)301 void setListener(SubchannelImpl orcaSubchannel, OrcaOobReportListener listener, 302 OrcaReportingConfig config) { 303 syncContext.execute(new Runnable() { 304 @Override 305 public void run() { 306 OrcaOobReportListener oldListener = orcaSubchannel.reportListener; 307 if (oldListener != null) { 308 configs.remove(oldListener); 309 } 310 if (listener != null) { 311 configs.put(listener, config); 312 } 313 orcaSubchannel.reportListener = listener; 314 setReportingConfig(config); 315 } 316 }); 317 } 318 setReportingConfig(OrcaReportingConfig config)319 private void setReportingConfig(OrcaReportingConfig config) { 320 boolean reconfigured = false; 321 // Real reporting interval is the minimum of intervals requested by all participating 322 // helpers. 323 if (configs.isEmpty()) { 324 overallConfig = null; 325 reconfigured = true; 326 } else if (overallConfig == null) { 327 overallConfig = config.toBuilder().build(); 328 reconfigured = true; 329 } else { 330 long minInterval = Long.MAX_VALUE; 331 for (OrcaReportingConfig c : configs.values()) { 332 if (c.getReportIntervalNanos() < minInterval) { 333 minInterval = c.getReportIntervalNanos(); 334 } 335 } 336 if (overallConfig.getReportIntervalNanos() != minInterval) { 337 overallConfig = overallConfig.toBuilder() 338 .setReportInterval(minInterval, TimeUnit.NANOSECONDS).build(); 339 reconfigured = true; 340 } 341 } 342 if (reconfigured) { 343 stopRpc("ORCA reporting reconfigured"); 344 adjustOrcaReporting(); 345 } 346 } 347 348 @Override onSubchannelState(ConnectivityStateInfo newState)349 public void onSubchannelState(ConnectivityStateInfo newState) { 350 if (Objects.equal(state.getState(), READY) && !Objects.equal(newState.getState(), READY)) { 351 // A connection was lost. We will reset disabled flag because ORCA service 352 // may be available on the new connection. 353 disabled = false; 354 } 355 state = newState; 356 adjustOrcaReporting(); 357 // Propagate subchannel state update to downstream listeners. 358 stateListener.onSubchannelState(newState); 359 } 360 adjustOrcaReporting()361 void adjustOrcaReporting() { 362 if (!disabled && overallConfig != null && Objects.equal(state.getState(), READY)) { 363 if (orcaRpc == null && !isRetryTimerPending()) { 364 startRpc(); 365 } 366 } else { 367 stopRpc("Client stops ORCA reporting"); 368 backoffPolicy = null; 369 } 370 } 371 startRpc()372 void startRpc() { 373 checkState(orcaRpc == null, "previous orca reporting RPC has not been cleaned up"); 374 checkState(subchannel != null, "init() not called"); 375 subchannelLogger.log( 376 ChannelLogLevel.DEBUG, "Starting ORCA reporting for {0}", subchannel.getAllAddresses()); 377 orcaRpc = new OrcaReportingStream(subchannel.asChannel(), stopwatchSupplier.get()); 378 orcaRpc.start(); 379 } 380 stopRpc(String msg)381 void stopRpc(String msg) { 382 if (orcaRpc != null) { 383 orcaRpc.cancel(msg); 384 orcaRpc = null; 385 } 386 if (retryTimer != null) { 387 retryTimer.cancel(); 388 retryTimer = null; 389 } 390 } 391 isRetryTimerPending()392 boolean isRetryTimerPending() { 393 return retryTimer != null && retryTimer.isPending(); 394 } 395 396 @Override toString()397 public String toString() { 398 return MoreObjects.toStringHelper(this) 399 .add("disabled", disabled) 400 .add("orcaRpc", orcaRpc) 401 .add("reportingConfig", overallConfig) 402 .add("connectivityState", state) 403 .toString(); 404 } 405 406 private class OrcaReportingStream extends ClientCall.Listener<OrcaLoadReport> { 407 408 private final ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call; 409 private final Stopwatch stopwatch; 410 private boolean callHasResponded; 411 OrcaReportingStream(Channel channel, Stopwatch stopwatch)412 OrcaReportingStream(Channel channel, Stopwatch stopwatch) { 413 call = 414 checkNotNull(channel, "channel") 415 .newCall(OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT); 416 this.stopwatch = checkNotNull(stopwatch, "stopwatch"); 417 } 418 start()419 void start() { 420 stopwatch.reset().start(); 421 call.start(this, new Metadata()); 422 call.sendMessage( 423 OrcaLoadReportRequest.newBuilder() 424 .setReportInterval(Durations.fromNanos(overallConfig.getReportIntervalNanos())) 425 .build()); 426 call.halfClose(); 427 call.request(1); 428 } 429 430 @Override onMessage(final OrcaLoadReport response)431 public void onMessage(final OrcaLoadReport response) { 432 syncContext.execute( 433 new Runnable() { 434 @Override 435 public void run() { 436 if (orcaRpc == OrcaReportingStream.this) { 437 handleResponse(response); 438 } 439 } 440 }); 441 } 442 443 @Override onClose(final Status status, Metadata trailers)444 public void onClose(final Status status, Metadata trailers) { 445 syncContext.execute( 446 new Runnable() { 447 @Override 448 public void run() { 449 if (orcaRpc == OrcaReportingStream.this) { 450 orcaRpc = null; 451 handleStreamClosed(status); 452 } 453 } 454 }); 455 } 456 handleResponse(OrcaLoadReport response)457 void handleResponse(OrcaLoadReport response) { 458 callHasResponded = true; 459 backoffPolicy = null; 460 subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response); 461 MetricReport metricReport = OrcaPerRequestUtil.fromOrcaLoadReport(response); 462 for (OrcaOobReportListener listener : configs.keySet()) { 463 listener.onLoadReport(metricReport); 464 } 465 call.request(1); 466 } 467 handleStreamClosed(Status status)468 void handleStreamClosed(Status status) { 469 if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) { 470 disabled = true; 471 logger.log( 472 Level.SEVERE, 473 "Backend {0} OpenRcaService is disabled. Server returned: {1}", 474 new Object[] {subchannel.getAllAddresses(), status}); 475 subchannelLogger.log(ChannelLogLevel.ERROR, "OpenRcaService disabled: {0}", status); 476 return; 477 } 478 long delayNanos = 0; 479 // Backoff only when no response has been received. 480 if (!callHasResponded) { 481 if (backoffPolicy == null) { 482 backoffPolicy = backoffPolicyProvider.get(); 483 } 484 delayNanos = backoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS); 485 } 486 subchannelLogger.log( 487 ChannelLogLevel.DEBUG, 488 "ORCA reporting stream closed with {0}, backoff in {1} ns", 489 status, 490 delayNanos <= 0 ? 0 : delayNanos); 491 if (delayNanos <= 0) { 492 startRpc(); 493 } else { 494 checkState(!isRetryTimerPending(), "Retry double scheduled"); 495 retryTimer = 496 syncContext.schedule(retryTask, delayNanos, TimeUnit.NANOSECONDS, timeService); 497 } 498 } 499 cancel(String msg)500 void cancel(String msg) { 501 call.cancel(msg, null); 502 } 503 504 @Override toString()505 public String toString() { 506 return MoreObjects.toStringHelper(this) 507 .add("callStarted", call != null) 508 .add("callHasResponded", callHasResponded) 509 .toString(); 510 } 511 } 512 } 513 } 514 515 @VisibleForTesting 516 static final class SubchannelImpl extends ForwardingSubchannel { 517 private final Subchannel delegate; 518 private final OrcaReportingHelper.OrcaReportingState orcaState; 519 @Nullable private OrcaOobReportListener reportListener; 520 SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState)521 SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) { 522 this.delegate = checkNotNull(delegate, "delegate"); 523 this.orcaState = checkNotNull(orcaState, "orcaState"); 524 } 525 526 @Override delegate()527 protected Subchannel delegate() { 528 return delegate; 529 } 530 531 @Override start(SubchannelStateListener listener)532 public void start(SubchannelStateListener listener) { 533 if (!orcaState.started) { 534 orcaState.init(this, listener); 535 super.start(orcaState); 536 } else { 537 super.start(listener); 538 } 539 } 540 541 @Override getAttributes()542 public Attributes getAttributes() { 543 return super.getAttributes().toBuilder().set(ORCA_REPORTING_STATE_KEY, this).build(); 544 } 545 } 546 547 /** Configuration for out-of-band ORCA reporting service RPC. */ 548 public static final class OrcaReportingConfig { 549 550 private final long reportIntervalNanos; 551 OrcaReportingConfig(long reportIntervalNanos)552 private OrcaReportingConfig(long reportIntervalNanos) { 553 this.reportIntervalNanos = reportIntervalNanos; 554 } 555 556 /** Creates a new builder. */ newBuilder()557 public static Builder newBuilder() { 558 return new Builder(); 559 } 560 561 /** Returns the configured maximum interval of receiving out-of-band ORCA reports. */ getReportIntervalNanos()562 public long getReportIntervalNanos() { 563 return reportIntervalNanos; 564 } 565 566 /** Returns a builder with the same initial values as this object. */ toBuilder()567 public Builder toBuilder() { 568 return newBuilder().setReportInterval(reportIntervalNanos, TimeUnit.NANOSECONDS); 569 } 570 571 @Override toString()572 public String toString() { 573 return MoreObjects.toStringHelper(this) 574 .add("reportIntervalNanos", reportIntervalNanos) 575 .toString(); 576 } 577 578 public static final class Builder { 579 580 private long reportIntervalNanos; 581 Builder()582 Builder() {} 583 584 /** 585 * Sets the maximum expected interval of receiving out-of-band ORCA report. The actual 586 * reporting interval might be smaller if there are other load balancing policies requesting 587 * for more frequent cost metric report. 588 * 589 * @param reportInterval the maximum expected interval of receiving periodical ORCA reports. 590 * @param unit time unit of {@code reportInterval} value. 591 */ setReportInterval(long reportInterval, TimeUnit unit)592 public Builder setReportInterval(long reportInterval, TimeUnit unit) { 593 reportIntervalNanos = unit.toNanos(reportInterval); 594 return this; 595 } 596 597 /** Creates a new {@link OrcaReportingConfig} object. */ build()598 public OrcaReportingConfig build() { 599 return new OrcaReportingConfig(reportIntervalNanos); 600 } 601 } 602 } 603 } 604