xref: /aosp_15_r20/external/grpc-grpc-java/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.ConnectivityState.CONNECTING;
22 import static io.grpc.ConnectivityState.IDLE;
23 import static io.grpc.ConnectivityState.READY;
24 import static io.grpc.ConnectivityState.SHUTDOWN;
25 import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26 import static io.grpc.grpclb.GrpclbState.BUFFER_ENTRY;
27 import static io.grpc.grpclb.GrpclbState.DROP_PICK_RESULT;
28 import static io.grpc.grpclb.GrpclbState.NO_USE_AUTHORITY_SUFFIX;
29 import static org.junit.Assert.assertEquals;
30 import static org.junit.Assert.assertFalse;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.AdditionalAnswers.delegatesTo;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyString;
37 import static org.mockito.ArgumentMatchers.eq;
38 import static org.mockito.ArgumentMatchers.same;
39 import static org.mockito.Mockito.atLeast;
40 import static org.mockito.Mockito.doAnswer;
41 import static org.mockito.Mockito.inOrder;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.never;
44 import static org.mockito.Mockito.times;
45 import static org.mockito.Mockito.verify;
46 import static org.mockito.Mockito.verifyNoMoreInteractions;
47 import static org.mockito.Mockito.when;
48 
49 import com.google.common.collect.Iterables;
50 import com.google.protobuf.ByteString;
51 import com.google.protobuf.util.Durations;
52 import com.google.protobuf.util.Timestamps;
53 import io.grpc.Attributes;
54 import io.grpc.ChannelLogger;
55 import io.grpc.ClientStreamTracer;
56 import io.grpc.ConnectivityState;
57 import io.grpc.ConnectivityStateInfo;
58 import io.grpc.Context;
59 import io.grpc.Context.CancellableContext;
60 import io.grpc.EquivalentAddressGroup;
61 import io.grpc.LoadBalancer.CreateSubchannelArgs;
62 import io.grpc.LoadBalancer.Helper;
63 import io.grpc.LoadBalancer.PickResult;
64 import io.grpc.LoadBalancer.PickSubchannelArgs;
65 import io.grpc.LoadBalancer.ResolvedAddresses;
66 import io.grpc.LoadBalancer.Subchannel;
67 import io.grpc.LoadBalancer.SubchannelPicker;
68 import io.grpc.LoadBalancer.SubchannelStateListener;
69 import io.grpc.ManagedChannel;
70 import io.grpc.Metadata;
71 import io.grpc.Status;
72 import io.grpc.Status.Code;
73 import io.grpc.SynchronizationContext;
74 import io.grpc.grpclb.GrpclbState.BackendEntry;
75 import io.grpc.grpclb.GrpclbState.DropEntry;
76 import io.grpc.grpclb.GrpclbState.ErrorEntry;
77 import io.grpc.grpclb.GrpclbState.IdleSubchannelEntry;
78 import io.grpc.grpclb.GrpclbState.Mode;
79 import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
80 import io.grpc.inprocess.InProcessChannelBuilder;
81 import io.grpc.inprocess.InProcessServerBuilder;
82 import io.grpc.internal.BackoffPolicy;
83 import io.grpc.internal.FakeClock;
84 import io.grpc.lb.v1.ClientStats;
85 import io.grpc.lb.v1.ClientStatsPerToken;
86 import io.grpc.lb.v1.FallbackResponse;
87 import io.grpc.lb.v1.InitialLoadBalanceRequest;
88 import io.grpc.lb.v1.InitialLoadBalanceResponse;
89 import io.grpc.lb.v1.LoadBalanceRequest;
90 import io.grpc.lb.v1.LoadBalanceResponse;
91 import io.grpc.lb.v1.LoadBalancerGrpc;
92 import io.grpc.lb.v1.Server;
93 import io.grpc.lb.v1.ServerList;
94 import io.grpc.stub.StreamObserver;
95 import java.net.InetSocketAddress;
96 import java.net.SocketAddress;
97 import java.text.MessageFormat;
98 import java.util.ArrayDeque;
99 import java.util.ArrayList;
100 import java.util.Arrays;
101 import java.util.Collections;
102 import java.util.List;
103 import java.util.concurrent.ScheduledExecutorService;
104 import java.util.concurrent.TimeUnit;
105 import javax.annotation.Nonnull;
106 import javax.annotation.Nullable;
107 import org.junit.After;
108 import org.junit.Before;
109 import org.junit.Rule;
110 import org.junit.Test;
111 import org.junit.runner.RunWith;
112 import org.junit.runners.JUnit4;
113 import org.mockito.AdditionalAnswers;
114 import org.mockito.ArgumentCaptor;
115 import org.mockito.ArgumentMatchers;
116 import org.mockito.Captor;
117 import org.mockito.InOrder;
118 import org.mockito.Mock;
119 import org.mockito.invocation.InvocationOnMock;
120 import org.mockito.junit.MockitoJUnit;
121 import org.mockito.junit.MockitoRule;
122 import org.mockito.stubbing.Answer;
123 
124 /** Unit tests for {@link GrpclbLoadBalancer}. */
125 @RunWith(JUnit4.class)
126 public class GrpclbLoadBalancerTest {
127   @Rule public final MockitoRule mocks = MockitoJUnit.rule();
128 
129   private static final String SERVICE_AUTHORITY = "api.google.com";
130 
131   // The tasks are wrapped by SynchronizationContext, so we can't compare the types
132   // directly.
133   private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
134       new FakeClock.TaskFilter() {
135         @Override
136         public boolean shouldAccept(Runnable command) {
137           return command.toString().contains(GrpclbState.LoadReportingTask.class.getSimpleName());
138         }
139       };
140   private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER =
141       new FakeClock.TaskFilter() {
142         @Override
143         public boolean shouldAccept(Runnable command) {
144           return command.toString().contains(GrpclbState.FallbackModeTask.class.getSimpleName());
145         }
146       };
147   private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
148       new FakeClock.TaskFilter() {
149         @Override
150         public boolean shouldAccept(Runnable command) {
151           return command.toString().contains(GrpclbState.LbRpcRetryTask.class.getSimpleName());
152         }
153       };
154   private static final Attributes LB_BACKEND_ATTRS =
155       Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build();
156 
157   private final Helper helper = mock(Helper.class, delegatesTo(new FakeHelper()));
158   private final SubchannelPool subchannelPool =
159       mock(
160           SubchannelPool.class,
161           delegatesTo(new CachedSubchannelPool(helper)));
162   private final ArrayList<String> logs = new ArrayList<>();
163   private final ChannelLogger channelLogger = new ChannelLogger() {
164       @Override
165       public void log(ChannelLogLevel level, String msg) {
166         logs.add(level + ": " + msg);
167       }
168 
169       @Override
170       public void log(ChannelLogLevel level, String template, Object... args) {
171         log(level, MessageFormat.format(template, args));
172       }
173     };
174   private SubchannelPicker currentPicker;
175   private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
176   @Captor
177   private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
178   private final FakeClock fakeClock = new FakeClock();
179   private final ArrayDeque<StreamObserver<LoadBalanceRequest>> lbRequestObservers =
180       new ArrayDeque<>();
181   private final ArrayDeque<Subchannel> mockSubchannels = new ArrayDeque<>();
182   private final ArrayDeque<ManagedChannel> fakeOobChannels = new ArrayDeque<>();
183   private final ArrayList<Subchannel> unpooledSubchannelTracker = new ArrayList<>();
184   private final ArrayList<ManagedChannel> oobChannelTracker = new ArrayList<>();
185   private final SynchronizationContext syncContext = new SynchronizationContext(
186       new Thread.UncaughtExceptionHandler() {
187         @Override
188         public void uncaughtException(Thread t, Throwable e) {
189           throw new AssertionError(e);
190         }
191       });
192   private static final ClientStreamTracer.StreamInfo STREAM_INFO =
193       ClientStreamTracer.StreamInfo.newBuilder().build();
194 
195   private io.grpc.Server fakeLbServer;
196   @Captor
197   private ArgumentCaptor<SubchannelPicker> pickerCaptor;
198   @Mock
199   private BackoffPolicy.Provider backoffPolicyProvider;
200   @Mock
201   private BackoffPolicy backoffPolicy1;
202   @Mock
203   private BackoffPolicy backoffPolicy2;
204   private GrpclbLoadBalancer balancer;
205   private final ArgumentCaptor<CreateSubchannelArgs> createSubchannelArgsCaptor =
206       ArgumentCaptor.forClass(CreateSubchannelArgs.class);
207 
208   @Before
setUp()209   public void setUp() throws Exception {
210     mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo(
211         new LoadBalancerGrpc.LoadBalancerImplBase() {
212           @Override
213           @SuppressWarnings("unchecked")
214           public StreamObserver<LoadBalanceRequest> balanceLoad(
215               final StreamObserver<LoadBalanceResponse> responseObserver) {
216             StreamObserver<LoadBalanceRequest> requestObserver =
217                 mock(StreamObserver.class);
218             Answer<Void> closeRpc = new Answer<Void>() {
219                 @Override
220                 public Void answer(InvocationOnMock invocation) {
221                   responseObserver.onCompleted();
222                   return null;
223                 }
224               };
225             doAnswer(closeRpc).when(requestObserver).onCompleted();
226             lbRequestObservers.add(requestObserver);
227             return requestObserver;
228           }
229         }));
230     fakeLbServer = InProcessServerBuilder.forName("fakeLb")
231         .directExecutor().addService(mockLbService).build().start();
232     when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
233     when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
234     when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
235     balancer = new GrpclbLoadBalancer(
236         helper,
237         Context.ROOT,
238         subchannelPool,
239         fakeClock.getTimeProvider(),
240         fakeClock.getStopwatchSupplier().get(),
241         backoffPolicyProvider);
242   }
243 
244   @After
tearDown()245   public void tearDown() {
246     try {
247       if (balancer != null) {
248         syncContext.execute(new Runnable() {
249             @Override
250             public void run() {
251               balancer.shutdown();
252             }
253           });
254       }
255       for (ManagedChannel channel : oobChannelTracker) {
256         assertTrue(channel + " is shutdown", channel.isShutdown());
257         // balancer should have closed the LB stream, terminating the OOB channel.
258         assertTrue(channel + " is terminated", channel.isTerminated());
259       }
260       for (Subchannel subchannel : unpooledSubchannelTracker) {
261         verify(subchannel).shutdown();
262       }
263       // No timer should linger after shutdown
264       assertThat(fakeClock.getPendingTasks()).isEmpty();
265     } finally {
266       if (fakeLbServer != null) {
267         fakeLbServer.shutdownNow();
268       }
269     }
270   }
271 
272   @Test
roundRobinPickerNoDrop()273   public void roundRobinPickerNoDrop() {
274     GrpclbClientLoadRecorder loadRecorder =
275         new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
276     Subchannel subchannel = mock(Subchannel.class);
277     BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
278     BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
279 
280     List<BackendEntry> pickList = Arrays.asList(b1, b2);
281     RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);
282 
283     PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
284     Metadata headers1 = new Metadata();
285     // The existing token on the headers will be replaced
286     headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
287     when(args1.getHeaders()).thenReturn(headers1);
288     assertSame(b1.result, picker.pickSubchannel(args1));
289     verify(args1).getHeaders();
290     assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
291 
292     PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
293     Metadata headers2 = new Metadata();
294     when(args2.getHeaders()).thenReturn(headers2);
295     assertSame(b2.result, picker.pickSubchannel(args2));
296     verify(args2).getHeaders();
297     assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
298 
299     PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
300     Metadata headers3 = new Metadata();
301     when(args3.getHeaders()).thenReturn(headers3);
302     assertSame(b1.result, picker.pickSubchannel(args3));
303     verify(args3).getHeaders();
304     assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
305 
306     verify(subchannel, never()).getAttributes();
307   }
308 
309   @Test
roundRobinPickerWithDrop()310   public void roundRobinPickerWithDrop() {
311     assertTrue(DROP_PICK_RESULT.isDrop());
312     GrpclbClientLoadRecorder loadRecorder =
313         new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
314     Subchannel subchannel = mock(Subchannel.class);
315     // 1 out of 2 requests are to be dropped
316     DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003");
317     List<DropEntry> dropList = Arrays.asList(null, d);
318 
319     BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
320     BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
321     List<BackendEntry> pickList = Arrays.asList(b1, b2);
322     RoundRobinPicker picker = new RoundRobinPicker(dropList, pickList);
323 
324     // dropList[0], pickList[0]
325     PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
326     Metadata headers1 = new Metadata();
327     headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
328     when(args1.getHeaders()).thenReturn(headers1);
329     assertSame(b1.result, picker.pickSubchannel(args1));
330     verify(args1).getHeaders();
331     assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
332 
333     // dropList[1]: drop
334     PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
335     Metadata headers2 = new Metadata();
336     when(args2.getHeaders()).thenReturn(headers2);
337     assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args2));
338     verify(args2, never()).getHeaders();
339 
340     // dropList[0], pickList[1]
341     PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
342     Metadata headers3 = new Metadata();
343     when(args3.getHeaders()).thenReturn(headers3);
344     assertSame(b2.result, picker.pickSubchannel(args3));
345     verify(args3).getHeaders();
346     assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
347 
348     // dropList[1]: drop
349     PickSubchannelArgs args4 = mock(PickSubchannelArgs.class);
350     Metadata headers4 = new Metadata();
351     when(args4.getHeaders()).thenReturn(headers4);
352     assertSame(DROP_PICK_RESULT, picker.pickSubchannel(args4));
353     verify(args4, never()).getHeaders();
354 
355     // dropList[0], pickList[0]
356     PickSubchannelArgs args5 = mock(PickSubchannelArgs.class);
357     Metadata headers5 = new Metadata();
358     when(args5.getHeaders()).thenReturn(headers5);
359     assertSame(b1.result, picker.pickSubchannel(args5));
360     verify(args5).getHeaders();
361     assertThat(headers5.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
362 
363     verify(subchannel, never()).getAttributes();
364   }
365 
366   @Test
roundRobinPickerWithIdleEntry_noDrop()367   public void roundRobinPickerWithIdleEntry_noDrop() {
368     Subchannel subchannel = mock(Subchannel.class);
369     IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
370 
371     RoundRobinPicker picker =
372         new RoundRobinPicker(Collections.<DropEntry>emptyList(), Collections.singletonList(entry));
373     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
374 
375     verify(subchannel, never()).requestConnection();
376     assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
377     verify(subchannel).requestConnection();
378     assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
379     // Only the first pick triggers requestConnection()
380     verify(subchannel).requestConnection();
381   }
382 
383   @Test
roundRobinPickerWithIdleEntry_andDrop()384   public void roundRobinPickerWithIdleEntry_andDrop() {
385     GrpclbClientLoadRecorder loadRecorder =
386         new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
387     // 1 out of 2 requests are to be dropped
388     DropEntry d = new DropEntry(loadRecorder, "LBTOKEN0003");
389     List<DropEntry> dropList = Arrays.asList(null, d);
390 
391     Subchannel subchannel = mock(Subchannel.class);
392     IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
393 
394     RoundRobinPicker picker = new RoundRobinPicker(dropList, Collections.singletonList(entry));
395     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
396 
397     verify(subchannel, never()).requestConnection();
398     assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
399     verify(subchannel).requestConnection();
400 
401     assertThat(picker.pickSubchannel(args)).isSameInstanceAs(DROP_PICK_RESULT);
402 
403     verify(subchannel).requestConnection();
404     assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
405     // Only the first pick triggers requestConnection()
406     verify(subchannel).requestConnection();
407   }
408 
409   @Test
loadReporting()410   public void loadReporting() {
411     Metadata headers = new Metadata();
412     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
413     when(args.getHeaders()).thenReturn(headers);
414 
415     long loadReportIntervalMillis = 1983;
416     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
417     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
418 
419     // Fallback timer is started as soon as address is resolved.
420     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
421 
422     assertEquals(1, fakeOobChannels.size());
423     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
424     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
425     assertEquals(1, lbRequestObservers.size());
426     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
427     InOrder inOrder = inOrder(lbRequestObserver);
428     InOrder helperInOrder = inOrder(helper, subchannelPool);
429 
430     inOrder.verify(lbRequestObserver).onNext(
431         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
432             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
433             .build()));
434 
435     // Simulate receiving LB response
436     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
437     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
438 
439     // Load reporting task is scheduled
440     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
441     assertEquals(0, fakeClock.runDueTasks());
442 
443     List<ServerEntry> backends = Arrays.asList(
444         new ServerEntry("127.0.0.1", 2000, "token0001"),
445         new ServerEntry("token0001"),  // drop
446         new ServerEntry("127.0.0.1", 2010, "token0002"),
447         new ServerEntry("token0003"));  // drop
448 
449     lbResponseObserver.onNext(buildLbResponse(backends));
450 
451     assertEquals(2, mockSubchannels.size());
452     Subchannel subchannel1 = mockSubchannels.poll();
453     Subchannel subchannel2 = mockSubchannels.poll();
454     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
455     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
456     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
457     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
458 
459     helperInOrder.verify(helper, atLeast(1))
460         .updateBalancingState(eq(READY), pickerCaptor.capture());
461     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
462     assertThat(picker.dropList).containsExactly(
463         null,
464         new DropEntry(getLoadRecorder(), "token0001"),
465         null,
466         new DropEntry(getLoadRecorder(), "token0003")).inOrder();
467     assertThat(picker.pickList).containsExactly(
468         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
469         new BackendEntry(subchannel2, getLoadRecorder(), "token0002")).inOrder();
470 
471     // Report, no data
472     assertNextReport(
473         inOrder, lbRequestObserver, loadReportIntervalMillis,
474         ClientStats.newBuilder().build());
475 
476     PickResult pick1 = picker.pickSubchannel(args);
477     assertSame(subchannel1, pick1.getSubchannel());
478     assertSame(getLoadRecorder(), pick1.getStreamTracerFactory());
479 
480     // Merely the pick will not be recorded as upstart.
481     assertNextReport(
482         inOrder, lbRequestObserver, loadReportIntervalMillis,
483         ClientStats.newBuilder().build());
484 
485     ClientStreamTracer tracer1 =
486         pick1.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());
487     tracer1.streamCreated(Attributes.EMPTY, new Metadata());
488 
489     PickResult pick2 = picker.pickSubchannel(args);
490     assertNull(pick2.getSubchannel());
491     assertSame(DROP_PICK_RESULT, pick2);
492 
493     // Report includes upstart of pick1 and the drop of pick2
494     assertNextReport(
495         inOrder, lbRequestObserver, loadReportIntervalMillis,
496         ClientStats.newBuilder()
497             .setNumCallsStarted(2)
498             .setNumCallsFinished(1)  // pick2
499             .addCallsFinishedWithDrop(
500                 ClientStatsPerToken.newBuilder()
501                     .setLoadBalanceToken("token0001")
502                     .setNumCalls(1)          // pick2
503                     .build())
504             .build());
505 
506     PickResult pick3 = picker.pickSubchannel(args);
507     assertSame(subchannel2, pick3.getSubchannel());
508     assertSame(getLoadRecorder(), pick3.getStreamTracerFactory());
509     ClientStreamTracer tracer3 =
510         pick3.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());
511     tracer3.streamCreated(Attributes.EMPTY, new Metadata());
512 
513     // pick3 has sent out headers
514     tracer3.outboundHeaders();
515 
516     // 3rd report includes pick3's upstart
517     assertNextReport(
518         inOrder, lbRequestObserver, loadReportIntervalMillis,
519         ClientStats.newBuilder()
520             .setNumCallsStarted(1)
521             .build());
522 
523     PickResult pick4 = picker.pickSubchannel(args);
524     assertNull(pick4.getSubchannel());
525     assertSame(DROP_PICK_RESULT, pick4);
526 
527     // pick1 ended without sending anything
528     tracer1.streamClosed(Status.CANCELLED);
529 
530     // 4th report includes end of pick1 and drop of pick4
531     assertNextReport(
532         inOrder, lbRequestObserver, loadReportIntervalMillis,
533         ClientStats.newBuilder()
534             .setNumCallsStarted(1)  // pick4
535             .setNumCallsFinished(2)
536             .setNumCallsFinishedWithClientFailedToSend(1)   // pick1
537             .addCallsFinishedWithDrop(
538                 ClientStatsPerToken.newBuilder()
539                     .setLoadBalanceToken("token0003")
540                     .setNumCalls(1)   // pick4
541                     .build())
542             .build());
543 
544     PickResult pick5 = picker.pickSubchannel(args);
545     assertSame(subchannel1, pick1.getSubchannel());
546     assertSame(getLoadRecorder(), pick5.getStreamTracerFactory());
547     ClientStreamTracer tracer5 =
548         pick5.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());
549     tracer5.streamCreated(Attributes.EMPTY, new Metadata());
550 
551     // pick3 ended without receiving response headers
552     tracer3.streamClosed(Status.DEADLINE_EXCEEDED);
553 
554     // pick5 sent and received headers
555     tracer5.outboundHeaders();
556     tracer5.inboundHeaders();
557 
558     // 5th report includes pick3's end and pick5's upstart
559     assertNextReport(
560         inOrder, lbRequestObserver, loadReportIntervalMillis,
561         ClientStats.newBuilder()
562             .setNumCallsStarted(1)  // pick5
563             .setNumCallsFinished(1)  // pick3
564             .build());
565 
566     // pick5 ends
567     tracer5.streamClosed(Status.OK);
568 
569     // 6th report includes pick5's end
570     assertNextReport(
571         inOrder, lbRequestObserver, loadReportIntervalMillis,
572         ClientStats.newBuilder()
573             .setNumCallsFinished(1)
574             .setNumCallsFinishedKnownReceived(1)
575             .build());
576 
577     assertEquals(1, fakeClock.numPendingTasks());
578     // Balancer closes the stream, scheduled reporting task cancelled
579     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
580     assertEquals(0, fakeClock.numPendingTasks());
581 
582     // New stream created
583     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
584     lbResponseObserver = lbResponseObserverCaptor.getValue();
585     assertEquals(1, lbRequestObservers.size());
586     lbRequestObserver = lbRequestObservers.poll();
587     inOrder = inOrder(lbRequestObserver);
588 
589     inOrder.verify(lbRequestObserver).onNext(
590         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
591             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
592             .build()));
593 
594     // Load reporting is also requested
595     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
596 
597     // No picker created because balancer is still using the results from the last stream
598     helperInOrder.verify(helper, never())
599         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
600 
601     // Make a new pick on that picker.  It will not show up on the report of the new stream, because
602     // that picker is associated with the previous stream.
603     PickResult pick6 = picker.pickSubchannel(args);
604     assertNull(pick6.getSubchannel());
605     assertSame(DROP_PICK_RESULT, pick6);
606     assertNextReport(
607         inOrder, lbRequestObserver, loadReportIntervalMillis,
608         ClientStats.newBuilder().build());
609 
610     // New stream got the list update
611     lbResponseObserver.onNext(buildLbResponse(backends));
612 
613     // Same backends, thus no new subchannels
614     helperInOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
615         any(EquivalentAddressGroup.class), any(Attributes.class));
616     // But the new RoundRobinEntries have a new loadRecorder, thus considered different from
617     // the previous list, thus a new picker is created
618     helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
619     picker = (RoundRobinPicker) pickerCaptor.getValue();
620 
621     PickResult pick1p = picker.pickSubchannel(args);
622     assertSame(subchannel1, pick1p.getSubchannel());
623     assertSame(getLoadRecorder(), pick1p.getStreamTracerFactory());
624     pick1p.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, new Metadata());
625 
626     // The pick from the new stream will be included in the report
627     assertNextReport(
628         inOrder, lbRequestObserver, loadReportIntervalMillis,
629         ClientStats.newBuilder()
630             .setNumCallsStarted(1)
631             .build());
632 
633     verify(args, atLeast(0)).getHeaders();
634     verifyNoMoreInteractions(args);
635   }
636 
637   @Test
abundantInitialResponse()638   public void abundantInitialResponse() {
639     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
640     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
641     assertEquals(1, fakeOobChannels.size());
642     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
643     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
644 
645     // Simulate LB initial response
646     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
647     lbResponseObserver.onNext(buildInitialResponse(1983));
648 
649     // Load reporting task is scheduled
650     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
651     FakeClock.ScheduledTask scheduledTask =
652         Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
653     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
654 
655     logs.clear();
656     // Simulate an abundant LB initial response, with a different report interval
657     lbResponseObserver.onNext(buildInitialResponse(9097));
658 
659     // This incident is logged
660     assertThat(logs).containsExactly(
661             "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildInitialResponse(9097),
662             "WARNING: [grpclb-<api.google.com>] "
663                 + "Ignoring unexpected response type: INITIAL_RESPONSE")
664         .inOrder();
665 
666     // It doesn't affect load-reporting at all
667     assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
668         .containsExactly(scheduledTask);
669     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
670   }
671 
672   @SuppressWarnings("unchecked")
673   @Test
raceBetweenHandleAddressesAndLbStreamClosure()674   public void raceBetweenHandleAddressesAndLbStreamClosure() {
675     InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1);
676     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
677         createResolvedBalancerAddresses(1));
678     assertEquals(1, fakeOobChannels.size());
679     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
680     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
681     assertEquals(1, lbRequestObservers.size());
682     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
683     verify(lbRequestObserver).onNext(
684         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
685             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
686             .build()));
687 
688     // Close lbStream
689     lbResponseObserver.onCompleted();
690     inOrder.verify(backoffPolicyProvider).get();
691     inOrder.verify(backoffPolicy1).nextBackoffNanos();
692     // Retry task scheduled
693     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
694     FakeClock.ScheduledTask retryTask =
695         Iterables.getOnlyElement(fakeClock.getPendingTasks(LB_RPC_RETRY_TASK_FILTER));
696     assertEquals(10L, retryTask.getDelay(TimeUnit.NANOSECONDS));
697 
698     // Receive the same Lb address again
699     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
700         createResolvedBalancerAddresses(1));
701     // Retry task cancelled
702     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
703     // Reuse the existing OOB channel
704     assertEquals(1, fakeOobChannels.size());
705     // Start a new LoadBalance RPC
706     inOrder.verify(mockLbService).balanceLoad(any(StreamObserver.class));
707     assertEquals(1, lbRequestObservers.size());
708     lbRequestObserver = lbRequestObservers.poll();
709     verify(lbRequestObserver).onNext(
710         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
711             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
712             .build()));
713 
714     // Simulate a race condition where the task has just started when it's cancelled
715     retryTask.command.run();
716     inOrder.verifyNoMoreInteractions();
717   }
718 
719   @Test
raceBetweenLoadReportingAndLbStreamClosure()720   public void raceBetweenLoadReportingAndLbStreamClosure() {
721     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
722     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
723     assertEquals(1, fakeOobChannels.size());
724     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
725     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
726     assertEquals(1, lbRequestObservers.size());
727     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
728     InOrder inOrder = inOrder(lbRequestObserver);
729 
730     inOrder.verify(lbRequestObserver).onNext(
731         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
732             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
733             .build()));
734 
735     // Simulate receiving LB response
736     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
737     lbResponseObserver.onNext(buildInitialResponse(1983));
738     // Load reporting task is scheduled
739     assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
740     FakeClock.ScheduledTask scheduledTask =
741         Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
742     assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
743 
744     // Close lbStream
745     lbResponseObserver.onCompleted();
746 
747     // Reporting task cancelled
748     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
749 
750     // Simulate a race condition where the task has just started when its cancelled
751     scheduledTask.command.run();
752 
753     // No report sent. No new task scheduled
754     inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class));
755     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
756   }
757 
assertNextReport( InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver, long loadReportIntervalMillis, ClientStats expectedReport)758   private void assertNextReport(
759       InOrder inOrder, StreamObserver<LoadBalanceRequest> lbRequestObserver,
760       long loadReportIntervalMillis, ClientStats expectedReport) {
761     assertEquals(0, fakeClock.forwardTime(loadReportIntervalMillis - 1, TimeUnit.MILLISECONDS));
762     inOrder.verifyNoMoreInteractions();
763     assertEquals(1, fakeClock.forwardTime(1, TimeUnit.MILLISECONDS));
764     assertEquals(1, fakeClock.numPendingTasks());
765     inOrder.verify(lbRequestObserver).onNext(
766         eq(LoadBalanceRequest.newBuilder()
767             .setClientStats(
768                 ClientStats.newBuilder(expectedReport)
769                     .setTimestamp(Timestamps.fromNanos(fakeClock.getTicker().read()))
770                     .build())
771             .build()));
772   }
773 
774   @Test
receiveNoBackendAndBalancerAddress()775   public void receiveNoBackendAndBalancerAddress() {
776     deliverResolvedAddresses(
777         Collections.<EquivalentAddressGroup>emptyList(),
778         Collections.<EquivalentAddressGroup>emptyList());
779     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
780     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
781     assertThat(picker.dropList).isEmpty();
782     Status error = Iterables.getOnlyElement(picker.pickList).picked(new Metadata()).getStatus();
783     assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
784     assertThat(error.getDescription()).isEqualTo("No backend or balancer addresses found");
785   }
786 
787   @Test
nameResolutionFailsThenRecover()788   public void nameResolutionFailsThenRecover() {
789     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
790     deliverNameResolutionError(error);
791     verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
792     assertThat(logs)
793         .containsExactly(
794             "INFO: [grpclb-<api.google.com>] Created",
795             "DEBUG: [grpclb-<api.google.com>] Error: " + error)
796         .inOrder();
797     logs.clear();
798 
799     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
800     assertThat(picker.dropList).isEmpty();
801     PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
802     assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
803     assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription());
804 
805     // Recover with a subsequent success
806     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
807 
808     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
809 
810     verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
811         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
812     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
813   }
814 
815   @Test
grpclbThenNameResolutionFails()816   public void grpclbThenNameResolutionFails() {
817     InOrder inOrder = inOrder(helper, subchannelPool);
818     // Go to GRPCLB first
819     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
820     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
821 
822     verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
823         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
824     assertEquals(1, fakeOobChannels.size());
825     ManagedChannel oobChannel = fakeOobChannels.poll();
826     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
827     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
828 
829     // Let name resolution fail before round-robin list is ready
830     Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
831     deliverNameResolutionError(error);
832 
833     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
834     RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
835     assertThat(picker.dropList).isEmpty();
836     PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
837     assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
838     assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription());
839     assertFalse(oobChannel.isShutdown());
840 
841     // Simulate receiving LB response
842     List<ServerEntry> backends = Arrays.asList(
843         new ServerEntry("127.0.0.1", 2000, "TOKEN1"),
844         new ServerEntry("127.0.0.1", 2010, "TOKEN2"));
845     lbResponseObserver.onNext(buildInitialResponse());
846     lbResponseObserver.onNext(buildLbResponse(backends));
847 
848     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
849         eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
850         any(Attributes.class));
851     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
852         eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)),
853         any(Attributes.class));
854   }
855 
856   @Test
grpclbUpdatedAddresses_avoidsReconnect()857   public void grpclbUpdatedAddresses_avoidsReconnect() {
858     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1);
859     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
860     deliverResolvedAddresses(backendList, grpclbBalancerList);
861 
862     verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
863         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
864     ManagedChannel oobChannel = fakeOobChannels.poll();
865     assertEquals(1, lbRequestObservers.size());
866 
867     List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1);
868     List<EquivalentAddressGroup> grpclbBalancerList2 = createResolvedBalancerAddresses(2);
869     deliverResolvedAddresses(backendList2, grpclbBalancerList2);
870     verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2)));
871     assertEquals(1, lbRequestObservers.size()); // No additional RPC
872   }
873 
874 
875   @Test
grpclbUpdatedAddresses_reconnectOnAuthorityChange()876   public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
877     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1);
878     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
879     deliverResolvedAddresses(backendList, grpclbBalancerList);
880 
881     verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
882         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
883     ManagedChannel oobChannel = fakeOobChannels.poll();
884     assertEquals(1, lbRequestObservers.size());
885 
886     final String newAuthority = "some-new-authority";
887     List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1);
888     List<EquivalentAddressGroup> grpclbBalancerList2 =
889         Collections.singletonList(
890             new EquivalentAddressGroup(
891                 new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority)));
892     deliverResolvedAddresses(backendList2, grpclbBalancerList2);
893     verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(xattr(grpclbBalancerList2)));
894     assertEquals(1, lbRequestObservers.size()); // No additional RPC
895   }
896 
897   @Test
grpclbWorking()898   public void grpclbWorking() {
899     InOrder inOrder = inOrder(helper, subchannelPool);
900     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
901     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
902 
903     // Fallback timer is started as soon as the addresses are resolved.
904     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
905 
906     verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
907         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
908     assertEquals(1, fakeOobChannels.size());
909     ManagedChannel oobChannel = fakeOobChannels.poll();
910     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
911     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
912     assertEquals(1, lbRequestObservers.size());
913     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
914     verify(lbRequestObserver).onNext(
915         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
916             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
917             .build()));
918 
919     // Simulate receiving LB response
920     List<ServerEntry> backends1 = Arrays.asList(
921         new ServerEntry("127.0.0.1", 2000, "token0001"),
922         new ServerEntry("127.0.0.1", 2010, "token0002"));
923     inOrder.verify(helper, never())
924         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
925     logs.clear();
926     lbResponseObserver.onNext(buildInitialResponse());
927     assertThat(logs).containsExactly(
928         "INFO: [grpclb-<api.google.com>] Got an LB initial response: " + buildInitialResponse());
929     logs.clear();
930     lbResponseObserver.onNext(buildLbResponse(backends1));
931 
932     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
933         eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
934         any(Attributes.class));
935     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
936         eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
937         any(Attributes.class));
938     assertEquals(2, mockSubchannels.size());
939     Subchannel subchannel1 = mockSubchannels.poll();
940     Subchannel subchannel2 = mockSubchannels.poll();
941     verify(subchannel1).requestConnection();
942     verify(subchannel2).requestConnection();
943     assertEquals(
944         new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS),
945         subchannel1.getAddresses());
946     assertEquals(
947         new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS),
948         subchannel2.getAddresses());
949 
950     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
951     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
952 
953     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
954     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
955     assertThat(picker0.dropList).containsExactly(null, null);
956     assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
957     inOrder.verifyNoMoreInteractions();
958 
959     assertThat(logs).containsExactly(
960         "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends1))
961         .inOrder();
962     logs.clear();
963 
964     // Let subchannels be connected
965     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
966     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
967 
968     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
969 
970     assertThat(picker1.dropList).containsExactly(null, null);
971     assertThat(picker1.pickList).containsExactly(
972         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
973 
974     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
975     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
976 
977     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
978     assertThat(picker2.dropList).containsExactly(null, null);
979     assertThat(picker2.pickList).containsExactly(
980         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
981         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
982         .inOrder();
983 
984     // Disconnected subchannels
985     verify(subchannel1).requestConnection();
986     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
987     verify(subchannel1, times(2)).requestConnection();
988     inOrder.verify(helper).refreshNameResolution();
989     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
990 
991     RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
992     assertThat(picker3.dropList).containsExactly(null, null);
993     assertThat(picker3.pickList).containsExactly(
994         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
995 
996     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
997     inOrder.verifyNoMoreInteractions();
998 
999     // As long as there is at least one READY subchannel, round robin will work.
1000     ConnectivityStateInfo errorState1 =
1001         ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1"));
1002     deliverSubchannelState(subchannel1, errorState1);
1003     inOrder.verify(helper).refreshNameResolution();
1004     inOrder.verifyNoMoreInteractions();
1005 
1006     // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
1007     verify(subchannel2).requestConnection();
1008     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
1009     verify(subchannel2, times(2)).requestConnection();
1010     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
1011 
1012     RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
1013     assertThat(picker4.dropList).containsExactly(null, null);
1014     assertThat(picker4.pickList).containsExactly(BUFFER_ENTRY);
1015 
1016     // Update backends, with a drop entry
1017     List<ServerEntry> backends2 =
1018         Arrays.asList(
1019             new ServerEntry("127.0.0.1", 2030, "token0003"),  // New address
1020             new ServerEntry("token0003"),  // drop
1021             new ServerEntry("127.0.0.1", 2010, "token0004"),  // Existing address with token changed
1022             new ServerEntry("127.0.0.1", 2030, "token0005"),  // New address appearing second time
1023             new ServerEntry("token0006"));  // drop
1024     verify(subchannelPool, never())
1025         .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class));
1026 
1027     lbResponseObserver.onNext(buildLbResponse(backends2));
1028     assertThat(logs).containsExactly(
1029         "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends2))
1030         .inOrder();
1031     logs.clear();
1032 
1033     // not in backends2, closed
1034     verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1));
1035     // backends2[2], will be kept
1036     verify(subchannelPool, never())
1037         .returnSubchannel(same(subchannel2), any(ConnectivityStateInfo.class));
1038 
1039     inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
1040         eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)),
1041         any(Attributes.class));
1042     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
1043         eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)),
1044         any(Attributes.class));
1045 
1046     ConnectivityStateInfo errorOnCachedSubchannel1 =
1047         ConnectivityStateInfo.forTransientFailure(
1048             Status.UNAVAILABLE.withDescription("You can get this error even if you are cached"));
1049     deliverSubchannelState(subchannel1, errorOnCachedSubchannel1);
1050 
1051     assertEquals(1, mockSubchannels.size());
1052     Subchannel subchannel3 = mockSubchannels.poll();
1053     verify(subchannel3).requestConnection();
1054     assertEquals(
1055         new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS),
1056         subchannel3.getAddresses());
1057     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
1058     RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
1059     assertThat(picker7.dropList).containsExactly(
1060         null,
1061         new DropEntry(getLoadRecorder(), "token0003"),
1062         null,
1063         null,
1064         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
1065     assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY);
1066 
1067     // State updates on obsolete subchannel1 will only be passed to the pool
1068     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
1069     deliverSubchannelState(
1070         subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
1071     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
1072 
1073     deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
1074     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1075     RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
1076     assertThat(picker8.dropList).containsExactly(
1077         null,
1078         new DropEntry(getLoadRecorder(), "token0003"),
1079         null,
1080         null,
1081         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
1082     // subchannel2 is still IDLE, thus not in the active list
1083     assertThat(picker8.pickList).containsExactly(
1084         new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
1085         new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
1086     // subchannel2 becomes READY and makes it into the list
1087     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
1088     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1089     RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
1090     assertThat(picker9.dropList).containsExactly(
1091         null,
1092         new DropEntry(getLoadRecorder(), "token0003"),
1093         null,
1094         null,
1095         new DropEntry(getLoadRecorder(), "token0006")).inOrder();
1096     assertThat(picker9.pickList).containsExactly(
1097         new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
1098         new BackendEntry(subchannel2, getLoadRecorder(), "token0004"),
1099         new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
1100     verify(subchannelPool, never())
1101         .returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class));
1102 
1103     // Update backends, with no entry
1104     lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
1105     verify(subchannelPool)
1106         .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
1107     verify(subchannelPool)
1108         .returnSubchannel(same(subchannel3), eq(ConnectivityStateInfo.forNonError(READY)));
1109     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1110     RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue();
1111     assertThat(picker10.dropList).isEmpty();
1112     assertThat(picker10.pickList)
1113         .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS));
1114 
1115     assertFalse(oobChannel.isShutdown());
1116     assertEquals(0, lbRequestObservers.size());
1117     verify(lbRequestObserver, never()).onCompleted();
1118     verify(lbRequestObserver, never()).onError(any(Throwable.class));
1119 
1120     // Load reporting was not requested, thus never scheduled
1121     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
1122 
1123     verify(subchannelPool, never()).clear();
1124     balancer.shutdown();
1125     verify(subchannelPool).clear();
1126   }
1127 
1128   @Test
roundRobinMode_subchannelStayTransientFailureUntilReady()1129   public void roundRobinMode_subchannelStayTransientFailureUntilReady() {
1130     InOrder inOrder = inOrder(helper);
1131     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1132     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
1133     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1134     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1135 
1136     // Simulate receiving LB response
1137     List<ServerEntry> backends1 = Arrays.asList(
1138         new ServerEntry("127.0.0.1", 2000, "token0001"),
1139         new ServerEntry("127.0.0.1", 2010, "token0002"));
1140     lbResponseObserver.onNext(buildInitialResponse());
1141     lbResponseObserver.onNext(buildLbResponse(backends1));
1142     assertEquals(2, mockSubchannels.size());
1143     Subchannel subchannel1 = mockSubchannels.poll();
1144     Subchannel subchannel2 = mockSubchannels.poll();
1145 
1146     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
1147     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
1148     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
1149 
1150     // Switch all subchannels to TRANSIENT_FAILURE, making the general state TRANSIENT_FAILURE too.
1151     Status error = Status.UNAVAILABLE.withDescription("error");
1152     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error));
1153     inOrder.verify(helper).refreshNameResolution();
1154     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forTransientFailure(error));
1155     inOrder.verify(helper).refreshNameResolution();
1156     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1157     assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList)
1158         .containsExactly(new ErrorEntry(error));
1159 
1160     // Switch subchannel1 to IDLE, then to CONNECTING, which are ignored since the previous
1161     // subchannel state is TRANSIENT_FAILURE. General state is unchanged.
1162     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
1163     inOrder.verify(helper).refreshNameResolution();
1164     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
1165     inOrder.verifyNoMoreInteractions();
1166 
1167     // Switch subchannel1 to READY, which will affect the general state
1168     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
1169     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1170     assertThat(((RoundRobinPicker) pickerCaptor.getValue()).pickList)
1171         .containsExactly(new BackendEntry(subchannel1, getLoadRecorder(), "token0001"));
1172     inOrder.verifyNoMoreInteractions();
1173   }
1174 
1175   @Test
grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires()1176   public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() {
1177     subtestGrpclbFallbackTimeout(false, GrpclbState.FALLBACK_TIMEOUT_MS);
1178   }
1179 
1180   @Test
grpclbFallback_initialTimeout_timerExpires()1181   public void grpclbFallback_initialTimeout_timerExpires() {
1182     subtestGrpclbFallbackTimeout(true, GrpclbState.FALLBACK_TIMEOUT_MS);
1183   }
1184 
1185   @Test
grpclbFallback_timeout_serverListReceivedBeforeTimerExpires()1186   public void grpclbFallback_timeout_serverListReceivedBeforeTimerExpires() {
1187     subtestGrpclbFallbackTimeout(false, 12345);
1188   }
1189 
1190   @Test
grpclbFallback_timeout_timerExpires()1191   public void grpclbFallback_timeout_timerExpires() {
1192     subtestGrpclbFallbackTimeout(true, 12345);
1193   }
1194 
1195   // Fallback or not within the period of the initial timeout.
subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout)1196   private void subtestGrpclbFallbackTimeout(boolean timerExpires, long timeout) {
1197     long loadReportIntervalMillis = 1983;
1198     InOrder inOrder = inOrder(helper, subchannelPool);
1199 
1200     // Create balancer and backend addresses
1201     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
1202     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1203     deliverResolvedAddresses(
1204         backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));
1205     inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
1206         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
1207 
1208     // Attempted to connect to balancer
1209     assertEquals(1, fakeOobChannels.size());
1210     ManagedChannel oobChannel = fakeOobChannels.poll();
1211     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1212     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1213     assertEquals(1, lbRequestObservers.size());
1214     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1215 
1216     verify(lbRequestObserver).onNext(
1217         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1218             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1219             .build()));
1220     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1221     // We don't care if these methods have been run.
1222     inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
1223     inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
1224 
1225     inOrder.verifyNoMoreInteractions();
1226 
1227     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1228     fakeClock.forwardTime(timeout - 1, TimeUnit.MILLISECONDS);
1229     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1230 
1231     //////////////////////////////////
1232     // Fallback timer expires (or not)
1233     //////////////////////////////////
1234     if (timerExpires) {
1235       logs.clear();
1236       fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
1237 
1238       assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1239       assertThat(logs)
1240           .containsExactly("INFO: [grpclb-<api.google.com>] Using fallback backends")
1241           .inOrder();
1242 
1243       // Fall back to the backends from resolver
1244       fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
1245 
1246       assertFalse(oobChannel.isShutdown());
1247       verify(lbRequestObserver, never()).onCompleted();
1248     }
1249 
1250     //////////////////////////////////////////////////////////////////////
1251     // Name resolver sends new resolution results without any backend addr
1252     //////////////////////////////////////////////////////////////////////
1253     grpclbBalancerList = createResolvedBalancerAddresses(2);
1254     deliverResolvedAddresses(
1255         Collections.<EquivalentAddressGroup>emptyList(),
1256         grpclbBalancerList,
1257         GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));
1258 
1259     // New addresses are updated to the OobChannel
1260     inOrder.verify(helper).updateOobChannelAddresses(
1261         same(oobChannel), eq(xattr(grpclbBalancerList)));
1262 
1263     if (timerExpires) {
1264       // Still in fallback logic, except that the backend list is empty
1265       for (Subchannel subchannel : mockSubchannels) {
1266         verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
1267       }
1268 
1269       // RPC error status includes message of balancer RPC timeout
1270       inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1271       PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
1272       assertThat(result.getStatus().getCode())
1273           .isEqualTo(Code.UNAVAILABLE);
1274       assertThat(result.getStatus().getDescription())
1275           .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription());
1276       assertThat(result.getStatus().getDescription())
1277           .contains(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription());
1278     }
1279 
1280     ////////////////////////////////////////////////////////////////
1281     // Name resolver sends new resolution results with backend addrs
1282     ////////////////////////////////////////////////////////////////
1283     // prevents the cached subchannel to be used
1284     subchannelPool.clear();
1285     backendList = createResolvedBackendAddresses(2);
1286     grpclbBalancerList = createResolvedBalancerAddresses(1);
1287     deliverResolvedAddresses(
1288         backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));
1289 
1290     // New LB address is updated to the OobChannel
1291     inOrder.verify(helper).updateOobChannelAddresses(
1292         same(oobChannel), eq(xattr(grpclbBalancerList)));
1293 
1294     if (timerExpires) {
1295       // New backend addresses are used for fallback
1296       fallbackTestVerifyUseOfFallbackBackendLists(
1297           inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
1298     }
1299 
1300     ////////////////////////////////////////////////
1301     // Break the LB stream after the timer expires
1302     ////////////////////////////////////////////////
1303     if (timerExpires) {
1304       Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
1305       lbResponseObserver.onError(streamError.asException());
1306 
1307       // The error will NOT propagate to picker because fallback list is in use.
1308       inOrder.verify(helper, never())
1309           .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1310       // A new stream is created
1311       verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
1312       lbResponseObserver = lbResponseObserverCaptor.getValue();
1313       assertEquals(1, lbRequestObservers.size());
1314       lbRequestObserver = lbRequestObservers.poll();
1315       verify(lbRequestObserver).onNext(
1316           eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1317               InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1318               .build()));
1319     }
1320 
1321     /////////////////////////////////
1322     // Balancer returns a server list
1323     /////////////////////////////////
1324     List<ServerEntry> serverList = Arrays.asList(
1325         new ServerEntry("127.0.0.1", 2000, "token0001"),
1326         new ServerEntry("127.0.0.1", 2010, "token0002"));
1327     lbResponseObserver.onNext(buildInitialResponse());
1328     lbResponseObserver.onNext(buildLbResponse(serverList));
1329 
1330     // Balancer-provided server list now in effect
1331     fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1332 
1333     ///////////////////////////////////////////////////////////////
1334     // New backend addresses from resolver outside of fallback mode
1335     ///////////////////////////////////////////////////////////////
1336     backendList = createResolvedBackendAddresses(1);
1337     grpclbBalancerList = createResolvedBalancerAddresses(1);
1338     deliverResolvedAddresses(
1339         backendList, grpclbBalancerList, GrpclbConfig.create(Mode.ROUND_ROBIN, null, timeout));
1340     // Will not affect the round robin list at all
1341     inOrder.verify(helper, never())
1342         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1343 
1344     // No fallback timeout timer scheduled.
1345     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1346   }
1347 
1348   @Test
grpclbFallback_breakLbStreamBeforeFallbackTimerExpires()1349   public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
1350     long loadReportIntervalMillis = 1983;
1351     InOrder inOrder = inOrder(helper, subchannelPool);
1352 
1353     // Create balancer and backend addresses
1354     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
1355     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1356     deliverResolvedAddresses(backendList, grpclbBalancerList);
1357 
1358     inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
1359         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
1360 
1361     // Attempted to connect to balancer
1362     assertThat(fakeOobChannels).hasSize(1);
1363     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1364     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1365     assertThat(lbRequestObservers).hasSize(1);
1366     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1367 
1368     verify(lbRequestObserver).onNext(
1369         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1370             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1371             .build()));
1372     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1373     // We don't care if these methods have been run.
1374     inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
1375     inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
1376 
1377     inOrder.verifyNoMoreInteractions();
1378 
1379     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1380 
1381     /////////////////////////////////////////////
1382     // Break the LB stream before timer expires
1383     /////////////////////////////////////////////
1384     Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
1385     lbResponseObserver.onError(streamError.asException());
1386 
1387     // Fallback time has been short-circuited
1388     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1389 
1390     // Fall back to the backends from resolver
1391     fallbackTestVerifyUseOfFallbackBackendLists(
1392         inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
1393 
1394     // A new stream is created
1395     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
1396     assertThat(lbRequestObservers).hasSize(1);
1397     lbRequestObserver = lbRequestObservers.poll();
1398     verify(lbRequestObserver).onNext(
1399         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1400             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1401             .build()));
1402 
1403     //////////////////////////////////////////////////////////////////////
1404     // Name resolver sends new resolution results without any backend addr
1405     //////////////////////////////////////////////////////////////////////
1406     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
1407 
1408     // Still in fallback logic, except that the backend list is empty
1409     for (Subchannel subchannel : mockSubchannels) {
1410       verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
1411     }
1412 
1413     // RPC error status includes error of balancer stream
1414     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1415     PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
1416     assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
1417     assertThat(result.getStatus().getDescription())
1418         .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription());
1419     assertThat(result.getStatus().getDescription()).contains(streamError.getDescription());
1420   }
1421 
1422   @Test
grpclbFallback_noBalancerAddress()1423   public void grpclbFallback_noBalancerAddress() {
1424     InOrder inOrder = inOrder(helper, subchannelPool);
1425 
1426     // Create 5 distinct backends
1427     List<EquivalentAddressGroup> backends = createResolvedBackendAddresses(5);
1428 
1429     // Name resolver gives the first two backend addresses
1430     List<EquivalentAddressGroup> backendList = backends.subList(0, 2);
1431     deliverResolvedAddresses(backendList, Collections.<EquivalentAddressGroup>emptyList());
1432 
1433     assertThat(logs).contains("INFO: [grpclb-<api.google.com>] Using fallback backends");
1434 
1435     // Fall back to the backends from resolver
1436     fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
1437 
1438     // No fallback timeout timer scheduled.
1439     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1440     verify(helper, never())
1441         .createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), anyString());
1442     logs.clear();
1443 
1444     /////////////////////////////////////////////////////////////////////////////////////////
1445     // Name resolver sends new resolution results with new backend addr but no balancer addr
1446     /////////////////////////////////////////////////////////////////////////////////////////
1447     // Name resolver then gives the last three backends
1448     backendList = backends.subList(2, 5);
1449     deliverResolvedAddresses(backendList, Collections.<EquivalentAddressGroup>emptyList());
1450 
1451     assertThat(logs).contains("INFO: [grpclb-<api.google.com>] Using fallback backends");
1452 
1453     // Shift to use updated backends
1454     fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
1455     logs.clear();
1456 
1457     ///////////////////////////////////////////////////////////////////////////////////////
1458     // Name resolver sends new resolution results without any backend addr or balancer addr
1459     ///////////////////////////////////////////////////////////////////////////////////////
1460     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
1461         Collections.<EquivalentAddressGroup>emptyList());
1462     assertThat(logs).containsExactly(
1463         "DEBUG: [grpclb-<api.google.com>] Error: Status{code=UNAVAILABLE, "
1464             + "description=No backend or balancer addresses found, cause=null}");
1465 
1466     // Keep using existing fallback addresses without interruption
1467     for (Subchannel subchannel : mockSubchannels) {
1468       verify(subchannelPool, never())
1469           .returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
1470     }
1471     verify(helper, never())
1472         .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
1473   }
1474 
1475   /**
1476    * A test for a situation where we first only get backend addresses resolved and then in a
1477    * later name resolution get both backend and load balancer addresses. The first instance
1478    * will switch us to using fallback backends and it is important that in the second instance
1479    * we do not start a fallback timer as it will fail when it triggers if the fallback backends
1480    * are already in use.
1481    */
1482   @Test
grpclbFallback_noTimerWhenAlreadyInFallback()1483   public void grpclbFallback_noTimerWhenAlreadyInFallback() {
1484     // Initially we only get backend addresses without any LB ones. This should get us to use
1485     // fallback backends from the start as we won't be able to even talk to the load balancer.
1486     // No fallback timer would be started as we already started to use fallback backends.
1487     deliverResolvedAddresses(createResolvedBalancerAddresses(1),
1488         Collections.<EquivalentAddressGroup>emptyList());
1489     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1490 
1491     // Later a new name resolution call happens and we get both backend and LB addresses. Since we
1492     // are already operating with fallback backends a fallback timer should not be started to move
1493     // us to fallback mode.
1494     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(),
1495         createResolvedBalancerAddresses(1));
1496 
1497     // If a fallback timer is started it will eventually throw an exception when it tries to switch
1498     // us to using fallback backends when we already are using them.
1499     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1500   }
1501 
1502   @Test
grpclbFallback_balancerLost()1503   public void grpclbFallback_balancerLost() {
1504     subtestGrpclbFallbackConnectionLost(true, false);
1505   }
1506 
1507   @Test
grpclbFallback_subchannelsLost()1508   public void grpclbFallback_subchannelsLost() {
1509     subtestGrpclbFallbackConnectionLost(false, true);
1510   }
1511 
1512   @Test
grpclbFallback_allLost()1513   public void grpclbFallback_allLost() {
1514     subtestGrpclbFallbackConnectionLost(true, true);
1515   }
1516 
1517   // Fallback outside of the initial timeout, where all connections are lost.
subtestGrpclbFallbackConnectionLost( boolean balancerBroken, boolean allSubchannelsBroken)1518   private void subtestGrpclbFallbackConnectionLost(
1519       boolean balancerBroken, boolean allSubchannelsBroken) {
1520     long loadReportIntervalMillis = 1983;
1521     InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);
1522 
1523     // Create balancer and backend addresses
1524     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
1525     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1526     deliverResolvedAddresses(backendList, grpclbBalancerList);
1527 
1528     inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
1529         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
1530 
1531     // Attempted to connect to balancer
1532     assertEquals(1, fakeOobChannels.size());
1533     fakeOobChannels.poll();
1534     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1535     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1536     assertEquals(1, lbRequestObservers.size());
1537     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1538 
1539     verify(lbRequestObserver).onNext(
1540         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1541             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1542             .build()));
1543     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1544     // We don't care if these methods have been run.
1545     inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
1546     inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
1547 
1548     inOrder.verifyNoMoreInteractions();
1549 
1550     // Balancer returns a server list
1551     List<ServerEntry> serverList = Arrays.asList(
1552         new ServerEntry("127.0.0.1", 2000, "token0001"),
1553         new ServerEntry("127.0.0.1", 2010, "token0002"));
1554     lbResponseObserver.onNext(buildInitialResponse());
1555     lbResponseObserver.onNext(buildLbResponse(serverList));
1556 
1557     List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1558 
1559     // Break connections
1560     if (balancerBroken) {
1561       lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1562       // A new stream to LB is created
1563       inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1564       lbResponseObserver = lbResponseObserverCaptor.getValue();
1565       assertEquals(1, lbRequestObservers.size());
1566       lbRequestObserver = lbRequestObservers.poll();
1567       inOrder.verify(helper).refreshNameResolution();
1568     }
1569     if (allSubchannelsBroken) {
1570       for (Subchannel subchannel : subchannels) {
1571         // A READY subchannel transits to IDLE when receiving a go-away
1572         deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1573         inOrder.verify(helper).refreshNameResolution();
1574       }
1575     }
1576 
1577     if (balancerBroken && allSubchannelsBroken) {
1578       // Going into fallback
1579       subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
1580           inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
1581 
1582       // When in fallback mode, fallback timer should not be scheduled when all backend
1583       // connections are lost
1584       for (Subchannel subchannel : subchannels) {
1585         deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1586         inOrder.verify(helper).refreshNameResolution();
1587       }
1588 
1589       // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
1590       List<ServerEntry> serverList2 = Arrays.asList(
1591           new ServerEntry("127.0.0.1", 2001, "token0003"),
1592           new ServerEntry("127.0.0.1", 2011, "token0004"));
1593       lbResponseObserver.onNext(buildInitialResponse());
1594       lbResponseObserver.onNext(buildLbResponse(serverList2));
1595 
1596       fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList2);
1597     }
1598     assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
1599 
1600     // No subchannel to fallback backends should have been created if no fallback happened
1601     if (!(balancerBroken && allSubchannelsBroken)) {
1602       verify(subchannelPool, never()).takeOrCreateSubchannel(
1603           eq(backendList.get(0)), any(Attributes.class));
1604       verify(subchannelPool, never()).takeOrCreateSubchannel(
1605           eq(backendList.get(1)), any(Attributes.class));
1606     }
1607   }
1608 
1609   @Test
grpclbFallback_allLost_failToFallback()1610   public void grpclbFallback_allLost_failToFallback() {
1611     long loadReportIntervalMillis = 1983;
1612     InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);
1613 
1614     // Create balancer and (empty) backend addresses
1615     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1616     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
1617 
1618     inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
1619         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
1620 
1621     // Attempted to connect to balancer
1622     assertEquals(1, fakeOobChannels.size());
1623     fakeOobChannels.poll();
1624     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1625     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1626     assertEquals(1, lbRequestObservers.size());
1627     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1628 
1629     verify(lbRequestObserver).onNext(
1630         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1631             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1632             .build()));
1633     lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
1634     // We don't care if these methods have been run.
1635     inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
1636     inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
1637 
1638     inOrder.verifyNoMoreInteractions();
1639 
1640     // Balancer returns a server list
1641     List<ServerEntry> serverList = Arrays.asList(
1642         new ServerEntry("127.0.0.1", 2000, "token0001"),
1643         new ServerEntry("127.0.0.1", 2010, "token0002"));
1644     lbResponseObserver.onNext(buildInitialResponse());
1645     lbResponseObserver.onNext(buildLbResponse(serverList));
1646 
1647     List<Subchannel> subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList);
1648 
1649     // Break connections
1650     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1651     // A new stream to LB is created
1652     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1653     assertEquals(1, lbRequestObservers.size());
1654 
1655     // Break all subchannel connections
1656     Status error = Status.UNAUTHENTICATED.withDescription("Permission denied");
1657     for (Subchannel subchannel : subchannels) {
1658       deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
1659     }
1660 
1661     // Recycle all subchannels
1662     for (Subchannel subchannel : subchannels) {
1663       verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
1664     }
1665 
1666     // RPC error status includes errors of subchannels to balancer-provided backends
1667     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1668     PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
1669     assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
1670     assertThat(result.getStatus().getDescription())
1671         .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription());
1672     assertThat(result.getStatus().getDescription()).contains(error.getDescription());
1673   }
1674 
fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs)1675   private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
1676       InOrder inOrder, List<EquivalentAddressGroup> addrs) {
1677     return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null);
1678   }
1679 
fallbackTestVerifyUseOfBalancerBackendLists( InOrder inOrder, List<ServerEntry> servers)1680   private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
1681       InOrder inOrder, List<ServerEntry> servers) {
1682     ArrayList<EquivalentAddressGroup> addrs = new ArrayList<>();
1683     ArrayList<String> tokens = new ArrayList<>();
1684     for (ServerEntry server : servers) {
1685       addrs.add(new EquivalentAddressGroup(server.addr, LB_BACKEND_ATTRS));
1686       tokens.add(server.token);
1687     }
1688     return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, tokens);
1689   }
1690 
fallbackTestVerifyUseOfBackendLists( InOrder inOrder, List<EquivalentAddressGroup> addrs, @Nullable List<String> tokens)1691   private List<Subchannel> fallbackTestVerifyUseOfBackendLists(
1692       InOrder inOrder, List<EquivalentAddressGroup> addrs,
1693       @Nullable List<String> tokens) {
1694     if (tokens != null) {
1695       assertEquals(addrs.size(), tokens.size());
1696     }
1697     for (EquivalentAddressGroup addr : addrs) {
1698       inOrder.verify(subchannelPool).takeOrCreateSubchannel(eq(addr), any(Attributes.class));
1699     }
1700     RoundRobinPicker picker = (RoundRobinPicker) currentPicker;
1701     assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
1702     assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY);
1703     assertEquals(addrs.size(), mockSubchannels.size());
1704     ArrayList<Subchannel> subchannels = new ArrayList<>(mockSubchannels);
1705     mockSubchannels.clear();
1706     for (Subchannel subchannel : subchannels) {
1707       deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
1708     }
1709     inOrder.verify(helper, atLeast(0))
1710         .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
1711     inOrder.verify(helper, never())
1712         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1713 
1714     ArrayList<BackendEntry> pickList = new ArrayList<>();
1715     for (int i = 0; i < addrs.size(); i++) {
1716       Subchannel subchannel = subchannels.get(i);
1717       BackendEntry backend;
1718       if (tokens == null) {
1719         backend = new BackendEntry(subchannel);
1720       } else {
1721         backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i));
1722       }
1723       pickList.add(backend);
1724       deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
1725       inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1726       picker = (RoundRobinPicker) pickerCaptor.getValue();
1727       assertThat(picker.dropList)
1728           .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
1729       assertThat(picker.pickList).containsExactlyElementsIn(pickList);
1730       inOrder.verify(helper, never())
1731           .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1732     }
1733     return subchannels;
1734   }
1735 
1736   @Test
grpclbMultipleAuthorities()1737   public void grpclbMultipleAuthorities() throws Exception {
1738     List<EquivalentAddressGroup> backendList = Collections.singletonList(
1739         new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address")));
1740     List<EquivalentAddressGroup> grpclbBalancerList = Arrays.asList(
1741         new EquivalentAddressGroup(
1742             new FakeSocketAddress("fake-address-1"),
1743             lbAttributes("fake-authority-1")),
1744         new EquivalentAddressGroup(
1745             new FakeSocketAddress("fake-address-2"),
1746             lbAttributes("fake-authority-2")),
1747         new EquivalentAddressGroup(
1748             new FakeSocketAddress("fake-address-3"),
1749             lbAttributes("fake-authority-1").toBuilder()
1750                 .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value").build()
1751             ));
1752     deliverResolvedAddresses(backendList, grpclbBalancerList);
1753 
1754     List<EquivalentAddressGroup> goldenOobEagList =
1755         Arrays.asList(
1756             new EquivalentAddressGroup(
1757                 new FakeSocketAddress("fake-address-1"),
1758                 Attributes.newBuilder()
1759                     .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1")
1760                     .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1")
1761                     .build()),
1762             new EquivalentAddressGroup(
1763                 new FakeSocketAddress("fake-address-2"),
1764                 Attributes.newBuilder()
1765                     .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-2")
1766                     .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-2")
1767                     .build()),
1768             new EquivalentAddressGroup(
1769                 new FakeSocketAddress("fake-address-3"),
1770                 Attributes.newBuilder()
1771                     .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, "fake-authority-1")
1772                     .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "value")
1773                     .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, "fake-authority-1")
1774                     .build()
1775             ));
1776 
1777     verify(helper).createOobChannel(eq(goldenOobEagList),
1778         eq("fake-authority-1" + NO_USE_AUTHORITY_SUFFIX));
1779   }
1780 
1781   @Test
grpclbBalancerStreamClosedAndRetried()1782   public void grpclbBalancerStreamClosedAndRetried() throws Exception {
1783     LoadBalanceRequest expectedInitialRequest =
1784         LoadBalanceRequest.newBuilder()
1785             .setInitialRequest(
1786                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1787             .build();
1788     InOrder inOrder =
1789         inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper);
1790     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1791     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
1792 
1793     assertEquals(1, fakeOobChannels.size());
1794     @SuppressWarnings("unused")
1795     ManagedChannel oobChannel = fakeOobChannels.poll();
1796 
1797     // First balancer RPC
1798     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1799     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1800     assertEquals(1, lbRequestObservers.size());
1801     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1802     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1803     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1804 
1805     // Balancer closes it immediately (erroneously)
1806     lbResponseObserver.onCompleted();
1807 
1808     // Will start backoff sequence 1 (10ns)
1809     inOrder.verify(backoffPolicyProvider).get();
1810     inOrder.verify(backoffPolicy1).nextBackoffNanos();
1811     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1812     inOrder.verify(helper).refreshNameResolution();
1813 
1814     // Fast-forward to a moment before the retry
1815     fakeClock.forwardNanos(9);
1816     verifyNoMoreInteractions(mockLbService);
1817     // Then time for retry
1818     fakeClock.forwardNanos(1);
1819     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1820     lbResponseObserver = lbResponseObserverCaptor.getValue();
1821     assertEquals(1, lbRequestObservers.size());
1822     lbRequestObserver = lbRequestObservers.poll();
1823     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1824     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1825 
1826     // Balancer closes it with an error.
1827     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1828     // Will continue the backoff sequence 1 (100ns)
1829     verifyNoMoreInteractions(backoffPolicyProvider);
1830     inOrder.verify(backoffPolicy1).nextBackoffNanos();
1831     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1832     inOrder.verify(helper).refreshNameResolution();
1833 
1834     // Fast-forward to a moment before the retry
1835     fakeClock.forwardNanos(100 - 1);
1836     verifyNoMoreInteractions(mockLbService);
1837     // Then time for retry
1838     fakeClock.forwardNanos(1);
1839     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1840     lbResponseObserver = lbResponseObserverCaptor.getValue();
1841     assertEquals(1, lbRequestObservers.size());
1842     lbRequestObserver = lbRequestObservers.poll();
1843     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1844     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1845 
1846     // Balancer sends initial response.
1847     lbResponseObserver.onNext(buildInitialResponse());
1848 
1849     // Then breaks the RPC
1850     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1851 
1852     // Will reset the retry sequence and retry immediately, because balancer has responded.
1853     inOrder.verify(backoffPolicyProvider).get();
1854     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1855     lbResponseObserver = lbResponseObserverCaptor.getValue();
1856     assertEquals(1, lbRequestObservers.size());
1857     lbRequestObserver = lbRequestObservers.poll();
1858     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1859     inOrder.verify(helper).refreshNameResolution();
1860 
1861     // Fail the retry after spending 4ns
1862     fakeClock.forwardNanos(4);
1863     lbResponseObserver.onError(Status.UNAVAILABLE.asException());
1864 
1865     // Will be on the first retry (10ns) of backoff sequence 2.
1866     inOrder.verify(backoffPolicy2).nextBackoffNanos();
1867     assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1868     inOrder.verify(helper).refreshNameResolution();
1869 
1870     // Fast-forward to a moment before the retry, the time spent in the last try is deducted.
1871     fakeClock.forwardNanos(10 - 4 - 1);
1872     verifyNoMoreInteractions(mockLbService);
1873     // Then time for retry
1874     fakeClock.forwardNanos(1);
1875     inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1876     assertEquals(1, lbRequestObservers.size());
1877     lbRequestObserver = lbRequestObservers.poll();
1878     verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
1879     assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
1880 
1881     // Wrapping up
1882     verify(backoffPolicyProvider, times(2)).get();
1883     verify(backoffPolicy1, times(2)).nextBackoffNanos();
1884     verify(backoffPolicy2, times(1)).nextBackoffNanos();
1885     verify(helper, times(4)).refreshNameResolution();
1886   }
1887 
1888   @Test
grpclbWorking_pickFirstMode()1889   public void grpclbWorking_pickFirstMode() throws Exception {
    // Scenario: the balancer is configured with Mode.PICK_FIRST and only balancer addresses are
    // resolved. The test checks that a single subchannel is created from the LB server list,
    // inspects the picker published for each subchannel state, delivers a second server list
    // containing a drop entry, and finally checks when requestConnection() is issued.
1890     InOrder inOrder = inOrder(helper);
1891 
1892     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
1893 
1894     deliverResolvedAddresses(
1895         Collections.<EquivalentAddressGroup>emptyList(),
1896         grpclbBalancerList,
1897         GrpclbConfig.create(Mode.PICK_FIRST));
1898 
    // One OOB channel and one LB stream are expected; the initial request carries the authority.
1899     assertEquals(1, fakeOobChannels.size());
1900     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
1901     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
1902     assertEquals(1, lbRequestObservers.size());
1903     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
1904     verify(lbRequestObserver).onNext(
1905         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
1906             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
1907             .build()));
1908 
1909     // Simulate receiving LB response
1910     List<ServerEntry> backends1 = Arrays.asList(
1911         new ServerEntry("127.0.0.1", 2000, "token0001"),
1912         new ServerEntry("127.0.0.1", 2010, "token0002"));
    // No picker update is expected before the LB response is delivered.
1913     inOrder.verify(helper, never())
1914         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1915     lbResponseObserver.onNext(buildInitialResponse());
1916     lbResponseObserver.onNext(buildLbResponse(backends1));
1917 
    // Both backend addresses (each tagged with its token) go into one createSubchannel() call.
1918     inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
1919     CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
1920     assertThat(createSubchannelArgs.getAddresses())
1921         .containsExactly(
1922             new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
1923             new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
1924 
1925     // Initially IDLE
1926     inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
1927     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
1928 
1929     // Only one subchannel is created
1930     assertThat(mockSubchannels).hasSize(1);
1931     Subchannel subchannel = mockSubchannels.poll();
    // The expected dropList has one slot per server-list entry; null is expected for a backend.
1932     assertThat(picker0.dropList).containsExactly(null, null);
1933     assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
1934 
1935     // PICK_FIRST doesn't eagerly connect
1936     verify(subchannel, never()).requestConnection();
1937 
1938     // CONNECTING
1939     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
1940     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
1941     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
1942     assertThat(picker1.dropList).containsExactly(null, null);
1943     assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY);
1944 
1945     // TRANSIENT_FAILURE
1946     Status error = Status.UNAVAILABLE.withDescription("Simulated connection error");
1947     deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
1948     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
1949     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
1950     assertThat(picker2.dropList).containsExactly(null, null);
1951     assertThat(picker2.pickList).containsExactly(new ErrorEntry(error));
1952 
1953     // READY
1954     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
1955     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1956     RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
1957     assertThat(picker3.dropList).containsExactly(null, null);
1958     assertThat(picker3.pickList).containsExactly(
1959         new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
1960 
1961 
1962     // New server list with drops
1963     List<ServerEntry> backends2 = Arrays.asList(
1964         new ServerEntry("127.0.0.1", 2000, "token0001"),
1965         new ServerEntry("token0003"),  // drop
1966         new ServerEntry("127.0.0.1", 2020, "token0004"));
    // No additional picker update is expected before the new server list is delivered.
1967     inOrder.verify(helper, never())
1968         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
1969     lbResponseObserver.onNext(buildLbResponse(backends2));
1970 
1971     // new addresses will be updated to the existing subchannel
1972     // createSubchannel() has been called only once over the whole test so far
1973     verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class));
1974     assertThat(mockSubchannels).isEmpty();
    // Only the two non-drop entries are passed to updateAddresses().
1975     verify(subchannel).updateAddresses(
1976         eq(Arrays.asList(
1977             new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")),
1978             new EquivalentAddressGroup(backends2.get(2).addr,
1979                 eagAttrsWithToken("token0004")))));
1980     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
1981     RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
    // The drop entry appears in the dropList at the same index it had in the server list.
1982     assertThat(picker4.dropList).containsExactly(
1983         null, new DropEntry(getLoadRecorder(), "token0003"), null);
1984     assertThat(picker4.pickList).containsExactly(
1985         new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
1986 
1987     // Subchannel goes IDLE, but PICK_FIRST will not try to reconnect
1988     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
1989     inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
1990     RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue();
1991     verify(subchannel, never()).requestConnection();
1992 
1993     // ... until it's selected
    // Picking through the IDLE picker yields no result and triggers the first requestConnection().
1994     PickSubchannelArgs args = mock(PickSubchannelArgs.class);
1995     PickResult pick = picker5.pickSubchannel(args);
1996     assertThat(pick).isSameInstanceAs(PickResult.withNoResult());
1997     verify(subchannel).requestConnection();
1998 
1999     // ... or requested by application
2000     balancer.requestConnection();
2001     verify(subchannel, times(2)).requestConnection();
2002 
2003     // PICK_FIRST doesn't use subchannelPool
2004     verify(subchannelPool, never())
2005         .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
2006     verify(subchannelPool, never())
2007         .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
2008   }
2009 
2010   @Test
grpclbWorking_pickFirstMode_lbSendsEmptyAddress()2011   public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception {
    // Scenario: in Mode.PICK_FIRST the subchannel is brought to READY, then the LB sends an empty
    // server list. The test expects the subchannel to be shut down and a TRANSIENT_FAILURE picker
    // carrying NO_AVAILABLE_BACKENDS_STATUS, and then checks recovery once the LB sends a
    // non-empty server list again.
2012     InOrder inOrder = inOrder(helper);
2013 
2014     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
2015     deliverResolvedAddresses(
2016         Collections.<EquivalentAddressGroup>emptyList(),
2017         grpclbBalancerList,
2018         GrpclbConfig.create(Mode.PICK_FIRST));
2019 
2020     assertEquals(1, fakeOobChannels.size());
2021     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2022     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2023     assertEquals(1, lbRequestObservers.size());
2024     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2025     verify(lbRequestObserver).onNext(
2026         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2027             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2028             .build()));
2029 
2030     // Simulate receiving LB response
2031     List<ServerEntry> backends1 = Arrays.asList(
2032         new ServerEntry("127.0.0.1", 2000, "token0001"),
2033         new ServerEntry("127.0.0.1", 2010, "token0002"));
    // No picker update is expected before the LB response is delivered.
2034     inOrder.verify(helper, never())
2035         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2036     lbResponseObserver.onNext(buildInitialResponse());
2037     lbResponseObserver.onNext(buildLbResponse(backends1));
2038 
2039     inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
2040     CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
2041     assertThat(createSubchannelArgs.getAddresses())
2042         .containsExactly(
2043             new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
2044             new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
2045 
2046     // Initially IDLE
2047     inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
2048     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
2049 
2050     // Only one subchannel is created
2051     assertThat(mockSubchannels).hasSize(1);
2052     Subchannel subchannel = mockSubchannels.poll();
2053     assertThat(picker0.dropList).containsExactly(null, null);
2054     assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
2055 
2056     // PICK_FIRST doesn't eagerly connect
2057     verify(subchannel, never()).requestConnection();
2058 
2059     // CONNECTING
2060     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
2061     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
2062     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
2063     assertThat(picker1.dropList).containsExactly(null, null);
2064     assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY);
2065 
2066     // TRANSIENT_FAILURE
2067     Status error = Status.UNAVAILABLE.withDescription("Simulated connection error");
2068     deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
2069     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
2070     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
2071     assertThat(picker2.dropList).containsExactly(null, null);
2072     assertThat(picker2.pickList).containsExactly(new ErrorEntry(error));
2073 
2074     // READY
2075     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
2076     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2077     RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
2078     assertThat(picker3.dropList).containsExactly(null, null);
2079     assertThat(picker3.pickList).containsExactly(
2080         new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
2081 
2082     inOrder.verify(helper, never())
2083         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2084 
2085     // Empty addresses from LB
2086     lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
2087 
2088     // With an empty server list no new subchannel is created and the existing one is shut down
2089     // (no further createSubchannel() call after the one verified above)
2090     inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class));
2091     assertThat(mockSubchannels).isEmpty();
2092     verify(subchannel).shutdown();
2093 
2094     // RPC error status includes message of no backends provided by balancer
2095     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
2096     RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue();
2097     assertThat(errorPicker.pickList)
2098         .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS));
2099 
    // A second empty server list; the never() check below expects no further state update from it.
2100     lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
2101 
2102     // Test recover from new LB response with addresses
2103     // New server list with drops
2104     List<ServerEntry> backends2 = Arrays.asList(
2105         new ServerEntry("127.0.0.1", 2000, "token0001"),
2106         new ServerEntry("token0003"),  // drop
2107         new ServerEntry("127.0.0.1", 2020, "token0004"));
2108     inOrder.verify(helper, never())
2109         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2110     lbResponseObserver.onNext(buildLbResponse(backends2));
2111 
2112     // A fresh subchannel is created for the new addresses, and the balancer reports IDLE again
2113     inOrder.verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class));
2114     inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
    // From here on "subchannel" refers to the newly created subchannel, not the one shut down.
2115     subchannel = mockSubchannels.poll();
2116 
2117     // Subchannel became READY
2118     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
2119     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
2120     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2121     RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
2122     assertThat(picker4.pickList).containsExactly(
2123         new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
2124   }
2125 
2126   @Test
shutdownWithoutSubchannel_roundRobin()2127   public void shutdownWithoutSubchannel_roundRobin() throws Exception {
2128     subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.ROUND_ROBIN));
2129   }
2130 
2131   @Test
shutdownWithoutSubchannel_pickFirst()2132   public void shutdownWithoutSubchannel_pickFirst() throws Exception {
2133     subtestShutdownWithoutSubchannel(GrpclbConfig.create(Mode.PICK_FIRST));
2134   }
2135 
subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig)2136   private void subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig) {
2137     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
2138     deliverResolvedAddresses(
2139         Collections.<EquivalentAddressGroup>emptyList(),
2140         grpclbBalancerList,
2141         grpclbConfig);
2142     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2143     assertEquals(1, lbRequestObservers.size());
2144     StreamObserver<LoadBalanceRequest> requestObserver = lbRequestObservers.poll();
2145 
2146     verify(requestObserver, never()).onCompleted();
2147     balancer.shutdown();
2148     ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
2149     verify(requestObserver).onError(throwableCaptor.capture());
2150     assertThat(Status.fromThrowable(throwableCaptor.getValue()).getCode())
2151         .isEqualTo(Code.CANCELLED);
2152   }
2153 
2154   @Test
pickFirstMode_defaultTimeout_fallback()2155   public void pickFirstMode_defaultTimeout_fallback() throws Exception {
2156     pickFirstModeFallback(GrpclbState.FALLBACK_TIMEOUT_MS);
2157   }
2158 
2159   @Test
pickFirstMode_serviceConfigTimeout_fallback()2160   public void pickFirstMode_serviceConfigTimeout_fallback() throws Exception {
2161     pickFirstModeFallback(12345);
2162   }
2163 
pickFirstModeFallback(long timeout)2164   private void pickFirstModeFallback(long timeout) throws Exception {
    // Shared scenario for the PICK_FIRST fallback tests. "timeout" is passed to
    // GrpclbConfig.create(...) and is also the amount (in milliseconds) by which the fake clock is
    // advanced so that the fallback timer fires before the LB has sent anything. The test then
    // checks the fallback subchannel and pickers, and finally that an LB response replaces the
    // fallback addresses on the same subchannel.
2165     InOrder inOrder = inOrder(helper);
2166 
2167     // Name resolver returns balancer and backend addresses
2168     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
2169     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
2170     deliverResolvedAddresses(
2171         backendList, grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST, null, timeout));
2172 
2173     // Attempted to connect to balancer
2174     assertEquals(1, fakeOobChannels.size());
2175     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2176     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2177     assertEquals(1, lbRequestObservers.size());
2178 
2179     // Fallback timer expires with no response
2180     fakeClock.forwardTime(timeout, TimeUnit.MILLISECONDS);
2181 
2182     // Entering fallback mode
    // The single subchannel is created with the resolver-provided backend addresses.
2183     inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
2184     CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
2185     assertThat(createSubchannelArgs.getAddresses())
2186         .containsExactly(backendList.get(0), backendList.get(1));
2187 
2188     assertThat(mockSubchannels).hasSize(1);
2189     Subchannel subchannel = mockSubchannels.poll();
2190 
2191     // Initially IDLE
2192     inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
2193     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
2194 
2195     // READY
2196     deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
2197     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2198     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
2199     assertThat(picker1.dropList).containsExactly(null, null);
    // While in fallback the expected tracer factory is built with a null argument.
2200     assertThat(picker1.pickList).containsExactly(
2201         new BackendEntry(subchannel, new TokenAttachingTracerFactory(null)));
2202 
    // picker0 is the IDLE picker captured earlier; its contents are only asserted here.
2203     assertThat(picker0.dropList).containsExactly(null, null);
2204     assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));
2205 
2206 
2207     // Finally, an LB response, which brings us out of fallback
2208     List<ServerEntry> backends1 = Arrays.asList(
2209         new ServerEntry("127.0.0.1", 2000, "token0001"),
2210         new ServerEntry("127.0.0.1", 2010, "token0002"));
2211     inOrder.verify(helper, never())
2212         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2213     lbResponseObserver.onNext(buildInitialResponse());
2214     lbResponseObserver.onNext(buildLbResponse(backends1));
2215 
2216     // new addresses will be updated to the existing subchannel
2217     // no further createSubchannel() call is expected after the one made for fallback
2218     inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class));
2219     assertThat(mockSubchannels).isEmpty();
2220     verify(subchannel).updateAddresses(
2221         eq(Arrays.asList(
2222             new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
2223             new EquivalentAddressGroup(backends1.get(1).addr,
2224                 eagAttrsWithToken("token0002")))));
2225     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2226     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
2227     assertThat(picker2.dropList).containsExactly(null, null);
    // Out of fallback, the expected tracer factory is built from getLoadRecorder() instead of null.
2228     assertThat(picker2.pickList).containsExactly(
2229         new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
2230 
2231     // PICK_FIRST doesn't use subchannelPool
2232     verify(subchannelPool, never())
2233         .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
2234     verify(subchannelPool, never())
2235         .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
2236   }
2237 
2238   @Test
switchMode()2239   public void switchMode() throws Exception {
    // Scenario: the balancer starts with an explicit Mode.ROUND_ROBIN config and receives a server
    // list, then a new resolution switches the config to Mode.PICK_FIRST. The test expects the old
    // OOB channel to be shut down, the round-robin subchannels to be returned to the pool, a new
    // LB stream to be opened, and a single PICK_FIRST subchannel to be created afterwards.
2240     InOrder inOrder = inOrder(helper);
2241 
2242     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
2243     deliverResolvedAddresses(
2244         Collections.<EquivalentAddressGroup>emptyList(),
2245         grpclbBalancerList,
2246         GrpclbConfig.create(Mode.ROUND_ROBIN));
2247 
2248     assertEquals(1, fakeOobChannels.size());
    // Polled so the shutdown of this specific channel can be asserted after the mode switch.
2249     ManagedChannel oobChannel = fakeOobChannels.poll();
2250     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2251     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2252     assertEquals(1, lbRequestObservers.size());
2253     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2254     verify(lbRequestObserver).onNext(
2255         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2256             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2257             .build()));
2258 
2259     // Simulate receiving LB response
2260     List<ServerEntry> backends1 = Arrays.asList(
2261         new ServerEntry("127.0.0.1", 2000, "token0001"),
2262         new ServerEntry("127.0.0.1", 2010, "token0002"));
2263     inOrder.verify(helper, never())
2264         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2265     lbResponseObserver.onNext(buildInitialResponse());
2266     lbResponseObserver.onNext(buildLbResponse(backends1));
2267 
2268     // ROUND_ROBIN: create one subchannel per server
2269     verify(subchannelPool).takeOrCreateSubchannel(
2270         eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
2271         any(Attributes.class));
2272     verify(subchannelPool).takeOrCreateSubchannel(
2273         eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
2274         any(Attributes.class));
2275     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
2276     assertEquals(2, mockSubchannels.size());
2277     Subchannel subchannel1 = mockSubchannels.poll();
2278     Subchannel subchannel2 = mockSubchannels.poll();
    // Nothing has been returned to the pool before the mode switch.
2279     verify(subchannelPool, never())
2280         .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
2281 
2282     // Switch to PICK_FIRST
2283     deliverResolvedAddresses(
2284         Collections.<EquivalentAddressGroup>emptyList(),
2285         grpclbBalancerList, GrpclbConfig.create(Mode.PICK_FIRST));
2286 
2287 
2288     // GrpclbState will be shutdown, and a new one will be created
2289     assertThat(oobChannel.isShutdown()).isTrue();
2290     verify(subchannelPool)
2291         .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
2292     verify(subchannelPool)
2293         .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
2294 
2295     // A new LB stream is created
2296     assertEquals(1, fakeOobChannels.size());
    // times(2): the original stream plus the one opened after the switch.
2297     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
2298     lbResponseObserver = lbResponseObserverCaptor.getValue();
2299     assertEquals(1, lbRequestObservers.size());
2300     lbRequestObserver = lbRequestObservers.poll();
2301     verify(lbRequestObserver).onNext(
2302         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2303             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2304             .build()));
2305 
2306     // Simulate receiving LB response
2307     inOrder.verify(helper, never())
2308         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2309     lbResponseObserver.onNext(buildInitialResponse());
2310     lbResponseObserver.onNext(buildLbResponse(backends1));
2311 
2312     // PICK_FIRST Subchannel
    // One createSubchannel() call carries both backend addresses, each tagged with its token.
2313     inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
2314     CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
2315     assertThat(createSubchannelArgs.getAddresses())
2316         .containsExactly(
2317             new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
2318             new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
2319 
2320     inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
2321   }
2322 
eagAttrsWithToken(String token)2323   private static Attributes eagAttrsWithToken(String token) {
2324     return LB_BACKEND_ATTRS.toBuilder().set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, token).build();
2325   }
2326 
2327   @Test
switchMode_nullLbPolicy()2328   public void switchMode_nullLbPolicy() throws Exception {
    // Scenario: same flow as a ROUND_ROBIN -> PICK_FIRST switch, except that the first resolution
    // is delivered through the deliverResolvedAddresses overload that takes no GrpclbConfig. The
    // assertions that follow expect round-robin behaviour (one pooled subchannel per server) for
    // that first resolution.
    // NOTE(review): presumably the no-config overload results in the default (ROUND_ROBIN) mode;
    // confirm against that helper's definition.
2329     InOrder inOrder = inOrder(helper);
2330 
2331     final List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
2332     deliverResolvedAddresses(
2333         Collections.<EquivalentAddressGroup>emptyList(),
2334         grpclbBalancerList);
2335 
2336     assertEquals(1, fakeOobChannels.size());
    // Polled so the shutdown of this specific channel can be asserted after the mode switch.
2337     ManagedChannel oobChannel = fakeOobChannels.poll();
2338     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2339     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2340     assertEquals(1, lbRequestObservers.size());
2341     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2342     verify(lbRequestObserver).onNext(
2343         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2344             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2345             .build()));
2346 
2347     // Simulate receiving LB response
2348     List<ServerEntry> backends1 = Arrays.asList(
2349         new ServerEntry("127.0.0.1", 2000, "token0001"),
2350         new ServerEntry("127.0.0.1", 2010, "token0002"));
2351     inOrder.verify(helper, never())
2352         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2353     lbResponseObserver.onNext(buildInitialResponse());
2354     lbResponseObserver.onNext(buildLbResponse(backends1));
2355 
2356     // ROUND_ROBIN: create one subchannel per server
2357     verify(subchannelPool).takeOrCreateSubchannel(
2358         eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
2359         any(Attributes.class));
2360     verify(subchannelPool).takeOrCreateSubchannel(
2361         eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
2362         any(Attributes.class));
2363     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
2364     assertEquals(2, mockSubchannels.size());
2365     Subchannel subchannel1 = mockSubchannels.poll();
2366     Subchannel subchannel2 = mockSubchannels.poll();
    // Nothing has been returned to the pool before the mode switch.
2367     verify(subchannelPool, never())
2368         .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
2369 
2370     // Switch to PICK_FIRST
2371     deliverResolvedAddresses(
2372         Collections.<EquivalentAddressGroup>emptyList(),
2373         grpclbBalancerList,
2374         GrpclbConfig.create(Mode.PICK_FIRST));
2375 
2376     // GrpclbState will be shutdown, and a new one will be created
2377     assertThat(oobChannel.isShutdown()).isTrue();
2378     verify(subchannelPool)
2379         .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
2380     verify(subchannelPool)
2381         .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
2382 
2383     // A new LB stream is created
2384     assertEquals(1, fakeOobChannels.size());
    // times(2): the original stream plus the one opened after the switch.
2385     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
2386     lbResponseObserver = lbResponseObserverCaptor.getValue();
2387     assertEquals(1, lbRequestObservers.size());
2388     lbRequestObserver = lbRequestObservers.poll();
2389     verify(lbRequestObserver).onNext(
2390         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2391             InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2392             .build()));
2393 
2394     // Simulate receiving LB response
2395     inOrder.verify(helper, never())
2396         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2397     lbResponseObserver.onNext(buildInitialResponse());
2398     lbResponseObserver.onNext(buildLbResponse(backends1));
2399 
2400     // PICK_FIRST Subchannel
    // One createSubchannel() call carries both backend addresses, each tagged with its token.
2401     inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
2402     CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue();
2403     assertThat(createSubchannelArgs.getAddresses())
2404         .containsExactly(
2405             new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
2406             new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")));
2407 
2408     inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
2409   }
2410 
2411   @Test
switchServiceName()2412   public void switchServiceName() throws Exception {
    // Scenario: the balancer is configured in ROUND_ROBIN mode with service name "foo.google.com"
    // and receives a server list; a later resolution keeps the mode but changes the service name
    // to "bar.google.com". The test expects the old OOB channel to be shut down, the subchannels
    // to be returned to the pool, and a new LB stream whose initial request carries the new name.
2413     InOrder inOrder = inOrder(helper);
2414 
2415     String serviceName = "foo.google.com";
2416     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
2417 
2418     deliverResolvedAddresses(
2419         Collections.<EquivalentAddressGroup>emptyList(),
2420         grpclbBalancerList,
2421         GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS));
2422 
2423     assertEquals(1, fakeOobChannels.size());
    // Polled so the shutdown of this specific channel can be asserted after the name switch.
2424     ManagedChannel oobChannel = fakeOobChannels.poll();
2425     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2426     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2427     assertEquals(1, lbRequestObservers.size());
2428     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
    // The initial request names the configured service rather than SERVICE_AUTHORITY.
2429     verify(lbRequestObserver).onNext(
2430         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2431             InitialLoadBalanceRequest.newBuilder().setName(serviceName).build())
2432             .build()));
2433 
2434     // Simulate receiving LB response
2435     List<ServerEntry> backends1 = Arrays.asList(
2436         new ServerEntry("127.0.0.1", 2000, "token0001"),
2437         new ServerEntry("127.0.0.1", 2010, "token0002"));
2438     inOrder.verify(helper, never())
2439         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2440     lbResponseObserver.onNext(buildInitialResponse());
2441     lbResponseObserver.onNext(buildLbResponse(backends1));
2442 
2443     // ROUND_ROBIN: create one subchannel per server
2444     verify(subchannelPool).takeOrCreateSubchannel(
2445         eq(new EquivalentAddressGroup(backends1.get(0).addr, LB_BACKEND_ATTRS)),
2446         any(Attributes.class));
2447     verify(subchannelPool).takeOrCreateSubchannel(
2448         eq(new EquivalentAddressGroup(backends1.get(1).addr, LB_BACKEND_ATTRS)),
2449         any(Attributes.class));
2450     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
2451     assertEquals(2, mockSubchannels.size());
2452     Subchannel subchannel1 = mockSubchannels.poll();
2453     Subchannel subchannel2 = mockSubchannels.poll();
    // Nothing has been returned to the pool before the service name changes.
2454     verify(subchannelPool, never())
2455         .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
2456 
2457     // Switch to different serviceName
2458     serviceName = "bar.google.com";
2459     List<EquivalentAddressGroup> newGrpclbResolutionList = createResolvedBalancerAddresses(1);
2460     deliverResolvedAddresses(
2461         Collections.<EquivalentAddressGroup>emptyList(),
2462         newGrpclbResolutionList,
2463         GrpclbConfig.create(Mode.ROUND_ROBIN, serviceName, GrpclbState.FALLBACK_TIMEOUT_MS));
2464 
2465     // GrpclbState will be shutdown, and a new one will be created
2466     assertThat(oobChannel.isShutdown()).isTrue();
2467     verify(subchannelPool)
2468         .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
2469     verify(subchannelPool)
2470         .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
2471 
    // A new OOB channel and LB stream are expected; the new initial request uses the new name.
2472     assertEquals(1, fakeOobChannels.size());
2473     verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
2474     assertEquals(1, lbRequestObservers.size());
2475     lbRequestObserver = lbRequestObservers.poll();
2476     verify(lbRequestObserver).onNext(
2477         eq(LoadBalanceRequest.newBuilder().setInitialRequest(
2478             InitialLoadBalanceRequest.newBuilder().setName(serviceName).build())
2479             .build()));
2480   }
2481 
2482   @Test
grpclbWorking_lbSendsFallbackMessage()2483   public void grpclbWorking_lbSendsFallbackMessage() {
2484     InOrder inOrder = inOrder(helper, subchannelPool);
2485     List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
2486     List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
2487     deliverResolvedAddresses(backendList, grpclbBalancerList);
2488 
2489     // Fallback timer is started as soon as the addresses are resolved.
2490     assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
2491     verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
2492         eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
2493     assertEquals(1, fakeOobChannels.size());
2494     ManagedChannel oobChannel = fakeOobChannels.poll();
2495     verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2496     StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2497     assertEquals(1, lbRequestObservers.size());
2498     StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2499     verify(lbRequestObserver).onNext(
2500         eq(LoadBalanceRequest.newBuilder()
2501             .setInitialRequest(
2502                 InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2503             .build()));
2504 
2505     // Simulate receiving LB response
2506     ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001");
2507     ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002");
2508     List<ServerEntry> backends1 = Arrays.asList(backend1a, backend1b);
2509     inOrder.verify(helper, never())
2510         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2511     logs.clear();
2512     lbResponseObserver.onNext(buildInitialResponse());
2513     assertThat(logs).containsExactly(
2514         "INFO: [grpclb-<api.google.com>] Got an LB initial response: " + buildInitialResponse());
2515     logs.clear();
2516     lbResponseObserver.onNext(buildLbResponse(backends1));
2517 
2518     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
2519         eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)),
2520         any(Attributes.class));
2521     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
2522         eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)),
2523         any(Attributes.class));
2524 
2525     assertEquals(2, mockSubchannels.size());
2526     Subchannel subchannel1 = mockSubchannels.poll();
2527     Subchannel subchannel2 = mockSubchannels.poll();
2528 
2529     verify(subchannel1).requestConnection();
2530     verify(subchannel2).requestConnection();
2531     assertEquals(
2532         new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS),
2533         subchannel1.getAddresses());
2534     assertEquals(
2535         new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS),
2536         subchannel2.getAddresses());
2537 
2538     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING));
2539     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING));
2540 
2541     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
2542     RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();
2543     assertThat(picker0.dropList).containsExactly(null, null);
2544     assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY);
2545     inOrder.verifyNoMoreInteractions();
2546 
2547     assertThat(logs)
2548         .containsExactly(
2549             "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends1))
2550         .inOrder();
2551     logs.clear();
2552 
2553     // Let subchannels be connected
2554     deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
2555     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2556 
2557     RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
2558 
2559     assertThat(picker1.dropList).containsExactly(null, null);
2560     assertThat(picker1.pickList).containsExactly(
2561         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"));
2562 
2563     deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
2564     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2565 
2566     RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
2567     assertThat(picker2.dropList).containsExactly(null, null);
2568     assertThat(picker2.pickList).containsExactly(
2569         new BackendEntry(subchannel1, getLoadRecorder(), "token0001"),
2570         new BackendEntry(subchannel2, getLoadRecorder(), "token0002"))
2571         .inOrder();
2572 
2573     // Balancer forces entering fallback mode
2574     lbResponseObserver.onNext(buildLbFallbackResponse());
2575 
2576     // existing subchannels must be returned immediately to gracefully shutdown.
2577     verify(subchannelPool)
2578         .returnSubchannel(eq(subchannel1), eq(ConnectivityStateInfo.forNonError(READY)));
2579     verify(subchannelPool)
2580         .returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
2581 
2582     // verify fallback
2583     fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
2584 
2585     assertFalse(oobChannel.isShutdown());
2586     verify(lbRequestObserver, never()).onCompleted();
2587 
2588     //////////////////////////////////////////////////////////////////////
2589     // Name resolver sends new resolution results without any backend addr
2590     //////////////////////////////////////////////////////////////////////
2591     deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
2592 
2593     // Still in fallback logic, except that the backend list is empty
2594     for (Subchannel subchannel : mockSubchannels) {
2595       verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class));
2596     }
2597 
2598     // RPC error status includes message of fallback requested by balancer
2599     inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
2600     PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
2601     assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
2602     assertThat(result.getStatus().getDescription())
2603         .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription());
2604     assertThat(result.getStatus().getDescription())
2605         .contains(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription());
2606 
2607     // exit fall back by providing two new backends
2608     ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001");
2609     ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002");
2610     List<ServerEntry> backends2 = Arrays.asList(backend2a, backend2b);
2611 
2612     inOrder.verify(helper, never())
2613         .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
2614     logs.clear();
2615     lbResponseObserver.onNext(buildLbResponse(backends2));
2616 
2617     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
2618         eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)),
2619         any(Attributes.class));
2620     inOrder.verify(subchannelPool).takeOrCreateSubchannel(
2621         eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)),
2622         any(Attributes.class));
2623 
2624     assertEquals(2, mockSubchannels.size());
2625     Subchannel subchannel3 = mockSubchannels.poll();
2626     Subchannel subchannel4 = mockSubchannels.poll();
2627     verify(subchannel3).requestConnection();
2628     verify(subchannel4).requestConnection();
2629     assertEquals(
2630         new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS),
2631         subchannel3.getAddresses());
2632     assertEquals(
2633         new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS),
2634         subchannel4.getAddresses());
2635 
2636     deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING));
2637     deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING));
2638 
2639     inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
2640     RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue();
2641     assertThat(picker6.dropList).containsExactly(null, null);
2642     assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY);
2643     inOrder.verifyNoMoreInteractions();
2644 
2645     assertThat(logs)
2646         .containsExactly(
2647             "DEBUG: [grpclb-<api.google.com>] Got an LB response: " + buildLbResponse(backends2))
2648         .inOrder();
2649     logs.clear();
2650 
2651     // Let new subchannels be connected
2652     deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
2653     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2654 
2655     RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
2656     assertThat(picker3.dropList).containsExactly(null, null);
2657     assertThat(picker3.pickList).containsExactly(
2658         new BackendEntry(subchannel3, getLoadRecorder(), "token1001"));
2659 
2660     deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY));
2661     inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
2662 
2663     RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
2664     assertThat(picker4.dropList).containsExactly(null, null);
2665     assertThat(picker4.pickList).containsExactly(
2666         new BackendEntry(subchannel3, getLoadRecorder(), "token1001"),
2667         new BackendEntry(subchannel4, getLoadRecorder(), "token1002"))
2668         .inOrder();
2669   }
2670 
2671   @Test
useIndependentRpcContext()2672   public void useIndependentRpcContext() {
2673     // Simulates making RPCs within the context of an inbound RPC.
2674     CancellableContext cancellableContext = Context.current().withCancellation();
2675     Context prevContext = cancellableContext.attach();
2676     try {
2677       List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
2678       List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
2679       deliverResolvedAddresses(backendList, grpclbBalancerList);
2680 
2681       verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)),
2682           eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX));
2683       verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
2684       StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
2685       assertEquals(1, lbRequestObservers.size());
2686       StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
2687       verify(lbRequestObserver).onNext(
2688           eq(LoadBalanceRequest.newBuilder()
2689               .setInitialRequest(
2690                   InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
2691               .build()));
2692       lbResponseObserver.onNext(buildInitialResponse());
2693 
2694       // The inbound RPC finishes and closes its context. The outbound RPC's control plane RPC
2695       // should not be impacted (no retry).
2696       cancellableContext.close();
2697       assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
2698       verifyNoMoreInteractions(mockLbService);
2699     } finally {
2700       cancellableContext.detach(prevContext);
2701     }
2702   }
2703 
deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState)2704   private void deliverSubchannelState(
2705       final Subchannel subchannel, final ConnectivityStateInfo newState) {
2706     ((FakeSubchannel) subchannel).updateState(newState);
2707   }
2708 
deliverNameResolutionError(final Status error)2709   private void deliverNameResolutionError(final Status error) {
2710     syncContext.execute(new Runnable() {
2711       @Override
2712       public void run() {
2713         balancer.handleNameResolutionError(error);
2714       }
2715     });
2716   }
2717 
deliverResolvedAddresses( final List<EquivalentAddressGroup> backendAddrs, List<EquivalentAddressGroup> balancerAddrs)2718   private void deliverResolvedAddresses(
2719       final List<EquivalentAddressGroup> backendAddrs,
2720       List<EquivalentAddressGroup> balancerAddrs) {
2721     deliverResolvedAddresses(backendAddrs, balancerAddrs, GrpclbConfig.create(Mode.ROUND_ROBIN));
2722   }
2723 
deliverResolvedAddresses( final List<EquivalentAddressGroup> backendAddrs, List<EquivalentAddressGroup> balancerAddrs, final GrpclbConfig grpclbConfig)2724   private void deliverResolvedAddresses(
2725       final List<EquivalentAddressGroup> backendAddrs,
2726       List<EquivalentAddressGroup> balancerAddrs,
2727       final GrpclbConfig grpclbConfig) {
2728     final Attributes attrs =
2729         Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build();
2730     syncContext.execute(new Runnable() {
2731       @Override
2732       public void run() {
2733         balancer.acceptResolvedAddresses(
2734             ResolvedAddresses.newBuilder()
2735                 .setAddresses(backendAddrs)
2736                 .setAttributes(attrs)
2737                 .setLoadBalancingPolicyConfig(grpclbConfig)
2738                 .build());
2739       }
2740     });
2741   }
2742 
getLoadRecorder()2743   private GrpclbClientLoadRecorder getLoadRecorder() {
2744     return balancer.getGrpclbState().getLoadRecorder();
2745   }
2746 
createResolvedBackendAddresses(int n)2747   private static List<EquivalentAddressGroup> createResolvedBackendAddresses(int n) {
2748     List<EquivalentAddressGroup> list = new ArrayList<>();
2749     for (int i = 0; i < n; i++) {
2750       SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
2751       list.add(new EquivalentAddressGroup(addr));
2752     }
2753     return list;
2754   }
2755 
createResolvedBalancerAddresses(int n)2756   private static List<EquivalentAddressGroup> createResolvedBalancerAddresses(int n) {
2757     List<EquivalentAddressGroup> list = new ArrayList<>();
2758     for (int i = 0; i < n; i++) {
2759       SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
2760       list.add(new EquivalentAddressGroup(addr, lbAttributes(lbAuthority(i))));
2761     }
2762     return list;
2763   }
2764 
lbAuthority(int unused)2765   private static String lbAuthority(int unused) {
2766     // TODO(ejona): Support varying authorities
2767     return "lb.google.com";
2768   }
2769 
lbAttributes(String authority)2770   private static Attributes lbAttributes(String authority) {
2771     return Attributes.newBuilder()
2772         .set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority)
2773         .build();
2774   }
2775 
buildInitialResponse()2776   private static LoadBalanceResponse buildInitialResponse() {
2777     return buildInitialResponse(0);
2778   }
2779 
buildInitialResponse(long loadReportIntervalMillis)2780   private static LoadBalanceResponse buildInitialResponse(long loadReportIntervalMillis) {
2781     return LoadBalanceResponse.newBuilder()
2782         .setInitialResponse(
2783             InitialLoadBalanceResponse.newBuilder()
2784                 .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis)))
2785         .build();
2786   }
2787 
buildLbFallbackResponse()2788   private static LoadBalanceResponse buildLbFallbackResponse() {
2789     return LoadBalanceResponse.newBuilder()
2790         .setFallbackResponse(FallbackResponse.newBuilder().build())
2791         .build();
2792   }
2793 
buildLbResponse(List<ServerEntry> servers)2794   private static LoadBalanceResponse buildLbResponse(List<ServerEntry> servers) {
2795     ServerList.Builder serverListBuilder = ServerList.newBuilder();
2796     for (ServerEntry server : servers) {
2797       if (server.addr != null) {
2798         serverListBuilder.addServers(Server.newBuilder()
2799             .setIpAddress(ByteString.copyFrom(server.addr.getAddress().getAddress()))
2800             .setPort(server.addr.getPort())
2801             .setLoadBalanceToken(server.token)
2802             .build());
2803       } else {
2804         serverListBuilder.addServers(Server.newBuilder()
2805             .setDrop(true)
2806             .setLoadBalanceToken(server.token)
2807             .build());
2808       }
2809     }
2810     return LoadBalanceResponse.newBuilder()
2811         .setServerList(serverListBuilder.build())
2812         .build();
2813   }
2814 
xattr(List<EquivalentAddressGroup> lbAddr)2815   private List<EquivalentAddressGroup> xattr(List<EquivalentAddressGroup> lbAddr) {
2816     List<EquivalentAddressGroup> oobAddr = new ArrayList<>(lbAddr.size());
2817     for (EquivalentAddressGroup lb : lbAddr) {
2818       String authority = lb.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
2819       Attributes attrs = lb.getAttributes().toBuilder()
2820           .set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, authority)
2821           .build();
2822       oobAddr.add(new EquivalentAddressGroup(lb.getAddresses(), attrs));
2823     }
2824     return oobAddr;
2825   }
2826 
2827   private static class ServerEntry {
2828     final InetSocketAddress addr;
2829     final String token;
2830 
ServerEntry(String host, int port, String token)2831     ServerEntry(String host, int port, String token) {
2832       this.addr = new InetSocketAddress(host, port);
2833       this.token = token;
2834     }
2835 
2836     // Drop entry
ServerEntry(String token)2837     ServerEntry(String token) {
2838       this.addr = null;
2839       this.token = token;
2840     }
2841   }
2842 
2843   private static class FakeSubchannel extends Subchannel {
2844     private final Attributes attributes;
2845     private List<EquivalentAddressGroup> eags;
2846     private SubchannelStateListener listener;
2847 
FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes)2848     public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) {
2849       this.eags = Collections.unmodifiableList(eags);
2850       this.attributes = attributes;
2851     }
2852 
2853     @Override
getAllAddresses()2854     public List<EquivalentAddressGroup> getAllAddresses() {
2855       return eags;
2856     }
2857 
2858     @Override
getAttributes()2859     public Attributes getAttributes() {
2860       return attributes;
2861     }
2862 
2863     @Override
start(SubchannelStateListener listener)2864     public void start(SubchannelStateListener listener) {
2865       this.listener = checkNotNull(listener, "listener");
2866     }
2867 
2868     @Override
updateAddresses(List<EquivalentAddressGroup> addrs)2869     public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2870       this.eags = Collections.unmodifiableList(addrs);
2871     }
2872 
2873     @Override
shutdown()2874     public void shutdown() {
2875     }
2876 
2877     @Override
requestConnection()2878     public void requestConnection() {
2879     }
2880 
updateState(ConnectivityStateInfo newState)2881     public void updateState(ConnectivityStateInfo newState) {
2882       listener.onSubchannelState(newState);
2883     }
2884   }
2885 
2886   private class FakeHelper extends Helper {
2887 
2888     @Override
getSynchronizationContext()2889     public SynchronizationContext getSynchronizationContext() {
2890       return syncContext;
2891     }
2892 
2893     @Override
createOobChannel(List<EquivalentAddressGroup> eag, String authority)2894     public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority) {
2895       ManagedChannel channel =
2896           InProcessChannelBuilder
2897               .forName("fakeLb")
2898               .directExecutor()
2899               .overrideAuthority(authority)
2900               .build();
2901       fakeOobChannels.add(channel);
2902       oobChannelTracker.add(channel);
2903       return channel;
2904     }
2905 
2906     @Override
createOobChannel(EquivalentAddressGroup eag, String authority)2907     public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
2908       return createOobChannel(Collections.singletonList(eag), authority);
2909     }
2910 
2911     @Override
createSubchannel(CreateSubchannelArgs args)2912     public Subchannel createSubchannel(CreateSubchannelArgs args) {
2913       FakeSubchannel subchannel =
2914           mock(
2915               FakeSubchannel.class,
2916               AdditionalAnswers
2917                   .delegatesTo(new FakeSubchannel(args.getAddresses(), args.getAttributes())));
2918       mockSubchannels.add(subchannel);
2919       unpooledSubchannelTracker.add(subchannel);
2920       return subchannel;
2921     }
2922 
2923     @Override
getScheduledExecutorService()2924     public ScheduledExecutorService getScheduledExecutorService() {
2925       return fakeClock.getScheduledExecutorService();
2926     }
2927 
2928     @Override
getChannelLogger()2929     public ChannelLogger getChannelLogger() {
2930       return channelLogger;
2931     }
2932 
2933     @Override
updateBalancingState( @onnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker)2934     public void updateBalancingState(
2935         @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
2936       currentPicker = newPicker;
2937     }
2938 
2939     @Override
refreshNameResolution()2940     public void refreshNameResolution() {
2941     }
2942 
2943     @Override
getAuthority()2944     public String getAuthority() {
2945       return SERVICE_AUTHORITY;
2946     }
2947 
2948     @Override
updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag)2949     public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
2950     }
2951 
2952     @Override
updateOobChannelAddresses(ManagedChannel channel, List<EquivalentAddressGroup> eag)2953     public void updateOobChannelAddresses(ManagedChannel channel,
2954         List<EquivalentAddressGroup> eag) {
2955     }
2956   }
2957 }
2958