xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds.orca;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.github.xds.data.orca.v3.OrcaLoadReport;
22 import com.google.common.annotations.VisibleForTesting;
23 import io.grpc.CallOptions;
24 import io.grpc.ClientStreamTracer;
25 import io.grpc.ClientStreamTracer.StreamInfo;
26 import io.grpc.ExperimentalApi;
27 import io.grpc.LoadBalancer;
28 import io.grpc.Metadata;
29 import io.grpc.internal.ForwardingClientStreamTracer;
30 import io.grpc.protobuf.ProtoUtils;
31 import io.grpc.services.InternalCallMetricRecorder;
32 import io.grpc.services.MetricReport;
33 import java.util.ArrayList;
34 import java.util.List;
35 
/**
 * Utility class that provides methods for a {@link LoadBalancer} to install listeners that receive
 * per-request backend cost metrics in the format of Open Request Cost Aggregation (ORCA).
 */
40 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/9128")
41 public abstract class OrcaPerRequestUtil {
42   private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER = new ClientStreamTracer() {};
43   private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY =
44       new ClientStreamTracer.Factory() {
45         @Override
46         public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
47           return NOOP_CLIENT_STREAM_TRACER;
48         }
49       };
50   private static final OrcaPerRequestUtil DEFAULT_INSTANCE =
51       new OrcaPerRequestUtil() {
52         @Override
53         public ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
54             OrcaPerRequestReportListener listener) {
55           return newOrcaClientStreamTracerFactory(NOOP_CLIENT_STREAM_TRACER_FACTORY, listener);
56         }
57 
58         @Override
59         public ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
60             ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) {
61           return new OrcaReportingTracerFactory(delegate, listener);
62         }
63       };
64 
65   /**
66    * Gets an {@code OrcaPerRequestUtil} instance that provides actual implementation of
67    * {@link #newOrcaClientStreamTracerFactory}.
68    */
getInstance()69   public static OrcaPerRequestUtil getInstance() {
70     return DEFAULT_INSTANCE;
71   }
72 
73   /**
74    * Creates a new {@link io.grpc.ClientStreamTracer.Factory} with provided {@link
75    * OrcaPerRequestReportListener} installed to receive callback when a per-request ORCA report is
76    * received.
77    *
78    * <p>Example usages for leaf level policy (e.g., WRR policy)
79    *
80    * <pre>
81    *   {@code
82    *   class WrrPicker extends SubchannelPicker {
83    *
84    *     public PickResult pickSubchannel(PickSubchannelArgs args) {
85    *       Subchannel subchannel = ...  // WRR picking logic
86    *       return PickResult.withSubchannel(
87    *           subchannel,
88    *           OrcaPerRequestReportUtil.getInstance().newOrcaClientStreamTracerFactory(listener));
89    *     }
90    *   }
91    *   }
92    * </pre>
93    *
94    * @param listener contains the callback to be invoked when a per-request ORCA report is received.
95    */
newOrcaClientStreamTracerFactory( OrcaPerRequestReportListener listener)96   public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
97       OrcaPerRequestReportListener listener);
98 
99   /**
100    * Creates a new {@link io.grpc.ClientStreamTracer.Factory} with provided {@link
101    * OrcaPerRequestReportListener} installed to receive callback when a per-request ORCA report is
102    * received.
103    *
104    * <p>Example usages:
105    *
106    * <ul>
107    *   <li> Delegating policy (e.g., xDS)
108    *     <pre>
109    *       {@code
110    *       class XdsPicker extends SubchannelPicker {
111    *
112    *         public PickResult pickSubchannel(PickSubchannelArgs args) {
113    *           SubchannelPicker perLocalityPicker = ...  // locality picking logic
114    *           Result result = perLocalityPicker.pickSubchannel(args);
115    *           return PickResult.withSubchannel(
116    *               result.getSubchannel(),
117    *               OrcaPerRequestReportUtil.getInstance().newOrcaClientTracerFactory(
118    *                   result.getStreamTracerFactory(), listener));
119    *
120    *         }
121    *       }
122    *       }
123    *     </pre>
124    *   </li>
125    *   <li> Delegating policy with additional tracing logic
126    *     <pre>
127    *       {@code
128    *       class WrappingPicker extends SubchannelPicker {
129    *
130    *         public PickResult pickSubchannel(PickSubchannelArgs args) {
131    *           Result result = delegate.pickSubchannel(args);
132    *           return PickResult.withSubchannel(
133    *               result.getSubchannel(),
134    *               new ClientStreamTracer.Factory() {
135    *                 public ClientStreamTracer newClientStreamTracer(
136    *                     StreamInfo info, Metadata metadata) {
137    *                   ClientStreamTracer.Factory orcaTracerFactory =
138    *                       OrcaPerRequestReportUtil.getInstance().newOrcaClientStreamTracerFactory(
139    *                           result.getStreamTracerFactory(), listener);
140    *
141    *                   // Wrap the tracer from the delegate factory if you need to trace the
142    *                   // stream for your own.
143    *                   final ClientStreamTracer orcaTracer =
144    *                       orcaTracerFactory.newClientStreamTracer(info, metadata);
145    *
146    *                   return ForwardingClientStreamTracer() {
147    *                     protected ClientStreamTracer delegate() {
148    *                       return orcaTracer;
149    *                     }
150    *
151    *                     public void inboundMessage(int seqNo) {
152    *                       // Handle this event.
153    *                       ...
154    *                     }
155    *                   };
156    *                 }
157    *               });
158    *         }
159    *       }
160    *       }
161    *     </pre>
162    *   </li>
163    * </ul>
164    *
165    * @param delegate the delegate factory to produce other client stream tracing.
166    * @param listener contains the callback to be invoked when a per-request ORCA report is received.
167    */
newOrcaClientStreamTracerFactory( ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener)168   public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
169       ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener);
170 
171   /**
172    * The listener interface for receiving per-request ORCA reports from backends. The class that is
173    * interested in processing backend cost metrics implements this interface, and the object created
174    * with that class is registered with a component, using methods in {@link OrcaPerRequestUtil}.
175    * When an ORCA report is received, that object's {@code onLoadReport} method is invoked.
176    */
177   public interface OrcaPerRequestReportListener {
178 
179     /**
180      * Invoked when a per-request ORCA report is received.
181      *
182      * <p>Note this callback will be invoked from the network thread as the RPC finishes,
183      * implementations should not block.
184      *
185      * @param report load report in the format of grpc {@link MetricReport}.
186      */
onLoadReport(MetricReport report)187     void onLoadReport(MetricReport report);
188   }
189 
190   /**
191    * An {@link OrcaReportingTracerFactory} wraps a delegated {@link ClientStreamTracer.Factory} with
192    * additional functionality to produce {@link ClientStreamTracer} instances that extract
193    * per-request ORCA reports and push to registered listeners for calls they trace.
194    */
195   @VisibleForTesting
196   static final class OrcaReportingTracerFactory extends
197       ClientStreamTracer.Factory {
198 
199     @VisibleForTesting
200     static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY =
201         Metadata.Key.of(
202             "endpoint-load-metrics-bin",
203             ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance()));
204 
205     private static final CallOptions.Key<OrcaReportBroker> ORCA_REPORT_BROKER_KEY =
206         CallOptions.Key.create("internal-orca-report-broker");
207     private final ClientStreamTracer.Factory delegate;
208     private final OrcaPerRequestReportListener listener;
209 
OrcaReportingTracerFactory( ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener)210     OrcaReportingTracerFactory(
211         ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) {
212       this.delegate = checkNotNull(delegate, "delegate");
213       this.listener = checkNotNull(listener, "listener");
214     }
215 
216     @Override
newClientStreamTracer(StreamInfo info, Metadata headers)217     public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
218       OrcaReportBroker broker = info.getCallOptions().getOption(ORCA_REPORT_BROKER_KEY);
219       boolean augmented = false;
220       if (broker == null) {
221         broker = new OrcaReportBroker();
222         info =
223             info.toBuilder()
224                 .setCallOptions(info.getCallOptions().withOption(ORCA_REPORT_BROKER_KEY, broker))
225                 .build();
226         augmented = true;
227       }
228       broker.addListener(listener);
229       ClientStreamTracer tracer = delegate.newClientStreamTracer(info, headers);
230       if (augmented) {
231         final ClientStreamTracer currTracer = tracer;
232         final OrcaReportBroker currBroker = broker;
233         // The actual tracer that performs ORCA report deserialization.
234         tracer =
235             new ForwardingClientStreamTracer() {
236               @Override
237               protected ClientStreamTracer delegate() {
238                 return currTracer;
239               }
240 
241               @Override
242               public void inboundTrailers(Metadata trailers) {
243                 OrcaLoadReport report = trailers.get(ORCA_ENDPOINT_LOAD_METRICS_KEY);
244                 if (report != null) {
245                   currBroker.onReport(report);
246                 }
247                 delegate().inboundTrailers(trailers);
248               }
249             };
250       }
251       return tracer;
252     }
253   }
254 
fromOrcaLoadReport(OrcaLoadReport loadReport)255   static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) {
256     return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(),
257         loadReport.getApplicationUtilization(), loadReport.getMemUtilization(),
258         loadReport.getRpsFractional(), loadReport.getEps(), loadReport.getRequestCostMap(),
259         loadReport.getUtilizationMap());
260   }
261 
262   /**
263    * A container class to hold registered {@link OrcaPerRequestReportListener}s and invoke all of
264    * them when an {@link OrcaLoadReport} is received.
265    */
266   private static final class OrcaReportBroker {
267 
268     private final List<OrcaPerRequestReportListener> listeners = new ArrayList<>();
269 
addListener(OrcaPerRequestReportListener listener)270     void addListener(OrcaPerRequestReportListener listener) {
271       listeners.add(listener);
272     }
273 
onReport(OrcaLoadReport report)274     void onReport(OrcaLoadReport report) {
275       MetricReport metricReport = fromOrcaLoadReport(report);
276       for (OrcaPerRequestReportListener listener : listeners) {
277         listener.onLoadReport(metricReport);
278       }
279     }
280   }
281 }
282