/*
 * Copyright 2021 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import io.grpc.Status;
import io.grpc.xds.Stats.ClusterStats;
import io.grpc.xds.Stats.DroppedRequests;
import io.grpc.xds.Stats.UpstreamLocalityStats;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
 * Manages client side traffic stats. Drop stats are maintained in cluster (with edsServiceName)
 * granularity and load stats (request counts) are maintained in locality granularity.
 *
 * <p>All accesses to the bookkeeping maps go through {@code synchronized} methods of this class.
 * The recorder objects handed out to callers update their counters with atomics and do not take
 * this lock.
 */
@ThreadSafe
final class LoadStatsManager2 {
  // Recorders for drops of each cluster:edsServiceName. The edsServiceName key may be null.
  private final Map<String, Map<String, ReferenceCounted<ClusterDropStats>>> allDropStats =
      new HashMap<>();
  // Recorders for loads of each cluster:edsServiceName:locality. The edsServiceName key may be
  // null.
  private final Map<String, Map<String,
      Map<Locality, ReferenceCounted<ClusterLocalityStats>>>> allLoadStats = new HashMap<>();
  // Source of a fresh stopwatch for every recorder created by this manager.
  private final Supplier<Stopwatch> stopwatchSupplier;

  /**
   * Creates a manager with no recorders.
   *
   * @param stopwatchSupplier supplies the stopwatch given to each newly created recorder
   * @throws NullPointerException if {@code stopwatchSupplier} is null
   */
  LoadStatsManager2(Supplier<Stopwatch> stopwatchSupplier) {
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  }

  /**
   * Gets or creates the stats object for recording drops for the specified cluster with
   * edsServiceName. The returned object is reference counted and the caller should use {@link
   * ClusterDropStats#release()} to release its <i>hard</i> reference when it is safe to discard
   * future stats for the cluster.
   *
   * @param cluster name of the cluster
   * @param edsServiceName EDS service name of the cluster, may be null
   * @return the recorder shared by all callers asking for the same cluster:edsServiceName
   */
  synchronized ClusterDropStats getClusterDropStats(
      String cluster, @Nullable String edsServiceName) {
    Map<String, ReferenceCounted<ClusterDropStats>> perClusterCounters =
        allDropStats.get(cluster);
    if (perClusterCounters == null) {
      perClusterCounters = new HashMap<>();
      allDropStats.put(cluster, perClusterCounters);
    }
    ReferenceCounted<ClusterDropStats> ref = perClusterCounters.get(edsServiceName);
    if (ref == null) {
      ref = ReferenceCounted.wrap(
          new ClusterDropStats(cluster, edsServiceName, stopwatchSupplier.get()));
      perClusterCounters.put(edsServiceName, ref);
    }
    ref.retain();
    return ref.get();
  }

  /**
   * Releases one reference to the drop recorder of the given cluster:edsServiceName. The recorder
   * itself stays registered here; it is discarded by a later {@link #getClusterStatsReports}
   * call that observes a zero reference count.
   *
   * @throws IllegalStateException if no recorder is registered for the given key
   */
  private synchronized void releaseClusterDropCounter(
      String cluster, @Nullable String edsServiceName) {
    checkState(allDropStats.containsKey(cluster)
            && allDropStats.get(cluster).containsKey(edsServiceName),
        "stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName);
    ReferenceCounted<ClusterDropStats> ref = allDropStats.get(cluster).get(edsServiceName);
    ref.release();
  }

  /**
   * Gets or creates the stats object for recording loads for the specified locality (in the
   * specified cluster with edsServiceName). The returned object is reference counted and the
   * caller should use {@link ClusterLocalityStats#release} to release its <i>hard</i> reference
   * when it is safe to discard the future stats for the locality.
   *
   * @param cluster name of the cluster
   * @param edsServiceName EDS service name of the cluster, may be null
   * @param locality locality the loads are recorded for
   * @return the recorder shared by all callers asking for the same
   *     cluster:edsServiceName:locality
   */
  synchronized ClusterLocalityStats getClusterLocalityStats(
      String cluster, @Nullable String edsServiceName, Locality locality) {
    Map<String, Map<Locality, ReferenceCounted<ClusterLocalityStats>>> perClusterCounters =
        allLoadStats.get(cluster);
    if (perClusterCounters == null) {
      perClusterCounters = new HashMap<>();
      allLoadStats.put(cluster, perClusterCounters);
    }
    Map<Locality, ReferenceCounted<ClusterLocalityStats>> localityStats =
        perClusterCounters.get(edsServiceName);
    if (localityStats == null) {
      localityStats = new HashMap<>();
      perClusterCounters.put(edsServiceName, localityStats);
    }
    ReferenceCounted<ClusterLocalityStats> ref = localityStats.get(locality);
    if (ref == null) {
      ref = ReferenceCounted.wrap(
          new ClusterLocalityStats(cluster, edsServiceName, locality, stopwatchSupplier.get()));
      localityStats.put(locality, ref);
    }
    ref.retain();
    return ref.get();
  }

  /**
   * Releases one reference to the load recorder of the given cluster:edsServiceName:locality.
   * The recorder itself stays registered here; it is discarded by a later {@link
   * #getClusterStatsReports} call that observes a zero reference count and no calls in progress.
   *
   * @throws IllegalStateException if no recorder is registered for the given key
   */
  private synchronized void releaseClusterLocalityLoadCounter(
      String cluster, @Nullable String edsServiceName, Locality locality) {
    checkState(allLoadStats.containsKey(cluster)
            && allLoadStats.get(cluster).containsKey(edsServiceName)
            && allLoadStats.get(cluster).get(edsServiceName).containsKey(locality),
        "stats for cluster %s, edsServiceName %s, locality %s not exits",
        cluster, edsServiceName, locality);
    ReferenceCounted<ClusterLocalityStats> ref =
        allLoadStats.get(cluster).get(edsServiceName).get(locality);
    ref.release();
  }

  /**
   * Gets the traffic stats (drops and loads) as a list of {@link ClusterStats} recorded for the
   * specified cluster since the previous call of this method or {@link
   * #getAllClusterStatsReports}. A {@link ClusterStats} includes stats for a specific cluster with
   * edsServiceName.
   *
   * <p>Taking the report snapshots every recorder of the cluster, which resets its counters and
   * stopwatch. Recorders that are no longer referenced are removed after their final snapshot,
   * and the bookkeeping for the cluster is dropped entirely once nothing is left in it.
   *
   * @param cluster name of the cluster to report
   * @return an unmodifiable list with one entry per edsServiceName, empty if nothing is tracked
   *     for the cluster
   */
  synchronized List<ClusterStats> getClusterStatsReports(String cluster) {
    if (!allDropStats.containsKey(cluster) && !allLoadStats.containsKey(cluster)) {
      return Collections.emptyList();
    }
    Map<String, ReferenceCounted<ClusterDropStats>> clusterDropStats = allDropStats.get(cluster);
    Map<String, Map<Locality, ReferenceCounted<ClusterLocalityStats>>> clusterLoadStats =
        allLoadStats.get(cluster);
    // Keyed by edsServiceName (possibly null) so drops and loads of the same service are merged
    // into a single report.
    Map<String, ClusterStats.Builder> statsReportBuilders = new HashMap<>();
    // Populate drop stats.
    if (clusterDropStats != null) {
      Set<String> toDiscard = new HashSet<>();
      for (Map.Entry<String, ReferenceCounted<ClusterDropStats>> dropEntry
          : clusterDropStats.entrySet()) {
        String edsServiceName = dropEntry.getKey();
        ReferenceCounted<ClusterDropStats> ref = dropEntry.getValue();
        ClusterStats.Builder builder = newClusterStatsBuilder(cluster, edsServiceName);
        if (ref.getReferenceCount() == 0) {  // stats object no longer needed after snapshot
          toDiscard.add(edsServiceName);
        }
        ClusterDropStatsSnapshot dropStatsSnapshot = ref.get().snapshot();
        long totalCategorizedDrops = 0L;
        for (Map.Entry<String, Long> entry : dropStatsSnapshot.categorizedDrops.entrySet()) {
          builder.addDroppedRequests(DroppedRequests.create(entry.getKey(), entry.getValue()));
          totalCategorizedDrops += entry.getValue();
        }
        builder.totalDroppedRequests(
            totalCategorizedDrops + dropStatsSnapshot.uncategorizedDrops);
        builder.loadReportIntervalNano(dropStatsSnapshot.durationNano);
        statsReportBuilders.put(edsServiceName, builder);
      }
      clusterDropStats.keySet().removeAll(toDiscard);
      // Forget the cluster once its last drop recorder is gone; otherwise names of clusters that
      // are no longer used would pile up in allDropStats forever.
      if (clusterDropStats.isEmpty()) {
        allDropStats.remove(cluster);
      }
    }
    // Populate load stats for all localities in the cluster.
    if (clusterLoadStats != null) {
      Set<String> toDiscard = new HashSet<>();
      for (Map.Entry<String, Map<Locality, ReferenceCounted<ClusterLocalityStats>>> loadEntry
          : clusterLoadStats.entrySet()) {
        String edsServiceName = loadEntry.getKey();
        Map<Locality, ReferenceCounted<ClusterLocalityStats>> localityStats =
            loadEntry.getValue();
        ClusterStats.Builder builder = statsReportBuilders.get(edsServiceName);
        if (builder == null) {
          builder = newClusterStatsBuilder(cluster, edsServiceName);
          statsReportBuilders.put(edsServiceName, builder);
        }
        Set<Locality> localitiesToDiscard = new HashSet<>();
        for (Map.Entry<Locality, ReferenceCounted<ClusterLocalityStats>> localityEntry
            : localityStats.entrySet()) {
          Locality locality = localityEntry.getKey();
          ReferenceCounted<ClusterLocalityStats> ref = localityEntry.getValue();
          ClusterLocalityStatsSnapshot snapshot = ref.get().snapshot();
          // Only discard stats object after all in-flight calls under recording had finished.
          if (ref.getReferenceCount() == 0 && snapshot.callsInProgress == 0) {
            localitiesToDiscard.add(locality);
          }
          UpstreamLocalityStats upstreamLocalityStats = UpstreamLocalityStats.create(
              locality, snapshot.callsIssued, snapshot.callsSucceeded, snapshot.callsFailed,
              snapshot.callsInProgress);
          builder.addUpstreamLocalityStats(upstreamLocalityStats);
          // Use the max (drops/loads) recording interval as the overall interval for the
          // cluster's stats. In general, they should be mostly identical.
          builder.loadReportIntervalNano(
              Math.max(builder.loadReportIntervalNano(), snapshot.durationNano));
        }
        localityStats.keySet().removeAll(localitiesToDiscard);
        if (localityStats.isEmpty()) {
          toDiscard.add(edsServiceName);
        }
      }
      clusterLoadStats.keySet().removeAll(toDiscard);
      // Same pruning as for drops: do not keep an empty per-cluster map around.
      if (clusterLoadStats.isEmpty()) {
        allLoadStats.remove(cluster);
      }
    }
    List<ClusterStats> res = new ArrayList<>();
    for (ClusterStats.Builder builder : statsReportBuilders.values()) {
      res.add(builder.build());
    }
    return Collections.unmodifiableList(res);
  }

  /**
   * Gets the traffic stats (drops and loads) as a list of {@link ClusterStats} recorded for all
   * clusters since the previous call of this method or {@link #getClusterStatsReports} for each
   * specific cluster. A {@link ClusterStats} includes stats for a specific cluster with
   * edsServiceName.
   *
   * @return an unmodifiable list with the reports of every tracked cluster
   */
  synchronized List<ClusterStats> getAllClusterStatsReports() {
    // Sets.union() is only a view of the two key sets. getClusterStatsReports() may remove
    // clusters from the underlying maps, so iterate over a copy of the names.
    Set<String> allClusters =
        new HashSet<String>(Sets.union(allDropStats.keySet(), allLoadStats.keySet()));
    List<ClusterStats> res = new ArrayList<>();
    for (String cluster : allClusters) {
      res.addAll(getClusterStatsReports(cluster));
    }
    return Collections.unmodifiableList(res);
  }

  /**
   * Creates a report builder carrying the cluster name and, when present, the edsServiceName.
   */
  private static ClusterStats.Builder newClusterStatsBuilder(
      String cluster, @Nullable String edsServiceName) {
    ClusterStats.Builder builder = ClusterStats.newBuilder().clusterName(cluster);
    if (edsServiceName != null) {
      builder.clusterServiceName(edsServiceName);
    }
    return builder;
  }

  /**
   * Recorder for dropped requests. One instance per cluster with edsServiceName.
   */
  @ThreadSafe
  final class ClusterDropStats {
    private final String clusterName;
    @Nullable
    private final String edsServiceName;
    private final AtomicLong uncategorizedDrops = new AtomicLong();
    private final ConcurrentMap<String, AtomicLong> categorizedDrops = new ConcurrentHashMap<>();
    // Measures the time covered by the next snapshot; restarted on every snapshot().
    private final Stopwatch stopwatch;

    private ClusterDropStats(
        String clusterName, @Nullable String edsServiceName, Stopwatch stopwatch) {
      this.clusterName = checkNotNull(clusterName, "clusterName");
      this.edsServiceName = edsServiceName;
      this.stopwatch = checkNotNull(stopwatch, "stopwatch");
      stopwatch.reset().start();
    }

    /**
     * Records a dropped request with the specified category.
     */
    void recordDroppedRequest(String category) {
      // There is a race between this method and snapshot(), causing one drop recorded but may not
      // be included in any snapshot. This is acceptable and the race window is extremely small.
      AtomicLong counter = categorizedDrops.putIfAbsent(category, new AtomicLong(1L));
      if (counter != null) {
        // A counter already existed, so the freshly created one (holding 1) was not installed.
        counter.getAndIncrement();
      }
    }

    /**
     * Records a dropped request without category.
     */
    void recordDroppedRequest() {
      uncategorizedDrops.getAndIncrement();
    }

    /**
     * Release the <i>hard</i> reference for this stats object (previously obtained via {@link
     * LoadStatsManager2#getClusterDropStats}). The object may still be recording
     * drops after this method, but there is no guarantee drops recorded after this point will
     * be included in load reports.
     */
    void release() {
      LoadStatsManager2.this.releaseClusterDropCounter(clusterName, edsServiceName);
    }

    /**
     * Returns the drops recorded since the previous snapshot together with the elapsed time, and
     * resets the counters and the stopwatch.
     */
    private ClusterDropStatsSnapshot snapshot() {
      Map<String, Long> drops = new HashMap<>();
      for (Map.Entry<String, AtomicLong> entry : categorizedDrops.entrySet()) {
        drops.put(entry.getKey(), entry.getValue().get());
      }
      categorizedDrops.clear();
      long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS);
      stopwatch.reset().start();
      return new ClusterDropStatsSnapshot(drops, uncategorizedDrops.getAndSet(0), duration);
    }
  }

  /**
   * Immutable holder of the drop counts taken by {@link ClusterDropStats#snapshot()}.
   */
  private static final class ClusterDropStatsSnapshot {
    private final Map<String, Long> categorizedDrops;
    private final long uncategorizedDrops;
    private final long durationNano;

    private ClusterDropStatsSnapshot(
        Map<String, Long> categorizedDrops, long uncategorizedDrops, long durationNano) {
      this.categorizedDrops = Collections.unmodifiableMap(
          checkNotNull(categorizedDrops, "categorizedDrops"));
      this.uncategorizedDrops = uncategorizedDrops;
      this.durationNano = durationNano;
    }
  }

  /**
   * Recorder for client loads. One instance per locality (in cluster with edsService).
   */
  @ThreadSafe
  final class ClusterLocalityStats {
    private final String clusterName;
    @Nullable
    private final String edsServiceName;
    private final Locality locality;
    // Measures the time covered by the next snapshot; restarted on every snapshot().
    private final Stopwatch stopwatch;
    private final AtomicLong callsInProgress = new AtomicLong();
    private final AtomicLong callsSucceeded = new AtomicLong();
    private final AtomicLong callsFailed = new AtomicLong();
    private final AtomicLong callsIssued = new AtomicLong();

    private ClusterLocalityStats(
        String clusterName, @Nullable String edsServiceName, Locality locality,
        Stopwatch stopwatch) {
      this.clusterName = checkNotNull(clusterName, "clusterName");
      this.edsServiceName = edsServiceName;
      this.locality = checkNotNull(locality, "locality");
      this.stopwatch = checkNotNull(stopwatch, "stopwatch");
      stopwatch.reset().start();
    }

    /**
     * Records a request being issued.
     */
    void recordCallStarted() {
      callsIssued.getAndIncrement();
      callsInProgress.getAndIncrement();
    }

    /**
     * Records a request finished with the given status.
     */
    void recordCallFinished(Status status) {
      callsInProgress.getAndDecrement();
      if (status.isOk()) {
        callsSucceeded.getAndIncrement();
      } else {
        callsFailed.getAndIncrement();
      }
    }

    /**
     * Release the <i>hard</i> reference for this stats object (previously obtained via {@link
     * LoadStatsManager2#getClusterLocalityStats}). The object may still be
     * recording loads after this method, but there is no guarantee loads recorded after this
     * point will be included in load reports.
     */
    void release() {
      LoadStatsManager2.this.releaseClusterLocalityLoadCounter(
          clusterName, edsServiceName, locality);
    }

    /**
     * Returns the loads recorded since the previous snapshot together with the elapsed time.
     * The succeeded/failed/issued counters and the stopwatch are reset; the in-progress count is
     * only read because those calls are still running.
     */
    private ClusterLocalityStatsSnapshot snapshot() {
      long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS);
      stopwatch.reset().start();
      return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(),
          callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration);
    }
  }

  /**
   * Immutable holder of the load counts taken by {@link ClusterLocalityStats#snapshot()}.
   */
  private static final class ClusterLocalityStatsSnapshot {
    private final long callsSucceeded;
    private final long callsInProgress;
    private final long callsFailed;
    private final long callsIssued;
    private final long durationNano;

    private ClusterLocalityStatsSnapshot(
        long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued,
        long durationNano) {
      this.callsSucceeded = callsSucceeded;
      this.callsInProgress = callsInProgress;
      this.callsFailed = callsFailed;
      this.callsIssued = callsIssued;
      this.durationNano = durationNano;
    }
  }
}