xref: /aosp_15_r20/external/grpc-grpc-java/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2021 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.xds;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.base.Stopwatch;
23 import com.google.common.base.Supplier;
24 import com.google.common.collect.Sets;
25 import io.grpc.Status;
26 import io.grpc.xds.Stats.ClusterStats;
27 import io.grpc.xds.Stats.DroppedRequests;
28 import io.grpc.xds.Stats.UpstreamLocalityStats;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40 import javax.annotation.Nullable;
41 import javax.annotation.concurrent.ThreadSafe;
42 
43 /**
44  * Manages client side traffic stats. Drop stats are maintained in cluster (with edsServiceName)
45  * granularity and load stats (request counts) are maintained in locality granularity.
46  */
47 @ThreadSafe
48 final class LoadStatsManager2 {
49   // Recorders for drops of each cluster:edsServiceName.
50   private final Map<String, Map<String, ReferenceCounted<ClusterDropStats>>> allDropStats =
51       new HashMap<>();
52   // Recorders for loads of each cluster:edsServiceName:locality.
53   private final Map<String, Map<String,
54       Map<Locality, ReferenceCounted<ClusterLocalityStats>>>> allLoadStats = new HashMap<>();
55   private final Supplier<Stopwatch> stopwatchSupplier;
56 
LoadStatsManager2(Supplier<Stopwatch> stopwatchSupplier)57   LoadStatsManager2(Supplier<Stopwatch> stopwatchSupplier) {
58     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
59   }
60 
61   /**
62    * Gets or creates the stats object for recording drops for the specified cluster with
63    * edsServiceName. The returned object is reference counted and the caller should use {@link
64    * ClusterDropStats#release()} to release its <i>hard</i> reference when it is safe to discard
65    * future stats for the cluster.
66    */
getClusterDropStats( String cluster, @Nullable String edsServiceName)67   synchronized ClusterDropStats getClusterDropStats(
68       String cluster, @Nullable String edsServiceName) {
69     if (!allDropStats.containsKey(cluster)) {
70       allDropStats.put(cluster, new HashMap<String, ReferenceCounted<ClusterDropStats>>());
71     }
72     Map<String, ReferenceCounted<ClusterDropStats>> perClusterCounters = allDropStats.get(cluster);
73     if (!perClusterCounters.containsKey(edsServiceName)) {
74       perClusterCounters.put(
75           edsServiceName,
76           ReferenceCounted.wrap(new ClusterDropStats(
77               cluster, edsServiceName, stopwatchSupplier.get())));
78     }
79     ReferenceCounted<ClusterDropStats> ref = perClusterCounters.get(edsServiceName);
80     ref.retain();
81     return ref.get();
82   }
83 
releaseClusterDropCounter( String cluster, @Nullable String edsServiceName)84   private synchronized void releaseClusterDropCounter(
85       String cluster, @Nullable String edsServiceName) {
86     checkState(allDropStats.containsKey(cluster)
87             && allDropStats.get(cluster).containsKey(edsServiceName),
88         "stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName);
89     ReferenceCounted<ClusterDropStats> ref = allDropStats.get(cluster).get(edsServiceName);
90     ref.release();
91   }
92 
93   /**
94    * Gets or creates the stats object for recording loads for the specified locality (in the
95    * specified cluster with edsServiceName). The returned object is reference counted and the
96    * caller should use {@link ClusterLocalityStats#release} to release its <i>hard</i> reference
97    * when it is safe to discard the future stats for the locality.
98    */
getClusterLocalityStats( String cluster, @Nullable String edsServiceName, Locality locality)99   synchronized ClusterLocalityStats getClusterLocalityStats(
100       String cluster, @Nullable String edsServiceName, Locality locality) {
101     if (!allLoadStats.containsKey(cluster)) {
102       allLoadStats.put(
103           cluster,
104           new HashMap<String, Map<Locality, ReferenceCounted<ClusterLocalityStats>>>());
105     }
106     Map<String, Map<Locality, ReferenceCounted<ClusterLocalityStats>>> perClusterCounters =
107         allLoadStats.get(cluster);
108     if (!perClusterCounters.containsKey(edsServiceName)) {
109       perClusterCounters.put(
110           edsServiceName, new HashMap<Locality, ReferenceCounted<ClusterLocalityStats>>());
111     }
112     Map<Locality, ReferenceCounted<ClusterLocalityStats>> localityStats =
113         perClusterCounters.get(edsServiceName);
114     if (!localityStats.containsKey(locality)) {
115       localityStats.put(
116           locality,
117           ReferenceCounted.wrap(new ClusterLocalityStats(
118               cluster, edsServiceName, locality, stopwatchSupplier.get())));
119     }
120     ReferenceCounted<ClusterLocalityStats> ref = localityStats.get(locality);
121     ref.retain();
122     return ref.get();
123   }
124 
releaseClusterLocalityLoadCounter( String cluster, @Nullable String edsServiceName, Locality locality)125   private synchronized void releaseClusterLocalityLoadCounter(
126       String cluster, @Nullable String edsServiceName, Locality locality) {
127     checkState(allLoadStats.containsKey(cluster)
128             && allLoadStats.get(cluster).containsKey(edsServiceName)
129             && allLoadStats.get(cluster).get(edsServiceName).containsKey(locality),
130         "stats for cluster %s, edsServiceName %s, locality %s not exits",
131         cluster, edsServiceName, locality);
132     ReferenceCounted<ClusterLocalityStats> ref =
133         allLoadStats.get(cluster).get(edsServiceName).get(locality);
134     ref.release();
135   }
136 
137   /**
138    * Gets the traffic stats (drops and loads) as a list of {@link ClusterStats} recorded for the
139    * specified cluster since the previous call of this method or {@link
140    * #getAllClusterStatsReports}. A {@link ClusterStats} includes stats for a specific cluster with
141    * edsServiceName.
142    */
getClusterStatsReports(String cluster)143   synchronized List<ClusterStats> getClusterStatsReports(String cluster) {
144     if (!allDropStats.containsKey(cluster) && !allLoadStats.containsKey(cluster)) {
145       return Collections.emptyList();
146     }
147     Map<String, ReferenceCounted<ClusterDropStats>> clusterDropStats = allDropStats.get(cluster);
148     Map<String, Map<Locality, ReferenceCounted<ClusterLocalityStats>>> clusterLoadStats =
149         allLoadStats.get(cluster);
150     Map<String, ClusterStats.Builder> statsReportBuilders = new HashMap<>();
151     // Populate drop stats.
152     if (clusterDropStats != null) {
153       Set<String> toDiscard = new HashSet<>();
154       for (String edsServiceName : clusterDropStats.keySet()) {
155         ClusterStats.Builder builder = ClusterStats.newBuilder().clusterName(cluster);
156         if (edsServiceName != null) {
157           builder.clusterServiceName(edsServiceName);
158         }
159         ReferenceCounted<ClusterDropStats> ref = clusterDropStats.get(edsServiceName);
160         if (ref.getReferenceCount() == 0) {  // stats object no longer needed after snapshot
161           toDiscard.add(edsServiceName);
162         }
163         ClusterDropStatsSnapshot dropStatsSnapshot = ref.get().snapshot();
164         long totalCategorizedDrops = 0L;
165         for (Map.Entry<String, Long> entry : dropStatsSnapshot.categorizedDrops.entrySet()) {
166           builder.addDroppedRequests(DroppedRequests.create(entry.getKey(), entry.getValue()));
167           totalCategorizedDrops += entry.getValue();
168         }
169         builder.totalDroppedRequests(
170             totalCategorizedDrops + dropStatsSnapshot.uncategorizedDrops);
171         builder.loadReportIntervalNano(dropStatsSnapshot.durationNano);
172         statsReportBuilders.put(edsServiceName, builder);
173       }
174       clusterDropStats.keySet().removeAll(toDiscard);
175     }
176     // Populate load stats for all localities in the cluster.
177     if (clusterLoadStats != null) {
178       Set<String> toDiscard = new HashSet<>();
179       for (String edsServiceName : clusterLoadStats.keySet()) {
180         ClusterStats.Builder builder = statsReportBuilders.get(edsServiceName);
181         if (builder == null) {
182           builder = ClusterStats.newBuilder().clusterName(cluster);
183           if (edsServiceName != null) {
184             builder.clusterServiceName(edsServiceName);
185           }
186           statsReportBuilders.put(edsServiceName, builder);
187         }
188         Map<Locality, ReferenceCounted<ClusterLocalityStats>> localityStats =
189             clusterLoadStats.get(edsServiceName);
190         Set<Locality> localitiesToDiscard = new HashSet<>();
191         for (Locality locality : localityStats.keySet()) {
192           ReferenceCounted<ClusterLocalityStats> ref = localityStats.get(locality);
193           ClusterLocalityStatsSnapshot snapshot = ref.get().snapshot();
194           // Only discard stats object after all in-flight calls under recording had finished.
195           if (ref.getReferenceCount() == 0 && snapshot.callsInProgress == 0) {
196             localitiesToDiscard.add(locality);
197           }
198           UpstreamLocalityStats upstreamLocalityStats = UpstreamLocalityStats.create(
199               locality, snapshot.callsIssued, snapshot.callsSucceeded, snapshot.callsFailed,
200               snapshot.callsInProgress);
201           builder.addUpstreamLocalityStats(upstreamLocalityStats);
202           // Use the max (drops/loads) recording interval as the overall interval for the
203           // cluster's stats. In general, they should be mostly identical.
204           builder.loadReportIntervalNano(
205               Math.max(builder.loadReportIntervalNano(), snapshot.durationNano));
206         }
207         localityStats.keySet().removeAll(localitiesToDiscard);
208         if (localityStats.isEmpty()) {
209           toDiscard.add(edsServiceName);
210         }
211       }
212       clusterLoadStats.keySet().removeAll(toDiscard);
213     }
214     List<ClusterStats> res = new ArrayList<>();
215     for (ClusterStats.Builder builder : statsReportBuilders.values()) {
216       res.add(builder.build());
217     }
218     return Collections.unmodifiableList(res);
219   }
220 
221   /**
222    * Gets the traffic stats (drops and loads) as a list of {@link ClusterStats} recorded for all
223    * clusters since the previous call of this method or {@link #getClusterStatsReports} for each
224    * specific cluster. A {@link ClusterStats} includes stats for a specific cluster with
225    * edsServiceName.
226    */
getAllClusterStatsReports()227   synchronized List<ClusterStats> getAllClusterStatsReports() {
228     Set<String> allClusters = Sets.union(allDropStats.keySet(), allLoadStats.keySet());
229     List<ClusterStats> res = new ArrayList<>();
230     for (String cluster : allClusters) {
231       res.addAll(getClusterStatsReports(cluster));
232     }
233     return Collections.unmodifiableList(res);
234   }
235 
236   /**
237    * Recorder for dropped requests. One instance per cluster with edsServiceName.
238    */
239   @ThreadSafe
240   final class ClusterDropStats {
241     private final String clusterName;
242     @Nullable
243     private final String edsServiceName;
244     private final AtomicLong uncategorizedDrops = new AtomicLong();
245     private final ConcurrentMap<String, AtomicLong> categorizedDrops = new ConcurrentHashMap<>();
246     private final Stopwatch stopwatch;
247 
ClusterDropStats( String clusterName, @Nullable String edsServiceName, Stopwatch stopwatch)248     private ClusterDropStats(
249         String clusterName, @Nullable String edsServiceName, Stopwatch stopwatch) {
250       this.clusterName = checkNotNull(clusterName, "clusterName");
251       this.edsServiceName = edsServiceName;
252       this.stopwatch = checkNotNull(stopwatch, "stopwatch");
253       stopwatch.reset().start();
254     }
255 
256     /**
257      * Records a dropped request with the specified category.
258      */
recordDroppedRequest(String category)259     void recordDroppedRequest(String category) {
260       // There is a race between this method and snapshot(), causing one drop recorded but may not
261       // be included in any snapshot. This is acceptable and the race window is extremely small.
262       AtomicLong counter = categorizedDrops.putIfAbsent(category, new AtomicLong(1L));
263       if (counter != null) {
264         counter.getAndIncrement();
265       }
266     }
267 
268     /**
269      * Records a dropped request without category.
270      */
recordDroppedRequest()271     void recordDroppedRequest() {
272       uncategorizedDrops.getAndIncrement();
273     }
274 
275     /**
276      * Release the <i>hard</i> reference for this stats object (previously obtained via {@link
277      * LoadStatsManager2#getClusterDropStats}). The object may still be recording
278      * drops after this method, but there is no guarantee drops recorded after this point will
279      * be included in load reports.
280      */
release()281     void release() {
282       LoadStatsManager2.this.releaseClusterDropCounter(clusterName, edsServiceName);
283     }
284 
snapshot()285     private ClusterDropStatsSnapshot snapshot() {
286       Map<String, Long> drops = new HashMap<>();
287       for (Map.Entry<String, AtomicLong> entry : categorizedDrops.entrySet()) {
288         drops.put(entry.getKey(), entry.getValue().get());
289       }
290       categorizedDrops.clear();
291       long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS);
292       stopwatch.reset().start();
293       return new ClusterDropStatsSnapshot(drops, uncategorizedDrops.getAndSet(0), duration);
294     }
295   }
296 
297   private static final class ClusterDropStatsSnapshot {
298     private final Map<String, Long> categorizedDrops;
299     private final long uncategorizedDrops;
300     private final long durationNano;
301 
ClusterDropStatsSnapshot( Map<String, Long> categorizedDrops, long uncategorizedDrops, long durationNano)302     private ClusterDropStatsSnapshot(
303         Map<String, Long> categorizedDrops, long uncategorizedDrops, long durationNano) {
304       this.categorizedDrops = Collections.unmodifiableMap(
305           checkNotNull(categorizedDrops, "categorizedDrops"));
306       this.uncategorizedDrops = uncategorizedDrops;
307       this.durationNano = durationNano;
308     }
309   }
310 
311   /**
312    * Recorder for client loads. One instance per locality (in cluster with edsService).
313    */
314   @ThreadSafe
315   final class ClusterLocalityStats {
316     private final String clusterName;
317     @Nullable
318     private final String edsServiceName;
319     private final Locality locality;
320     private final Stopwatch stopwatch;
321     private final AtomicLong callsInProgress = new AtomicLong();
322     private final AtomicLong callsSucceeded = new AtomicLong();
323     private final AtomicLong callsFailed = new AtomicLong();
324     private final AtomicLong callsIssued = new AtomicLong();
325 
ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, Stopwatch stopwatch)326     private ClusterLocalityStats(
327         String clusterName, @Nullable String edsServiceName, Locality locality,
328         Stopwatch stopwatch) {
329       this.clusterName = checkNotNull(clusterName, "clusterName");
330       this.edsServiceName = edsServiceName;
331       this.locality = checkNotNull(locality, "locality");
332       this.stopwatch = checkNotNull(stopwatch, "stopwatch");
333       stopwatch.reset().start();
334     }
335 
336     /**
337      * Records a request being issued.
338      */
recordCallStarted()339     void recordCallStarted() {
340       callsIssued.getAndIncrement();
341       callsInProgress.getAndIncrement();
342     }
343 
344     /**
345      * Records a request finished with the given status.
346      */
recordCallFinished(Status status)347     void recordCallFinished(Status status) {
348       callsInProgress.getAndDecrement();
349       if (status.isOk()) {
350         callsSucceeded.getAndIncrement();
351       } else {
352         callsFailed.getAndIncrement();
353       }
354     }
355 
356     /**
357      * Release the <i>hard</i> reference for this stats object (previously obtained via {@link
358      * LoadStatsManager2#getClusterLocalityStats}). The object may still be
359      * recording loads after this method, but there is no guarantee loads recorded after this
360      * point will be included in load reports.
361      */
release()362     void release() {
363       LoadStatsManager2.this.releaseClusterLocalityLoadCounter(
364           clusterName, edsServiceName, locality);
365     }
366 
snapshot()367     private ClusterLocalityStatsSnapshot snapshot() {
368       long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS);
369       stopwatch.reset().start();
370       return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(),
371           callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration);
372     }
373   }
374 
375   private static final class ClusterLocalityStatsSnapshot {
376     private final long callsSucceeded;
377     private final long callsInProgress;
378     private final long callsFailed;
379     private final long callsIssued;
380     private final long durationNano;
381 
ClusterLocalityStatsSnapshot( long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued, long durationNano)382     private ClusterLocalityStatsSnapshot(
383         long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued,
384         long durationNano) {
385       this.callsSucceeded = callsSucceeded;
386       this.callsInProgress = callsInProgress;
387       this.callsFailed = callsFailed;
388       this.callsIssued = callsIssued;
389       this.durationNano = durationNano;
390     }
391   }
392 }
393