xref: /aosp_15_r20/external/grpc-grpc-java/rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.rls;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.MoreObjects;
25 import io.grpc.ChannelLogger.ChannelLogLevel;
26 import io.grpc.ConnectivityState;
27 import io.grpc.LoadBalancer;
28 import io.grpc.LoadBalancer.Helper;
29 import io.grpc.LoadBalancer.Subchannel;
30 import io.grpc.LoadBalancer.SubchannelPicker;
31 import io.grpc.LoadBalancerProvider;
32 import io.grpc.LoadBalancerRegistry;
33 import io.grpc.NameResolver.ConfigOrError;
34 import io.grpc.internal.ObjectPool;
35 import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
36 import io.grpc.rls.RlsProtoData.RouteLookupConfig;
37 import io.grpc.util.ForwardingLoadBalancerHelper;
38 import java.util.ArrayList;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Objects;
44 import java.util.concurrent.atomic.AtomicLong;
45 import javax.annotation.Nullable;
46 
47 /** Configuration for RLS load balancing policy. */
48 final class LbPolicyConfiguration {
49 
50   private final RouteLookupConfig routeLookupConfig;
51   @Nullable
52   private final Map<String, ?> routeLookupChannelServiceConfig;
53   private final ChildLoadBalancingPolicy policy;
54 
LbPolicyConfiguration( RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig, ChildLoadBalancingPolicy policy)55   LbPolicyConfiguration(
56       RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig,
57       ChildLoadBalancingPolicy policy) {
58     this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
59     this.routeLookupChannelServiceConfig = routeLookupChannelServiceConfig;
60     this.policy = checkNotNull(policy, "policy");
61   }
62 
getRouteLookupConfig()63   RouteLookupConfig getRouteLookupConfig() {
64     return routeLookupConfig;
65   }
66 
67   @Nullable
getRouteLookupChannelServiceConfig()68   Map<String, ?> getRouteLookupChannelServiceConfig() {
69     return routeLookupChannelServiceConfig;
70   }
71 
getLoadBalancingPolicy()72   ChildLoadBalancingPolicy getLoadBalancingPolicy() {
73     return policy;
74   }
75 
76   @Override
equals(Object o)77   public boolean equals(Object o) {
78     if (this == o) {
79       return true;
80     }
81     if (o == null || getClass() != o.getClass()) {
82       return false;
83     }
84     LbPolicyConfiguration that = (LbPolicyConfiguration) o;
85     return Objects.equals(routeLookupConfig, that.routeLookupConfig)
86         && Objects.equals(routeLookupChannelServiceConfig, that.routeLookupChannelServiceConfig)
87         && Objects.equals(policy, that.policy);
88   }
89 
90   @Override
hashCode()91   public int hashCode() {
92     return Objects.hash(routeLookupConfig, routeLookupChannelServiceConfig, policy);
93   }
94 
95   @Override
toString()96   public String toString() {
97     return MoreObjects.toStringHelper(this)
98         .add("routeLookupConfig", routeLookupConfig)
99         .add("routeLookupChannelServiceConfig", routeLookupChannelServiceConfig)
100         .add("policy", policy)
101         .toString();
102   }
103 
104   /** ChildLoadBalancingPolicy is an elected child policy to delegate requests. */
105   static final class ChildLoadBalancingPolicy {
106 
107     private final Map<String, Object> effectiveRawChildPolicy;
108     private final LoadBalancerProvider effectiveLbProvider;
109     private final String targetFieldName;
110 
111     @VisibleForTesting
ChildLoadBalancingPolicy( String targetFieldName, Map<String, Object> effectiveRawChildPolicy, LoadBalancerProvider effectiveLbProvider)112     ChildLoadBalancingPolicy(
113         String targetFieldName,
114         Map<String, Object> effectiveRawChildPolicy,
115         LoadBalancerProvider effectiveLbProvider) {
116       checkArgument(
117           targetFieldName != null && !targetFieldName.isEmpty(),
118           "targetFieldName cannot be empty or null");
119       this.targetFieldName = targetFieldName;
120       this.effectiveRawChildPolicy =
121           checkNotNull(effectiveRawChildPolicy, "effectiveRawChildPolicy");
122       this.effectiveLbProvider = checkNotNull(effectiveLbProvider, "effectiveLbProvider");
123     }
124 
125     /** Creates ChildLoadBalancingPolicy. */
126     @SuppressWarnings("unchecked")
create( String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies)127     static ChildLoadBalancingPolicy create(
128         String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies)
129         throws InvalidChildPolicyConfigException {
130       Map<String, Object> effectiveChildPolicy = null;
131       LoadBalancerProvider effectiveLbProvider = null;
132       List<String> policyTried = new ArrayList<>();
133 
134       LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry();
135       for (Map<String, ?> childPolicy : childPolicies) {
136         if (childPolicy.isEmpty()) {
137           continue;
138         }
139         if (childPolicy.size() != 1) {
140           throw
141               new InvalidChildPolicyConfigException(
142                   "childPolicy should have exactly one loadbalancing policy");
143         }
144         String policyName = childPolicy.keySet().iterator().next();
145         LoadBalancerProvider provider = lbRegistry.getProvider(policyName);
146         if (provider != null) {
147           effectiveLbProvider = provider;
148           effectiveChildPolicy = Collections.unmodifiableMap(childPolicy);
149           break;
150         }
151         policyTried.add(policyName);
152       }
153       if (effectiveChildPolicy == null) {
154         throw
155             new InvalidChildPolicyConfigException(
156                 String.format("no valid childPolicy found, policy tried: %s", policyTried));
157       }
158       return
159           new ChildLoadBalancingPolicy(
160               childPolicyConfigTargetFieldName,
161               (Map<String, Object>) effectiveChildPolicy.values().iterator().next(),
162               effectiveLbProvider);
163     }
164 
165     /** Creates a child load balancer config for given target from elected raw child policy. */
getEffectiveChildPolicy(String target)166     Map<String, ?> getEffectiveChildPolicy(String target) {
167       Map<String, Object> childPolicy = new HashMap<>(effectiveRawChildPolicy);
168       childPolicy.put(targetFieldName, target);
169       return childPolicy;
170     }
171 
172     /** Returns the elected child {@link LoadBalancerProvider}. */
getEffectiveLbProvider()173     LoadBalancerProvider getEffectiveLbProvider() {
174       return effectiveLbProvider;
175     }
176 
177     @Override
equals(Object o)178     public boolean equals(Object o) {
179       if (this == o) {
180         return true;
181       }
182       if (o == null || getClass() != o.getClass()) {
183         return false;
184       }
185       ChildLoadBalancingPolicy that = (ChildLoadBalancingPolicy) o;
186       return Objects.equals(effectiveRawChildPolicy, that.effectiveRawChildPolicy)
187           && Objects.equals(effectiveLbProvider, that.effectiveLbProvider)
188           && Objects.equals(targetFieldName, that.targetFieldName);
189     }
190 
191     @Override
hashCode()192     public int hashCode() {
193       return Objects.hash(effectiveRawChildPolicy, effectiveLbProvider, targetFieldName);
194     }
195 
196     @Override
toString()197     public String toString() {
198       return MoreObjects.toStringHelper(this)
199           .add("effectiveRawChildPolicy", effectiveRawChildPolicy)
200           .add("effectiveLbProvider", effectiveLbProvider)
201           .add("childPolicyConfigTargetFieldName", targetFieldName)
202           .toString();
203     }
204   }
205 
206   /** Factory for {@link ChildPolicyWrapper}. */
207   static final class RefCountedChildPolicyWrapperFactory {
208     // GuardedBy CachingRlsLbClient.lock
209     @VisibleForTesting
210     final Map<String /* target */, RefCountedChildPolicyWrapper> childPolicyMap =
211         new HashMap<>();
212 
213     private final ChildLoadBalancerHelperProvider childLbHelperProvider;
214     private final ChildLbStatusListener childLbStatusListener;
215     private final ChildLoadBalancingPolicy childPolicy;
216     private final ResolvedAddressFactory childLbResolvedAddressFactory;
217 
RefCountedChildPolicyWrapperFactory( ChildLoadBalancingPolicy childPolicy, ResolvedAddressFactory childLbResolvedAddressFactory, ChildLoadBalancerHelperProvider childLbHelperProvider, ChildLbStatusListener childLbStatusListener)218     public RefCountedChildPolicyWrapperFactory(
219         ChildLoadBalancingPolicy childPolicy,
220         ResolvedAddressFactory childLbResolvedAddressFactory,
221         ChildLoadBalancerHelperProvider childLbHelperProvider,
222         ChildLbStatusListener childLbStatusListener) {
223       this.childPolicy = checkNotNull(childPolicy, "childPolicy");
224       this.childLbResolvedAddressFactory =
225           checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
226       this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
227       this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
228     }
229 
230     // GuardedBy CachingRlsLbClient.lock
createOrGet(String target)231     ChildPolicyWrapper createOrGet(String target) {
232       // TODO(creamsoup) check if the target is valid or not
233       RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
234       if (pooledChildPolicyWrapper == null) {
235         ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
236             target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
237             childLbStatusListener);
238         pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
239         childPolicyMap.put(target, pooledChildPolicyWrapper);
240         return pooledChildPolicyWrapper.getObject();
241       } else {
242         ChildPolicyWrapper childPolicyWrapper = pooledChildPolicyWrapper.getObject();
243         if (childPolicyWrapper.getPicker() != null) {
244           childPolicyWrapper.refreshState();
245         }
246         return childPolicyWrapper;
247       }
248     }
249 
250     // GuardedBy CachingRlsLbClient.lock
createOrGet(List<String> targets)251     List<ChildPolicyWrapper> createOrGet(List<String> targets) {
252       List<ChildPolicyWrapper> retVal = new ArrayList<>();
253       for (String target : targets) {
254         retVal.add(createOrGet(target));
255       }
256       return retVal;
257     }
258 
259     // GuardedBy CachingRlsLbClient.lock
release(ChildPolicyWrapper childPolicyWrapper)260     void release(ChildPolicyWrapper childPolicyWrapper) {
261       checkNotNull(childPolicyWrapper, "childPolicyWrapper");
262       String target = childPolicyWrapper.getTarget();
263       RefCountedChildPolicyWrapper existing = childPolicyMap.get(target);
264       checkState(existing != null, "Cannot access already released object");
265       existing.returnObject(childPolicyWrapper);
266       if (existing.isReleased()) {
267         childPolicyMap.remove(target);
268       }
269     }
270   }
271 
272   /**
273    * ChildPolicyWrapper is a wrapper class for child load balancing policy with associated helper /
274    * utility classes to manage the child policy.
275    */
276   static final class ChildPolicyWrapper {
277 
278     private final String target;
279     private final ChildPolicyReportingHelper helper;
280     private final LoadBalancer lb;
281     private volatile SubchannelPicker picker;
282     private ConnectivityState state;
283 
ChildPolicyWrapper( String target, ChildLoadBalancingPolicy childPolicy, final ResolvedAddressFactory childLbResolvedAddressFactory, ChildLoadBalancerHelperProvider childLbHelperProvider, ChildLbStatusListener childLbStatusListener)284     public ChildPolicyWrapper(
285         String target,
286         ChildLoadBalancingPolicy childPolicy,
287         final ResolvedAddressFactory childLbResolvedAddressFactory,
288         ChildLoadBalancerHelperProvider childLbHelperProvider,
289         ChildLbStatusListener childLbStatusListener) {
290       this.target = target;
291       this.helper =
292           new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
293       LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
294       final ConfigOrError lbConfig =
295           lbProvider
296               .parseLoadBalancingPolicyConfig(
297                   childPolicy.getEffectiveChildPolicy(target));
298       this.lb = lbProvider.newLoadBalancer(helper);
299       helper.getChannelLogger().log(
300           ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig());
301       helper.getSynchronizationContext().execute(
302           new Runnable() {
303             @Override
304             public void run() {
305               if (!lb.acceptResolvedAddresses(
306                   childLbResolvedAddressFactory.create(lbConfig.getConfig()))) {
307                 helper.refreshNameResolution();
308               }
309               lb.requestConnection();
310             }
311           });
312     }
313 
getTarget()314     String getTarget() {
315       return target;
316     }
317 
getPicker()318     SubchannelPicker getPicker() {
319       return picker;
320     }
321 
getHelper()322     ChildPolicyReportingHelper getHelper() {
323       return helper;
324     }
325 
getState()326     public ConnectivityState getState() {
327       return state;
328     }
329 
refreshState()330     void refreshState() {
331       helper.getSynchronizationContext().execute(
332           new Runnable() {
333             @Override
334             public void run() {
335               helper.updateBalancingState(state, picker);
336             }
337           }
338       );
339     }
340 
shutdown()341     void shutdown() {
342       helper.getSynchronizationContext().execute(
343           new Runnable() {
344             @Override
345             public void run() {
346               lb.shutdown();
347             }
348           }
349       );
350     }
351 
352     @Override
toString()353     public String toString() {
354       return MoreObjects.toStringHelper(this)
355           .add("target", target)
356           .add("picker", picker)
357           .add("state", state)
358           .toString();
359     }
360 
361     /**
362      * A delegating {@link io.grpc.LoadBalancer.Helper} maintains status of {@link
363      * ChildPolicyWrapper} when {@link Subchannel} status changed. This helper is used between child
364      * policy and parent load-balancer where each picker in child policy is governed by a governing
365      * picker (RlsPicker). The governing picker will be reported back to the parent load-balancer.
366      */
367     final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {
368 
369       private final ChildLoadBalancerHelper delegate;
370       private final ChildLbStatusListener listener;
371 
ChildPolicyReportingHelper( ChildLoadBalancerHelperProvider childHelperProvider, ChildLbStatusListener listener)372       ChildPolicyReportingHelper(
373           ChildLoadBalancerHelperProvider childHelperProvider,
374           ChildLbStatusListener listener) {
375         checkNotNull(childHelperProvider, "childHelperProvider");
376         this.delegate = childHelperProvider.forTarget(getTarget());
377         this.listener = checkNotNull(listener, "listener");
378       }
379 
380       @Override
delegate()381       protected Helper delegate() {
382         return delegate;
383       }
384 
385       @Override
updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker)386       public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
387         picker = newPicker;
388         state = newState;
389         super.updateBalancingState(newState, newPicker);
390         listener.onStatusChanged(newState);
391       }
392     }
393   }
394 
395   /** Listener for child lb status change events. */
396   interface ChildLbStatusListener {
397 
398     /** Notifies when child lb status changes. */
onStatusChanged(ConnectivityState newState)399     void onStatusChanged(ConnectivityState newState);
400   }
401 
402   private static final class RefCountedChildPolicyWrapper
403       implements ObjectPool<ChildPolicyWrapper> {
404 
405     private final AtomicLong refCnt = new AtomicLong();
406     @Nullable
407     private ChildPolicyWrapper childPolicyWrapper;
408 
RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper)409     private RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper) {
410       this.childPolicyWrapper = checkNotNull(childPolicyWrapper, "childPolicyWrapper");
411     }
412 
413     @Override
getObject()414     public ChildPolicyWrapper getObject() {
415       checkState(!isReleased(), "ChildPolicyWrapper is already released");
416       refCnt.getAndIncrement();
417       return childPolicyWrapper;
418     }
419 
420     @Override
421     @Nullable
returnObject(Object object)422     public ChildPolicyWrapper returnObject(Object object) {
423       checkState(
424           !isReleased(),
425           "cannot return already released ChildPolicyWrapper, this is possibly a bug.");
426       checkState(
427           childPolicyWrapper == object,
428           "returned object doesn't match the pooled childPolicyWrapper");
429       long newCnt = refCnt.decrementAndGet();
430       checkState(newCnt != -1, "Cannot return never pooled childPolicyWrapper");
431       if (newCnt == 0) {
432         childPolicyWrapper.shutdown();
433         childPolicyWrapper = null;
434       }
435       return null;
436     }
437 
isReleased()438     boolean isReleased() {
439       return childPolicyWrapper == null;
440     }
441 
of(ChildPolicyWrapper childPolicyWrapper)442     static RefCountedChildPolicyWrapper of(ChildPolicyWrapper childPolicyWrapper) {
443       return new RefCountedChildPolicyWrapper(childPolicyWrapper);
444     }
445 
446     @Override
toString()447     public String toString() {
448       return MoreObjects.toStringHelper(this)
449           .add("object", childPolicyWrapper)
450           .add("refCnt", refCnt.get())
451           .toString();
452     }
453   }
454 
455   /** Exception thrown when attempting to parse child policy encountered parsing issue. */
456   static final class InvalidChildPolicyConfigException extends Exception {
457 
458     private static final long serialVersionUID = 0L;
459 
InvalidChildPolicyConfigException(String message)460     InvalidChildPolicyConfigException(String message) {
461       super(message);
462     }
463 
464     @Override
fillInStackTrace()465     public synchronized Throwable fillInStackTrace() {
466       // no stack trace above this point
467       return this;
468     }
469   }
470 }
471